Python Schema Registry Client

2.6.1 · active · verified Mon Apr 13

A Python client to interact with the Confluent Schema Registry's REST API. It allows managing Avro and JSON schemas, including registering new schemas, retrieving existing ones by ID or subject, and performing compatibility checks. The library is currently at version 2.6.1 and is actively maintained, with releases typically tied to feature enhancements and bug fixes.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to initialize the `SchemaRegistryClient`, define an Avro schema, register it with the Schema Registry, and then retrieve it by its ID and subject name. Ensure a Confluent Schema Registry instance is running and accessible at the specified `SCHEMA_REGISTRY_URL` (default: http://localhost:8081).

import os
import asyncio
from schema_registry.client import SchemaRegistryClient
from schema_registry.client.schema import AvroSchema

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:8081')

async def main():
    client = SchemaRegistryClient(url=SCHEMA_REGISTRY_URL)

    avro_schema_definition = {
        "type": "record",
        "namespace": "com.example",
        "name": "SensorReading",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "value", "type": "int"}
        ]
    }
    
    # Create an AvroSchema object
    avro_schema = AvroSchema(avro_schema_definition)

    # Define a subject name
    subject = "sensor-readings-value"

    try:
        # Register the schema
        registered_schema = client.register(subject, avro_schema)
        print(f"Schema registered with ID: {registered_schema.schema_id}")

        # Get the schema by ID
        retrieved_schema = client.get_by_id(registered_schema.schema_id)
        print(f"Retrieved schema (ID {registered_schema.schema_id}): {retrieved_schema.schema.to_dict()}")

        # Get the latest schema for a subject
        latest_schema_info = client.get_latest_version(subject)
        print(f"Latest schema for '{subject}' (version {latest_schema_info.version}, ID {latest_schema_info.schema.schema_id}): {latest_schema_info.schema.to_dict()}")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    # For synchronous use, just call client methods directly.
    # The example uses async to show client.register which is an async method by default
    # if the client is AsyncSchemaRegistryClient. For SchemaRegistryClient, it's synchronous.
    # However, the example above will work for both if using the sync client
    # due to how the `register` method is implemented (it's not truly async with await here).
    # Let's adjust for clarity to ensure it runs correctly with the default SchemaRegistryClient.
    # For a truly async example, one would use AsyncSchemaRegistryClient and 'await'.
    # For this quickstart, we'll keep it simple with the synchronous client.
    
    # Rerunning the quickstart for synchronous client clarification
    sync_client = SchemaRegistryClient(url=SCHEMA_REGISTRY_URL)
    
    avro_schema_definition = {
        "type": "record",
        "namespace": "com.example",
        "name": "SensorReading",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "value", "type": "int"}
        ]
    }
    
    avro_schema = AvroSchema(avro_schema_definition)
    subject = "sensor-readings-value-sync"

    try:
        registered_schema = sync_client.register(subject, avro_schema)
        print(f"Sync: Schema registered with ID: {registered_schema.schema_id}")

        retrieved_schema = sync_client.get_by_id(registered_schema.schema_id)
        print(f"Sync: Retrieved schema (ID {registered_schema.schema_id}): {retrieved_schema.schema.to_dict()}")

        latest_schema_info = sync_client.get_latest_version(subject)
        print(f"Sync: Latest schema for '{subject}' (version {latest_schema_info.version}, ID {latest_schema_info.schema.schema_id}): {latest_schema_info.schema.to_dict()}")

    except Exception as e:
        print(f"Sync: An error occurred: {e}")

view raw JSON →