Asynchronous ClickHouse Driver (asynch)
asynch is an asynchronous ClickHouse Python driver with native TCP interface support. It reuses many features from `clickhouse-driver` and adheres to PEP249. Currently at version 0.3.1, it undergoes active development with a focus on providing a robust asyncio-compatible interface for ClickHouse interactions.
Warnings
- breaking The top-level `asynch.connect()` function has been removed. Direct instantiation and usage of `Connection` within an `async with` statement is now required.
- breaking The `Connection.connected` property, used to check connection status, has been renamed.
- breaking The `asynch.pool.create_pool()` function has been removed. Connection pooling should now be managed directly using the `Pool` class.
- gotcha Always use `async with Connection(...)` and `async with conn.cursor(...)` to ensure connections and cursors are properly managed and closed. Failing to do so can lead to resource leaks and unclosed connections, especially under heavy load.
- gotcha Blocking the asyncio event loop with synchronous I/O or long-running CPU-bound tasks within an async function can severely degrade performance and responsiveness of your application. Ensure all I/O operations are truly asynchronous or offloaded to a thread/process pool.
Install
-
pip install asynch -
pip install asynch[compression]
Imports
- Connection
from asynch import Connection
- Pool
from asynch import Pool
- Cursor
from asynch.cursors import Cursor
- DictCursor
from asynch.cursors import DictCursor
- connect
from asynch import Connection # Use Connection class directly
- create_pool
from asynch import Pool # Use Pool class directly
Quickstart
import asyncio
from asynch import Connection
from asynch.cursors import DictCursor
import os
async def main():
# Connect using DSN parameters
async with Connection(
user=os.environ.get('CH_USER', 'default'),
password=os.environ.get('CH_PASSWORD', ''),
host=os.environ.get('CH_HOST', '127.0.0.1'),
port=int(os.environ.get('CH_PORT', 9000)),
database=os.environ.get('CH_DATABASE', 'default'),
) as conn:
print(f"Connected to ClickHouse: {conn.opened}")
# Create a table
async with conn.cursor() as cursor:
await cursor.execute(
"CREATE TABLE IF NOT EXISTS test_table (id UInt64, value String) ENGINE = MergeTree ORDER BY id"
)
print("Table 'test_table' created or already exists.")
# Insert data
await cursor.execute(
"INSERT INTO test_table (id, value) VALUES",
[[1, 'hello'], [2, 'world']]
)
print("Data inserted.")
# Select data using DictCursor
async with conn.cursor(cursor=DictCursor) as cursor:
await cursor.execute("SELECT id, value FROM test_table ORDER BY id")
result = await cursor.fetchall()
print(f"Fetched data: {result}")
# Clean up (optional)
async with conn.cursor() as cursor:
await cursor.execute("DROP TABLE IF EXISTS test_table")
print("Table 'test_table' dropped.")
if __name__ == "__main__":
# Example environment variables (replace with your ClickHouse instance details)
# os.environ['CH_USER'] = 'my_user'
# os.environ['CH_PASSWORD'] = 'my_password'
# os.environ['CH_HOST'] = 'localhost'
# os.environ['CH_PORT'] = '9000'
# os.environ['CH_DATABASE'] = 'my_db'
try:
asyncio.run(main())
except Exception as e:
print(f"An error occurred: {e}")