pgcopy

1.6.2 · active · verified Thu Apr 16

pgcopy is a Python library for fast bulk loading into PostgreSQL using the binary COPY protocol. It works with several database adapters, including psycopg2, psycopg, pg8000, and PyGreSQL, and handles a wide range of PostgreSQL data types, including arrays. The current version is 1.6.2.

Quickstart

This quickstart connects to a PostgreSQL database with `psycopg` (`psycopg2` can be used the same way), creates a `CopyManager`, and inserts several records with its `copy` method. The copy runs inside the connection's transaction, so nothing is persisted until `conn.commit()` is called. Database credentials are read from environment variables rather than hard-coded.

import os
from datetime import datetime
from pgcopy import CopyManager
import psycopg

# Prerequisites: a running PostgreSQL instance with a 'weather_db' database
# that contains a 'measurements_table' table. For example:
#   psql -c 'CREATE DATABASE weather_db;'
# and then, inside weather_db:
#   CREATE TABLE measurements_table (id INT, timestamp TIMESTAMP, location VARCHAR(255), temperature DOUBLE PRECISION);
# temperature is DOUBLE PRECISION because the records below use Python floats;
# a NUMERIC column would call for decimal.Decimal values instead.

db_name = os.environ.get('PG_DB_NAME', 'weather_db')
db_user = os.environ.get('PG_DB_USER', 'postgres')
db_password = os.environ.get('PG_DB_PASSWORD', '')
db_host = os.environ.get('PG_DB_HOST', 'localhost')
db_port = os.environ.get('PG_DB_PORT', '5432')

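# Settings are passed as keyword arguments rather than a hand-built conninfo
# string: libpq requires empty values (such as the default password here) and
# values containing spaces to be quoted, and psycopg handles that for us.
conn = None
cursor = None

try:
    conn = psycopg.connect(
        dbname=db_name,
        user=db_user,
        password=db_password,
        host=db_host,
        port=db_port,
    )
    print(f"Connected to database: {db_name}")
    cursor = conn.cursor()

    # Define the target columns and the records to load (one tuple per row,
    # values in the same order as cols)
    cols = ('id', 'timestamp', 'location', 'temperature')
    now = datetime.now()
    records = [
        (0, now, 'Jerusalem', 72.2),
        (1, now, 'New York', 75.6),
        (2, now, 'Moscow', 54.3),
    ]

    # Use CopyManager for fast insertion
    mgr = CopyManager(conn, 'measurements_table', cols)
    mgr.copy(records)
    # The copy is part of the open transaction; commit to persist it
    conn.commit()
    print("Records copied successfully.")

    # Verify data (optional)
    cursor.execute("SELECT * FROM measurements_table ORDER BY id;")
    print("\nData in table after copy:")
    for row in cursor.fetchall():
        print(row)

finally:
    # conn and cursor are still None if the connection attempt itself failed
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
        print("Database connection closed.")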
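
For loads much larger than the three records above, one option is to copy and commit in fixed-size batches so that a single transaction does not grow without bound. The following is a minimal sketch of that idea, not part of pgcopy: `iter_batches` and `copy_in_batches` are hypothetical helper names, and the copy and commit steps are passed in as plain callables so the helper itself needs no database.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Sequence

Record = Sequence[object]


def iter_batches(records: Iterable[Record], batch_size: int) -> Iterator[List[Record]]:
    """Yield successive lists holding at most batch_size records each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def copy_in_batches(
    records: Iterable[Record],
    copy_batch: Callable[[List[Record]], object],
    commit: Callable[[], object],
    batch_size: int = 10_000,
) -> int:
    """Copy records batch by batch, committing after each batch.

    copy_batch and commit are supplied by the caller (hypothetical wiring:
    mgr.copy and conn.commit from the quickstart). Returns the row count.
    """
    total = 0
    for batch in iter_batches(records, batch_size):
        copy_batch(batch)
        commit()
        total += len(batch)
    return total
```

With the quickstart's objects, the call would look like `copy_in_batches(records, mgr.copy, conn.commit, batch_size=10_000)`. The trade-off is that a failure partway through leaves the earlier batches committed, so this suits loads that can be resumed or deduplicated; a single `mgr.copy(records)` followed by one commit keeps the load all-or-nothing.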
