Asynchronous ClickHouse Driver (asynch)

0.3.1 · active · verified Fri Apr 10

asynch is an asynchronous Python driver for ClickHouse that speaks the native TCP interface. It reuses many features from `clickhouse-driver` and follows PEP 249. The current version is 0.3.1, and the project is under active development, with a focus on a robust asyncio-compatible interface to ClickHouse.

Warnings

Install

Imports

Quickstart

This quickstart connects to a ClickHouse server, creates a table, inserts rows, and fetches them back using `asynch`. Connections and cursors are managed with `async with`, and `DictCursor` is used so that fetched rows can be accessed like dictionaries. Connection details are read from environment variables so that credentials stay out of the source code.

import asyncio
from asynch import Connection
from asynch.cursors import DictCursor
import os

async def main():
    # Connect using individual keyword parameters (not a DSN string),
    # falling back to local defaults when a variable is unset
    async with Connection(
        user=os.environ.get('CH_USER', 'default'),
        password=os.environ.get('CH_PASSWORD', ''),
        host=os.environ.get('CH_HOST', '127.0.0.1'),
        port=int(os.environ.get('CH_PORT', 9000)),
        database=os.environ.get('CH_DATABASE', 'default'),
    ) as conn:
        print(f"Connected to ClickHouse: {conn.opened}")

        # Create a table
        async with conn.cursor() as cursor:
            await cursor.execute(
                "CREATE TABLE IF NOT EXISTS test_table (id UInt64, value String) ENGINE = MergeTree ORDER BY id"
            )
            print("Table 'test_table' created or already exists.")

            # Insert data
            await cursor.execute(
                "INSERT INTO test_table (id, value) VALUES",
                [[1, 'hello'], [2, 'world']]
            )
            print("Data inserted.")

        # Select data using DictCursor
        async with conn.cursor(cursor=DictCursor) as cursor:
            await cursor.execute("SELECT id, value FROM test_table ORDER BY id")
            result = await cursor.fetchall()
            print(f"Fetched data: {result}")
            
        # Clean up (optional)
        async with conn.cursor() as cursor:
            await cursor.execute("DROP TABLE IF EXISTS test_table")
            print("Table 'test_table' dropped.")

if __name__ == "__main__":
    # Example environment variables (replace with your ClickHouse instance details)
    # os.environ['CH_USER'] = 'my_user'
    # os.environ['CH_PASSWORD'] = 'my_password'
    # os.environ['CH_HOST'] = 'localhost'
    # os.environ['CH_PORT'] = '9000'
    # os.environ['CH_DATABASE'] = 'my_db'
    
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred: {e}")
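
The environment-variable handling in the quickstart can be factored into a small helper that validates the values before a connection is attempted. The following is a minimal sketch, not part of asynch: `connection_kwargs` and `ping` are hypothetical names, and the `CH_*` variables and their defaults are taken from the quickstart above. The asynch import is deferred into `ping` so the helper can be exercised without a ClickHouse server.

```python
import os
from typing import Any, Mapping


def connection_kwargs(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Collect connection settings from the CH_* variables (hypothetical helper)."""
    raw_port = env.get("CH_PORT", "9000")
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"CH_PORT must be an integer, got {raw_port!r}") from None
    if not 0 < port < 65536:
        raise ValueError(f"CH_PORT out of range: {port}")
    return {
        "user": env.get("CH_USER", "default"),
        "password": env.get("CH_PASSWORD", ""),
        "host": env.get("CH_HOST", "127.0.0.1"),
        "port": port,
        "database": env.get("CH_DATABASE", "default"),
    }


async def ping():
    """Run SELECT 1 against the configured server and return the fetched row."""
    # Imported here so connection_kwargs() works even where asynch is not installed.
    from asynch import Connection

    async with Connection(**connection_kwargs()) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            return await cursor.fetchone()
```

Calling `asyncio.run(ping())` requires a reachable ClickHouse server. Failing early on a malformed `CH_PORT` gives a clearer message than an error raised later during the connection attempt.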
