Python Schema Registry Client
A Python client for the Confluent Schema Registry REST API. It manages Avro and JSON schemas: registering new schemas, retrieving existing ones by ID or by subject, and checking compatibility. It ships a synchronous client, an asyncio client and message serializers. The library is at version 2.6.1 and is actively maintained.
Warnings
- gotcha `SchemaRegistryClient` may not be picklable. This breaks code that ships objects to workers by pickling them, such as Apache Spark tasks or `multiprocessing` pools. The likely cause is the live HTTP session the client holds internally (`requests.Session` or `httpx.Client`, depending on the version). Create the client inside each worker instead of passing it across process boundaries.
- gotcha The `faust` extra (`pip install python-schema-registry-client[faust]`) pulls in `faust-streaming`, a fork of the original `faust` library. Both packages provide the same `faust` import name, so a project that already depends on a specific version of the original `faust` can hit version conflicts or unexpected behavior.
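- gotcha TLS certificate verification can fail when the Schema Registry uses a self-signed certificate or an improperly configured CA chain. This surfaces as an SSL or connection error from the underlying HTTP library (for example `requests.exceptions.SSLError`, or the corresponding `httpx` error). The usual fix is to give the client the CA certificate that signed the registry's certificate rather than disabling verification.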
- gotcha Registering a schema that is incompatible with earlier versions under the subject's compatibility level (BACKWARD by default) is rejected with HTTP 409 Conflict.
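One workaround for the pickling gotcha is to ship only the client's constructor arguments and build the client lazily on the worker. The sketch below is an assumption-laden illustration, not part of the library: `LazyClient` is a hypothetical helper, and `FakeClient` is a stand-in that holds a `threading.Lock` to mimic an unpicklable live HTTP session. In real code you would pass `SchemaRegistryClient` as the factory, e.g. `LazyClient(SchemaRegistryClient, url=SCHEMA_REGISTRY_URL)`.

```python
import pickle
import threading
from typing import Any, Callable, Dict, Optional


class LazyClient:
    """Hypothetical picklable holder that builds the real client on first use."""

    def __init__(self, factory: Callable[..., Any], **kwargs: Any) -> None:
        self._factory = factory  # must be importable (pickled by reference)
        self._kwargs = kwargs    # plain constructor arguments, e.g. url=...
        self._client: Optional[Any] = None

    @property
    def client(self) -> Any:
        # Build the client the first time it is needed in this process
        if self._client is None:
            self._client = self._factory(**self._kwargs)
        return self._client

    def __getstate__(self) -> Dict[str, Any]:
        # Pickle everything except the live client; the receiver rebuilds it
        state = self.__dict__.copy()
        state["_client"] = None
        return state


class FakeClient:
    """Stand-in for SchemaRegistryClient: the lock makes it unpicklable."""

    def __init__(self, url: str) -> None:
        self.url = url
        self._lock = threading.Lock()


if __name__ == "__main__":
    holder = LazyClient(FakeClient, url="http://localhost:8081")
    print(holder.client.url)
    restored = pickle.loads(pickle.dumps(holder))  # works: no live client inside
    print(restored.client.url)
```

Each worker builds its own client on first access to `.client`, and pickling the holder never touches a live connection.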
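To make the 409 case concrete: under the default BACKWARD level, a new schema must be able to read data written with the previous one. The function below is a deliberately simplified, hypothetical approximation of that rule for flat Avro records; it is not the registry's algorithm, and it ignores unions, aliases, nested records and Avro's type promotions (it reports even an allowed `int` to `long` change).

```python
from typing import Any, Dict, List


def backward_problems(old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
    """Reasons the new (reader) schema cannot read old (writer) data; [] means OK.

    Simplified rule: each field of the new schema must exist in the old one
    with an identical type, or declare a default. Removed fields are fine.
    """
    old_fields = {f["name"]: f for f in old.get("fields", [])}
    problems = []
    for field in new.get("fields", []):
        name = field["name"]
        if name in old_fields:
            if old_fields[name]["type"] != field["type"]:
                problems.append(f"type of '{name}' changed")
        elif "default" not in field:
            problems.append(f"new field '{name}' has no default")
    return problems


if __name__ == "__main__":
    v1 = {"type": "record", "name": "SensorReading",
          "fields": [{"name": "id", "type": "string"},
                     {"name": "value", "type": "int"}]}
    v2 = {**v1, "fields": v1["fields"] + [{"name": "unit", "type": "string"}]}
    print(backward_problems(v1, v2))
```

In practice, let the registry decide: the client's `test_compatibility(subject, schema)` method asks the server whether a candidate schema is compatible before you call `register` (check its exact signature in your installed version).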
Install
- pip install python-schema-registry-client
- pip install python-schema-registry-client[faust]
Imports
- SchemaRegistryClient
from schema_registry.client import SchemaRegistryClient
- AsyncSchemaRegistryClient
from schema_registry.client import AsyncSchemaRegistryClient
- AvroSchema
from schema_registry.client.schema import AvroSchema
- JsonSchema
from schema_registry.client.schema import JsonSchema
- MessageSerializer
from schema_registry.serializers import MessageSerializer
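Messages produced by the library's serializers reference their schema by ID using Confluent's wire format: a magic byte `0`, then the schema ID as a 4-byte big-endian unsigned integer, then the encoded Avro or JSON payload. That header is how a consumer knows which ID to pass to `get_by_id`. The stdlib sketch below shows only this framing; `frame` and `unframe` are hypothetical helpers, and encoding the payload itself is left to the serializer.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # Confluent wire-format marker
HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-encoded payload with the wire-format header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message too short for the wire-format header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


if __name__ == "__main__":
    framed = frame(1, b"payload")
    print(framed[:5])       # header only
    print(unframe(framed))  # the schema ID to look up, plus the raw payload
```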
Quickstart
import os

from schema_registry.client import SchemaRegistryClient
from schema_registry.client.schema import AvroSchema

# Registry URL; override it with the SCHEMA_REGISTRY_URL environment variable
SCHEMA_REGISTRY_URL = os.environ.get("SCHEMA_REGISTRY_URL", "http://localhost:8081")


def main() -> None:
    client = SchemaRegistryClient(url=SCHEMA_REGISTRY_URL)

    # Wrap a plain Avro record definition in an AvroSchema object
    avro_schema = AvroSchema({
        "type": "record",
        "namespace": "com.example",
        "name": "SensorReading",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "value", "type": "int"},
        ],
    })

    # Subject under which versions of this schema are stored
    subject = "sensor-readings-value"

    try:
        # register() returns the integer schema ID assigned by the registry
        schema_id = client.register(subject, avro_schema)
        print(f"Schema registered with ID: {schema_id}")

        # get_by_id() returns the schema object, or None for an unknown ID
        retrieved = client.get_by_id(schema_id)
        print(f"Retrieved schema (ID {schema_id}): {retrieved.raw_schema}")

        # get_schema() defaults to version="latest" and returns a SchemaVersion
        latest = client.get_schema(subject)
        print(f"Latest schema for '{subject}' (version {latest.version}, "
              f"ID {latest.schema_id}): {latest.schema.raw_schema}")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()

For asyncio code, use AsyncSchemaRegistryClient instead: it exposes the same methods as coroutines, so each call is awaited (for example `schema_id = await client.register(subject, avro_schema)`).
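`register` takes the subject as a plain string, so naming is up to the caller. A subject such as `sensor-readings-value` follows Confluent's default TopicNameStrategy (`<topic>-value`, or `<topic>-key` for keys); Confluent's serializers also offer RecordNameStrategy (the record's fully-qualified name) and TopicRecordNameStrategy (`<topic>-<fully-qualified name>`). `subject_name` and `full_name` below are hypothetical helpers, not part of this library, and the topic name `sensor-readings` is an assumption.

```python
from typing import Any, Dict


def full_name(schema: Dict[str, Any]) -> str:
    """Fully-qualified Avro name, e.g. 'com.example.SensorReading'."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # Per the Avro spec, a name that already contains a dot is fully qualified
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def subject_name(topic: str, schema: Dict[str, Any],
                 strategy: str = "topic", is_key: bool = False) -> str:
    """Build a subject following one of Confluent's naming strategies."""
    if strategy == "topic":  # TopicNameStrategy (Confluent's default)
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "record":  # RecordNameStrategy
        return full_name(schema)
    if strategy == "topic_record":  # TopicRecordNameStrategy
        return f"{topic}-{full_name(schema)}"
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    reading = {"type": "record", "namespace": "com.example",
               "name": "SensorReading", "fields": []}
    for strategy in ("topic", "record", "topic_record"):
        print(strategy, subject_name("sensor-readings", reading, strategy))
```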