OpenTelemetry Marqo Instrumentation

0.58.0 · active · verified Fri Apr 10

This library provides OpenTelemetry instrumentation for the Marqo vector database Python client. It enables automatic tracing of client-side calls to Marqo, helping developers gain observability into their LLM and vector search applications. Part of the larger OpenLLMetry project, it adheres to OpenTelemetry semantic conventions for LLM and vector DB operations. The library is actively maintained, with frequent releases (multiple per month) to keep up with updates to semantic conventions and the instrumented libraries.
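Instrumentors of this kind generally trace client calls by replacing methods on the client's classes with wrappers that open a span around each call. The stdlib-only sketch below shows that technique. It is not this library's implementation. The `Index` class, the `instrument`/`uninstrument` helpers, the `RECORDED_SPANS` list, and the `marqo.<method>` span names are all hypothetical. A real instrumentor would create OpenTelemetry spans through a tracer instead of appending dicts to a list.

```python
import functools
import time

# Hypothetical in-memory span log; a real instrumentor would record spans
# through an OpenTelemetry tracer instead.
RECORDED_SPANS = []

# Original (unwrapped) methods, keyed by (class, method name).
_ORIGINALS = {}


class Index:
    """Hypothetical stand-in for a vector DB client's index class."""

    def __init__(self, name):
        self.name = name

    def add_documents(self, documents, tensor_fields):
        return {"items": [{"_id": f"doc{i}"} for i in range(len(documents))]}

    def search(self, q, limit=5):
        return {"hits": [{"_id": "doc1", "_score": 0.9}][:limit]}


def _make_wrapper(fn, span_name):
    """Wrap fn so every call appends a span-like record, even if fn raises."""

    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        start = time.perf_counter()
        status = "OK"
        try:
            return fn(self, *args, **kwargs)
        except Exception:
            status = "ERROR"
            raise
        finally:
            RECORDED_SPANS.append({
                "name": span_name,
                "index": self.name,
                "status": status,
                "duration_s": time.perf_counter() - start,
            })

    return wrapper


def instrument(cls, method_names, prefix):
    """Replace each named method on cls with a span-recording wrapper."""
    for name in method_names:
        if (cls, name) in _ORIGINALS:
            continue  # already wrapped; skip so calls are not recorded twice
        original = getattr(cls, name)
        _ORIGINALS[(cls, name)] = original
        setattr(cls, name, _make_wrapper(original, f"{prefix}.{name}"))


def uninstrument(cls):
    """Put back the original methods saved by instrument()."""
    for owner, name in [key for key in _ORIGINALS if key[0] is cls]:
        setattr(cls, name, _ORIGINALS.pop((owner, name)))


# Usage: wrap two methods, make calls, inspect the recorded spans.
instrument(Index, ["add_documents", "search"], prefix="marqo")
idx = Index("my_test_index")
idx.add_documents([{"text": "hello world"}], tensor_fields=["text"])
idx.search("world")
print([span["name"] for span in RECORDED_SPANS])  # ['marqo.add_documents', 'marqo.search']
uninstrument(Index)
```

The `instrument`/`uninstrument` pair mirrors the two methods that OpenTelemetry instrumentors expose. The guard against re-wrapping keeps a second `instrument()` call from producing duplicate spans.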

Quickstart

This quickstart shows how to set up OpenTelemetry with the Marqo instrumentation. It configures a console exporter that prints trace data to the terminal, enables the Marqo instrumentor, and then simulates basic Marqo client operations. The Marqo client is mocked so the example runs without a live Marqo instance. Because the mock is not part of the `marqo` package, `MarqoInstrumentor` does not patch it. Instead, the mock methods open spans manually so the console output resembles what the instrumentor produces for real calls. In production, replace `ConsoleSpanExporter` with an appropriate exporter (e.g., `OTLPSpanExporter`) and use the real `marqo.Client`.

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider

# Import the Marqo instrumentation
from opentelemetry.instrumentation.marqo import MarqoInstrumentor

# --- OpenTelemetry Setup (Typical for any OTel Python app) ---
# Set up a TracerProvider
resource = Resource.create({"service.name": "my-marqo-app"})
tracer_provider = TracerProvider(resource=resource)

# Configure a SpanProcessor to export spans to the console
# In a real application, you would typically use an OTLPSpanExporter instead.
console_exporter = ConsoleSpanExporter()
span_processor = BatchSpanProcessor(console_exporter)
tracer_provider.add_span_processor(span_processor)

# Set the global TracerProvider
set_tracer_provider(tracer_provider)

# Get a tracer for manual spans if needed
tracer = trace.get_tracer(__name__)

# --- Marqo Instrumentation Setup ---
# Call MarqoInstrumentor().instrument() before the Marqo client is created.
# Here the client is mocked so the example runs without a live Marqo instance.

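# Mock Marqo client for demonstration purposes. MarqoInstrumentor does not patch
# these classes, so each method opens a span by hand to mimic an instrumented call.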
class MockMarqoClient:
    def index(self, index_name):
        return MockMarqoIndex(index_name)

class MockMarqoIndex:
    def __init__(self, index_name):
        self.index_name = index_name

    def add_documents(self, documents, tensor_fields, client_batch_size=50):
        with tracer.start_as_current_span(f"marqo.index.add_documents: {self.index_name}"):
            print(f"Mock Marqo: Adding {len(documents)} documents to index '{self.index_name}'")
            # Simulate Marqo client operation
            return {"items": [{"_id": f"doc{i}"} for i in range(len(documents))]}

    def search(self, q, searchable_attributes=None, limit=5):
        with tracer.start_as_current_span(f"marqo.index.search: {self.index_name}"):
            print(f"Mock Marqo: Searching for '{q}' in index '{self.index_name}'")
            # Simulate Marqo client operation
            return {"hits": [{"_id": "mock_doc_1", "_score": 0.9}, {"_id": "mock_doc_2", "_score": 0.8}]}

# Enable Marqo instrumentation (in a real app, do this before creating marqo.Client)
MarqoInstrumentor().instrument()

# Simulate using the Marqo client
marqo_client = MockMarqoClient()  # In a real app: marqo.Client(url="http://localhost:8882")

# Perform Marqo operations
my_index = marqo_client.index("my_test_index")
my_index.add_documents(
    documents=[
        {"text": "hello world"},
        {"text": "another document"}
    ],
    tensor_fields=["text"]
)

my_index.search(q="world")

print("Marqo operations simulated with OpenTelemetry instrumentation.")

# Ensure all spans are processed before exiting
tracer_provider.shutdown()
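The quickstart's code comments advise calling `MarqoInstrumentor().instrument()` before the Marqo client is created. The sketch below shows what that ordering protects against. It assumes the instrumentor patches methods on the client's classes, as wrapt-based OpenTelemetry instrumentors generally do. The `Index` class, `patch_search`, and the `CALLS` list are hypothetical stand-ins. Instances created before patching still pick up the patched method, because it is looked up on the class at call time. A bound method captured before patching, however, keeps calling the unwrapped function and is never traced. Calling `instrument()` first rules such cases out.

```python
import functools

CALLS = []  # one entry per call that went through the (hypothetical) tracing wrapper


class Index:
    """Hypothetical stand-in for the client's index class."""

    def search(self, q):
        return f"results for {q}"


def patch_search(cls):
    """Swap cls.search for a wrapper that logs each call (stand-in for a span)."""
    original = cls.search

    @functools.wraps(original)
    def traced_search(self, q):
        CALLS.append("search")
        return original(self, q)

    cls.search = traced_search


early_index = Index()             # instance created BEFORE patching
early_bound = early_index.search  # bound method captured BEFORE patching

patch_search(Index)

early_index.search("a")  # looks up Index.search at call time -> traced
early_bound("b")         # still bound to the original function -> NOT traced
Index().search("c")      # new instance -> traced

print(CALLS)  # ['search', 'search']
```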
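The quickstart ends with `tracer_provider.shutdown()` because `BatchSpanProcessor` queues finished spans and exports them in batches rather than one at a time. An explicit shutdown (or flush) is what guarantees that the queued spans are exported at a known point. The toy processor below is a hypothetical, synchronous sketch of that batching behavior. Unlike the real processor, it has no background thread or export timer. The `BufferedSpanProcessor` name and its `max_batch_size` parameter are illustrative only.

```python
class BufferedSpanProcessor:
    """Toy, synchronous stand-in for a batching span processor (hypothetical).

    Finished spans are queued and handed to the exporter only when a full
    batch has accumulated or when force_flush()/shutdown() is called.
    """

    def __init__(self, export, max_batch_size=3):
        self._export = export  # callable that receives a list of spans
        self._max_batch_size = max_batch_size
        self._queue = []

    def on_end(self, span):
        self._queue.append(span)
        if len(self._queue) >= self._max_batch_size:
            self.force_flush()

    def force_flush(self):
        if self._queue:
            self._export(list(self._queue))
            self._queue.clear()

    def shutdown(self):
        # Exporting whatever is still queued is why shutdown() matters.
        self.force_flush()


exported_batches = []
processor = BufferedSpanProcessor(exported_batches.append, max_batch_size=3)

for name in ["add_documents", "search"]:
    processor.on_end(name)

print(exported_batches)  # [] (both spans are still queued)
processor.shutdown()
print(exported_batches)  # [['add_documents', 'search']]
```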
