OpenTelemetry Marqo Instrumentation
This library provides OpenTelemetry instrumentation for the Marqo vector database Python client. It enables automatic tracing of client-side calls to Marqo, helping developers gain observability into their LLM and vector search applications. Part of the larger OpenLLMetry project, it adheres to OpenTelemetry semantic conventions for LLM and vector DB operations. The library is actively maintained, with frequent releases (multiple per month) to keep up with updates to semantic conventions and the instrumented libraries.
Warnings
- breaking The OpenTelemetry GenAI semantic conventions are actively evolving. Recent releases of this package (e.g., 0.53.4 through 0.58.0) have changed attribute names and structures for LLM-related telemetry, including vector DB interactions. The changes are additive and backward-compatible at the API level, but the emitted telemetry (span attributes) may differ between releases, so queries and dashboards in your observability backend may need to be adjusted after an upgrade.
- gotcha For programmatic instrumentation, the `MarqoInstrumentor().instrument()` call must be made *before* the `marqo` library or its client is imported or instantiated in your application code. If `marqo` is imported first, the instrumentation might not apply correctly.
- gotcha Like many LLM/VectorDB instrumentations, `opentelemetry-instrumentation-marqo` (especially when used with the broader Traceloop SDK) may capture prompts, responses, or document content by default. This data could contain sensitive or personally identifiable information (PII). While the `openllmetry` repository states telemetry is only collected in the SDK, specific instrumentations can still log content to spans.
- gotcha When deploying Python applications with multi-process web servers (e.g., Gunicorn with `workers > 1`), OpenTelemetry Python's automatic instrumentation (especially for metrics) can exhibit issues due to the forking model. This can lead to incomplete or incorrect telemetry data.
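For the multi-process case, one commonly documented workaround is to build the tracing pipeline inside each worker after the fork, rather than once in the parent process. The sketch below is a hypothetical `gunicorn.conf.py` that does this in Gunicorn's `post_fork` server hook. The file name, service name, worker count, and the choice of `ConsoleSpanExporter` are assumptions for illustration, not settings prescribed by this library.

```python
# gunicorn.conf.py (hypothetical file name and settings, for illustration only)

workers = 2  # assumed worker count


def post_fork(server, worker):
    """Gunicorn server hook: runs in each worker process right after it is forked."""
    # Imports are kept inside the hook so the master process can load this
    # config file without touching the tracing SDK.
    from opentelemetry import trace
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.instrumentation.marqo import MarqoInstrumentor

    resource = Resource.create({"service.name": "my-marqo-app"})
    provider = TracerProvider(resource=resource)
    # BatchSpanProcessor starts a background export thread; creating it here
    # gives every worker its own thread instead of relying on the parent's.
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

    # Instrument Marqo inside the worker as well.
    MarqoInstrumentor().instrument()
    server.log.info("Tracing initialized in worker pid=%s", worker.pid)
```

Gunicorn would load this with `gunicorn -c gunicorn.conf.py app:app`, where `app:app` stands in for your own WSGI entry point.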
Install
pip install opentelemetry-instrumentation-marqo marqo
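Because span attributes can change between releases (see Warnings), it can help to record which versions are actually installed before comparing telemetry across environments. A minimal sketch using only the standard library follows; the helper name `installed_versions` is hypothetical and not part of this package.

```python
from importlib import metadata


def installed_versions(packages):
    """Return a {distribution name: version or None} mapping."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this environment
    return versions


if __name__ == "__main__":
    found = installed_versions(["opentelemetry-instrumentation-marqo", "marqo"])
    for name, version in found.items():
        print(f"{name}: {version or 'not installed'}")
```

Logging this mapping at startup, or pinning the reported versions in a requirements file, makes it easier to tell whether a dashboard change lines up with an upgrade.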
Imports
- MarqoInstrumentor
from opentelemetry.instrumentation.marqo import MarqoInstrumentor
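If tracing should be optional in your application, the import can be guarded so the program still runs when the instrumentation package is absent. The sketch below assumes that behavior is acceptable; `enable_marqo_tracing` and `disable_marqo_tracing` are hypothetical helper names, while `instrument()` and `uninstrument()` are the methods provided by OpenTelemetry's instrumentor base class.

```python
import logging

logger = logging.getLogger(__name__)

try:
    from opentelemetry.instrumentation.marqo import MarqoInstrumentor
except ImportError:  # instrumentation package not installed
    MarqoInstrumentor = None


def enable_marqo_tracing():
    """Call instrument() if the package is available; return whether it was called."""
    if MarqoInstrumentor is None:
        logger.warning("opentelemetry-instrumentation-marqo not installed; tracing disabled")
        return False
    MarqoInstrumentor().instrument()
    return True


def disable_marqo_tracing():
    """Remove the instrumentation again, for example in test teardown."""
    if MarqoInstrumentor is not None:
        MarqoInstrumentor().uninstrument()
```

Call `enable_marqo_tracing()` early in application startup, before any Marqo client is created, in line with the ordering note under Warnings.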
Quickstart
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider
# Import the Marqo instrumentation
from opentelemetry.instrumentation.marqo import MarqoInstrumentor
# --- OpenTelemetry Setup (Typical for any OTel Python app) ---
# Set up a TracerProvider
resource = Resource.create({"service.name": "my-marqo-app"})
tracer_provider = TracerProvider(resource=resource)
# Configure a SpanProcessor to export spans to the console
# In a real application, you would typically export over OTLP (e.g., OTLPSpanExporter) instead.
console_exporter = ConsoleSpanExporter()
span_processor = BatchSpanProcessor(console_exporter)
tracer_provider.add_span_processor(span_processor)
# Set the global TracerProvider
set_tracer_provider(tracer_provider)
# Get a tracer for manual spans if needed
tracer = trace.get_tracer(__name__)
# --- Marqo Instrumentation Setup ---
# MarqoInstrumentor instruments the real 'marqo' client. In a real application,
# call MarqoInstrumentor().instrument() before importing 'marqo' or creating a client.
# This example uses a mock client so it runs without a live Marqo instance;
# the mock starts its spans manually to imitate instrumented calls.
class MockMarqoClient:
    def index(self, index_name):
        return MockMarqoIndex(index_name)


class MockMarqoIndex:
    def __init__(self, index_name):
        self.index_name = index_name

    def add_documents(self, documents, tensor_fields, client_batch_size=50):
        with tracer.start_as_current_span(f"marqo.index.add_documents: {self.index_name}"):
            print(f"Mock Marqo: Adding {len(documents)} documents to index '{self.index_name}'")
            # Simulate Marqo client operation
            return {"items": [{"_id": f"doc{i}"} for i in range(len(documents))]}

    def search(self, q, searchable_attributes=None, limit=5):
        with tracer.start_as_current_span(f"marqo.index.search: {self.index_name}"):
            print(f"Mock Marqo: Searching for '{q}' in index '{self.index_name}'")
            # Simulate Marqo client operation
            return {"hits": [{"_id": "mock_doc_1", "_score": 0.9}, {"_id": "mock_doc_2", "_score": 0.8}]}
# Enable Marqo instrumentation. This targets the real 'marqo' package;
# the mock classes above are not patched and emit their own spans.
MarqoInstrumentor().instrument()
# Simulate using the Marqo client
# If Marqo client was imported before instrumentation, you might need to re-import or defer client creation.
marqo_client = MockMarqoClient()  # In a real app: marqo.Client(url="http://localhost:8882")
# Perform Marqo operations
my_index = marqo_client.index("my_test_index")
my_index.add_documents(
    documents=[
        {"text": "hello world"},
        {"text": "another document"},
    ],
    tensor_fields=["text"],
)
my_index.search(q="world")
print("Marqo operations simulated with OpenTelemetry instrumentation.")
# Ensure all spans are processed before exiting
tracer_provider.shutdown()