{"id":5441,"library":"python-schema-registry-client","title":"Python Schema Registry Client","description":"A Python client to interact with the Confluent Schema Registry's REST API. It allows managing Avro and JSON schemas, including registering new schemas, retrieving existing ones by ID or subject, and performing compatibility checks. The library is currently at version 2.6.1 and is actively maintained, with releases typically tied to feature enhancements and bug fixes.","status":"active","version":"2.6.1","language":"en","source_language":"en","source_url":"https://github.com/marcosschroh/python-schema-registry-client","tags":["Kafka","Confluent","Schema Registry","Avro","JSON Schema","serialization"],"install":[{"cmd":"pip install python-schema-registry-client","lang":"bash","label":"Standard installation"},{"cmd":"pip install python-schema-registry-client[faust]","lang":"bash","label":"With Faust integration"}],"dependencies":[{"reason":"Required Python version as specified by the project.","package":"python>=3.8,<4.0"},{"reason":"Used internally for HTTP requests. It is a core component, but users do not need to install it separately because it is pulled in when the package is installed.","package":"httpx","optional":false},{"reason":"Required for the optional 'faust' integration extra. 
faust-streaming is a fork of faust.","package":"faust-streaming","optional":true}],"imports":[{"symbol":"SchemaRegistryClient","correct":"from schema_registry.client import SchemaRegistryClient"},{"symbol":"AsyncSchemaRegistryClient","correct":"from schema_registry.client import AsyncSchemaRegistryClient"},{"symbol":"AvroSchema","correct":"from schema_registry.client.schema import AvroSchema"},{"symbol":"JsonSchema","correct":"from schema_registry.client.schema import JsonSchema"},{"symbol":"MessageSerializer","correct":"from schema_registry.serializers import MessageSerializer"}],"quickstart":{"code":"import os\n\nfrom schema_registry.client import SchemaRegistryClient\nfrom schema_registry.client.schema import AvroSchema\n\nSCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:8081')\n\n\ndef main():\n    # SchemaRegistryClient is synchronous; for async code use AsyncSchemaRegistryClient and 'await' each call.\n    client = SchemaRegistryClient(url=SCHEMA_REGISTRY_URL)\n\n    avro_schema_definition = {\n        \"type\": \"record\",\n        \"namespace\": \"com.example\",\n        \"name\": \"SensorReading\",\n        \"fields\": [\n            {\"name\": \"id\", \"type\": \"string\"},\n            {\"name\": \"value\", \"type\": \"int\"}\n        ]\n    }\n\n    # Create an AvroSchema object\n    avro_schema = AvroSchema(avro_schema_definition)\n\n    # Define a subject name\n    subject = \"sensor-readings-value\"\n\n    try:\n        # Register the schema; register() returns the schema ID as an int\n        schema_id = client.register(subject, avro_schema)\n        print(f\"Schema registered with ID: {schema_id}\")\n\n        # Get the schema by ID\n        retrieved_schema = client.get_by_id(schema_id)\n        print(f\"Retrieved schema (ID {schema_id}): {retrieved_schema.raw_schema}\")\n\n        # Get the latest version registered under the subject (version defaults to 'latest')\n        latest = client.get_schema(subject)\n        print(f\"Latest schema for '{subject}' (version {latest.version}, ID {latest.schema_id}): {latest.schema.raw_schema}\")\n\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n\n\nif __name__ == \"__main__\":\n    main()","lang":"python","description":"This quickstart demonstrates how to initialize the synchronous `SchemaRegistryClient`, define an Avro schema, register it with the Schema Registry, and then retrieve it by its ID and by subject name. Ensure a Confluent Schema Registry instance is running and accessible at the specified `SCHEMA_REGISTRY_URL` (default: http://localhost:8081)."},"warnings":[{"fix":"Avoid passing `SchemaRegistryClient` instances directly between processes. Instead, initialize a new client instance within each process or task where it's needed. For PySpark, consider using broadcast variables for configuration and re-initializing the client in UDFs.","message":"The `SchemaRegistryClient` may not be picklable, which can cause issues in distributed computing environments like Apache Spark or when using multiprocessing that relies on pickling objects. This is typically because the client holds internal HTTP client state (the library uses `httpx` for its requests).","severity":"gotcha","affected_versions":"All versions"},{"fix":"If you require a specific `faust` version or wish to avoid the `faust-streaming` fork, install `faust` manually first, then install `python-schema-registry-client` without the `[faust]` extra: `pip install faust` followed by `pip install python-schema-registry-client`.","message":"When using the `faust` extra (`pip install python-schema-registry-client[faust]`), the library pulls in `faust-streaming`, which is a fork of the original `faust` library. This might lead to version conflicts or unexpected behavior if your project already depends on a specific version of `faust`.","severity":"gotcha","affected_versions":"All versions with faust extra"},{"fix":"Ensure `ca_location`, `cert_location`, and `key_location` are correctly configured with paths to valid certificates. 
For development or controlled environments only, certificate verification can be disabled by passing `verify=False` to the underlying HTTP client (if the client exposes it, or by careful subclassing), but this is NOT recommended for production. Refer to the `httpx` SSL documentation for advanced configuration.","message":"SSL certificate verification can fail, especially when connecting to Schema Registry instances that use self-signed certificates or improperly configured CA chains. Because the library uses `httpx` for HTTP requests, this typically surfaces as an `httpx` connection error caused by a certificate verification failure.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Always test compatibility before registering a new schema using `client.test_compatibility(subject_name, new_schema)`. Ensure your schema changes adhere to the configured compatibility rules (e.g., `BACKWARD`, `FORWARD`, `FULL`). Consider adding default values to new fields in Avro schemas to maintain backward compatibility.","message":"Schema compatibility violations (HTTP status 409 Conflict) occur when registering a new schema that is incompatible with previous versions under the subject's configured compatibility level.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}