OpenTelemetry AsyncPG Instrumentation
This library provides OpenTelemetry tracing instrumentation for the `asyncpg` PostgreSQL driver. It allows for automatic capturing of database queries as spans, providing visibility into asynchronous PostgreSQL operations within an OpenTelemetry-instrumented Python application. As part of the `opentelemetry-python-contrib` repository, it often receives frequent beta releases.
Warnings
- gotcha This instrumentation is currently in beta. While generally stable, its API and behavior might undergo minor changes in future pre-releases before reaching a stable (1.0) version.
- breaking Iterating over `asyncpg.Cursor` can generate an excessive number of spans, with one span per row fetched via `CursorIterator.__anext__`. This can lead to very large traces and potential performance overhead.
- gotcha The `AsyncPGInstrumentor().instrument()` call must occur early in your application's lifecycle, specifically before any `asyncpg` connections are established or its modules are significantly used. If `asyncpg` functions are called before instrumentation, those calls will not be traced.
- gotcha The `opentelemetry-instrumentation-asyncpg` library requires Python 3.9 or higher. Attempting to use it with older Python versions will result in compatibility errors.
Install
-
pip install opentelemetry-instrumentation-asyncpg -
pip install asyncpg opentelemetry-sdk opentelemetry-exporter-console
Imports
- AsyncPGInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
Quickstart
import asyncio
import os
import asyncpg
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
# 1. Setup OpenTelemetry TracerProvider
resource = Resource.create({"service.name": "asyncpg-app"})
tracer_provider = TracerProvider(resource=resource)
span_processor = BatchSpanProcessor(ConsoleSpanExporter())
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)
# 2. Instrument asyncpg
AsyncPGInstrumentor().instrument()
async def main():
# Database connection details from environment variables or defaults
user = os.environ.get('POSTGRES_USER', 'user')
password = os.environ.get('POSTGRES_PASSWORD', 'password')
database = os.environ.get('POSTGRES_DB', 'database')
host = os.environ.get('POSTGRES_HOST', 'localhost')
port = os.environ.get('POSTGRES_PORT', '5432')
# Ensure asyncpg is available before attempting connection
try:
conn = await asyncpg.connect(
user=user, password=password, database=database, host=host, port=port
)
print(f"Connected to PostgreSQL at {host}:{port}/{database}")
# Perform a database operation that will be traced
result = await conn.fetchval("SELECT 42 as my_value;")
print(f"Query result: {result}")
await conn.close()
print("Connection closed.")
except Exception as e:
print(f"Failed to connect or query PostgreSQL: {e}")
print("Ensure a PostgreSQL instance is running and accessible (e.g., via Docker):")
print("docker run -e POSTGRES_USER=user -e POSTGRES_PASSWORD=password -e POSTGRES_DB=database -p 5432:5432 postgres")
if __name__ == "__main__":
# Run the asynchronous main function
asyncio.run(main())
# Optional: Flush spans before exiting for console exporter
# For real applications, use a proper exporter like OTLPSpanExporter
# and ensure processes have time to send data.