Azure Schema Registry Avro Serializer
The `azure-schemaregistry-avroserializer` package is a Python client library that serializes and deserializes data in Apache Avro format using Azure Schema Registry, the schema storage, versioning, and management service hosted in Azure Event Hubs. Version 1.0.0b4.post1 is the final release: the package is no longer maintained and has been superseded by `azure-schemaregistry-avroencoder`, which new projects should use instead.
Warnings
- breaking This package (`azure-schemaregistry-avroserializer`) is deprecated and no longer maintained. Use `azure-schemaregistry-avroencoder` for new development, and migrate existing applications to it.
- breaking API class and parameter renames occurred between versions 1.0.0b3 and 1.0.0b4. `SchemaRegistryAvroSerializer` was renamed to `AvroSerializer`. The constructor parameters `schema_registry` and `schema_group` were renamed to `client` and `group_name`, respectively. The `serialize` and `deserialize` methods' `data` parameter was renamed to `value`.
- breaking Python 3.5 support was dropped in version 1.0.0b2. Later versions require Python 2.7 or 3.6+ (`avroencoder` raises this to 3.7+).
- gotcha Regional Schema Registry endpoints do not support Azure Active Directory (AAD) authentication. To use AAD credentials, you must create a custom subdomain for your Schema Registry resource.
- gotcha The `auto_register_schemas` parameter (or `auto_register` in `avroencoder`) defaults to `False`. If set to `True`, schemas will be automatically registered on serialization if they don't exist. While convenient for development, it's recommended to pre-register schemas during deployment and set this to `False` in production to avoid first-event latency penalties and unintended schema creations.
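For code written against 1.0.0b3 or earlier, the keyword renames listed above amount to a fixed mapping. The following is a minimal sketch, assuming the old keyword arguments have been collected in a dict. `translate_kwargs` and both mapping tables are hypothetical helpers for illustration, not part of the library.

```python
# Hypothetical migration helper; the names come from the rename list above.
CONSTRUCTOR_RENAMES = {"schema_registry": "client", "schema_group": "group_name"}
METHOD_RENAMES = {"data": "value"}


def translate_kwargs(kwargs, renames):
    """Return a copy of kwargs with pre-1.0.0b4 keyword names replaced."""
    translated = {}
    for name, val in kwargs.items():
        new_name = renames.get(name, name)
        if new_name in translated:
            # Guard against passing both the old and the new spelling.
            raise ValueError(f"both old and new spelling given for {new_name!r}")
        translated[new_name] = val
    return translated


old_call = {"schema_registry": "<client object>", "schema_group": "my-schema-group"}
new_call = translate_kwargs(old_call, CONSTRUCTOR_RENAMES)
print(new_call)
```

Keywords that were not renamed, such as `auto_register_schemas`, pass through unchanged.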
Install
- pip install azure-schemaregistry-avroserializer
- pip install azure-schemaregistry-avroencoder azure-identity azure-eventhub
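During a migration both packages may be installed side by side, so it can help to check which one is importable in the current environment. The following is a sketch using only the standard library. `is_importable` is a hypothetical helper, and the encoder module path (`azure.schemaregistry.encoder.avroencoder`) is assumed from the replacement package's own import convention, not taken from this page.

```python
import importlib.util


def is_importable(module_name):
    """Return True if module_name can be located in the current environment."""
    try:
        # find_spec imports parent packages but not the target module itself.
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package (e.g. `azure`) is missing or malformed.
        return False


for name in (
    "azure.schemaregistry.serializer.avroserializer",  # deprecated package
    "azure.schemaregistry.encoder.avroencoder",        # replacement package (assumed path)
):
    status = "available" if is_importable(name) else "missing"
    print(f"{name}: {status}")
```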
Imports
- AvroSerializer
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
- SchemaRegistryClient
from azure.schemaregistry import SchemaRegistryClient
- DefaultAzureCredential
from azure.identity import DefaultAzureCredential
Quickstart
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential
# NOTE: This package is deprecated. Use azure-schemaregistry-avroencoder instead.
# Configure environment variables or replace placeholders
# fully_qualified_namespace expects a host name, without the https:// scheme
SCHEMA_REGISTRY_ENDPOINT = os.environ.get(
    'SCHEMA_REGISTRY_ENDPOINT',
    '<your-namespace>.servicebus.windows.net'
)
SCHEMA_REGISTRY_GROUP_NAME = os.environ.get(
    'SCHEMA_REGISTRY_GROUP_NAME',
    'my-schema-group'
)
# Define an Avro schema
AVRO_SCHEMA = '''
{
    "type": "record",
    "name": "TestMessage",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "value", "type": "int"}
    ]
}
'''
# Example data to serialize
message_data = {"name": "example", "value": 123}
def main():
    print("Initializing Schema Registry and Avro Serializer...")

    # Authenticate using DefaultAzureCredential
    credential = DefaultAzureCredential()

    # Create a SchemaRegistryClient
    schema_registry_client = SchemaRegistryClient(
        fully_qualified_namespace=SCHEMA_REGISTRY_ENDPOINT,
        credential=credential
    )

    # Create the AvroSerializer
    # auto_register_schemas=True will automatically register the schema if not found
    avro_serializer = AvroSerializer(
        client=schema_registry_client,
        group_name=SCHEMA_REGISTRY_GROUP_NAME,
        auto_register_schemas=True  # Consider disabling in production for performance
    )

    try:
        # Serialize the data
        print(f"Serializing data: {message_data}")
        # The `schema` parameter is required for serialize in 1.0.0b4
        encoded_data = avro_serializer.serialize(value=message_data, schema=AVRO_SCHEMA)
        print(f"Serialized data (bytes): {encoded_data}")

        # Deserialize the data
        print(f"Deserializing data: {encoded_data}")
        decoded_data = avro_serializer.deserialize(value=encoded_data)
        print(f"Deserialized data: {decoded_data}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # It's good practice to close clients, especially in async scenarios
        if hasattr(schema_registry_client, 'close'):
            schema_registry_client.close()
        if hasattr(avro_serializer, 'close'):
            avro_serializer.close()


if __name__ == "__main__":
    main()
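The quickstart hard-codes `auto_register_schemas=True`. One way to follow the recommendation of disabling it in production is to derive the flag from configuration. The following is a minimal sketch, assuming a hypothetical `SCHEMA_AUTO_REGISTER` environment variable; the library does not read this variable itself.

```python
import os


def auto_register_from_env(environ=None, var="SCHEMA_AUTO_REGISTER"):
    """Return True only when the variable is explicitly set to a truthy value."""
    # `var` is a hypothetical variable name chosen for this sketch.
    environ = os.environ if environ is None else environ
    return environ.get(var, "").strip().lower() in {"1", "true", "yes"}


# An unset variable yields False, matching the library default.
print(auto_register_from_env({}))
print(auto_register_from_env({"SCHEMA_AUTO_REGISTER": "true"}))
```

The result could then be passed as `auto_register_schemas=auto_register_from_env()` when constructing `AvroSerializer`, so that development environments opt in and production deployments rely on pre-registered schemas.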