Chalk SQLAlchemy Redshift

0.8.15.dev0 · active · verified Thu Apr 16

chalk-sqlalchemy-redshift is an Amazon Redshift Dialect for SQLAlchemy, maintained by Chalk. It extends SQLAlchemy to allow seamless interaction with Redshift databases, providing specific type mappings and optimizations for Redshift's nuances. The current version is 0.8.15.dev0, and it generally follows a minor version release cadence driven by Redshift feature updates or SQLAlchemy compatibility changes.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to establish a connection to an Amazon Redshift cluster using `chalk-sqlalchemy-redshift` and execute a simple query. It relies on environment variables for sensitive credentials and the `psycopg2-binary` driver. Ensure you have `sqlalchemy` and `psycopg2-binary` installed.

import os
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError, ProgrammingError

# Get connection details from environment variables for security
REDSHIFT_USER = os.environ.get('REDSHIFT_USER', 'your_user')
REDSHIFT_PASSWORD = os.environ.get('REDSHIFT_PASSWORD', 'your_password')
REDSHIFT_HOST = os.environ.get('REDSHIFT_HOST', 'your-redshift-cluster.abcdef123456.us-east-1.redshift.amazonaws.com')
REDSHIFT_PORT = os.environ.get('REDSHIFT_PORT', '5439')
REDSHIFT_DB = os.environ.get('REDSHIFT_DB', 'dev')

# Construct the Redshift connection string using psycopg2 driver
# The 'chalk-sqlalchemy-redshift' package registers the 'redshift+psycopg2' dialect
connection_string = (
    f"redshift+psycopg2://{REDSHIFT_USER}:{REDSHIFT_PASSWORD}"
    f"@{REDSHIFT_HOST}:{REDSHIFT_PORT}/{REDSHIFT_DB}"
)

if REDSHIFT_USER == 'your_user':
    print("Please set REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_HOST, REDSHIFT_PORT, and REDSHIFT_DB environment variables.")
    print("Skipping quickstart execution.")
else:
    try:
        engine = create_engine(connection_string)

        with engine.connect() as connection:
            # Example: Execute a simple query
            result = connection.execute(text("SELECT 1 AS test_column")).scalar()
            print(f"Successfully connected to Redshift. Query result: {result}")

            # Example: Fetching data
            data_result = connection.execute(text("SELECT current_database(), current_user()")).fetchall()
            print(f"Current database: {data_result[0][0]}, Current user: {data_result[0][1]}")

    except (OperationalError, ProgrammingError) as e:
        print(f"Error connecting to Redshift or executing query: {e}")
        print("Please ensure environment variables are set correctly, the Redshift cluster is accessible,")
        print("and that `psycopg2-binary` is installed (`pip install psycopg2-binary`).")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

view raw JSON →