Azure Schema Registry Avro Serializer

1.0.0b4.post1 · deprecated · verified Sun Apr 12

The `azure-schemaregistry-avroserializer` is a Python client library for serializing and deserializing data in Apache Avro format using Azure Schema Registry. As of version 1.0.0b4.post1, this package is no longer maintained and has been superseded by `azure-schemaregistry-avroencoder`. It provided functionalities for schema storage, versioning, and management in Azure Event Hubs. New projects should use the `avroencoder` package instead.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to initialize the AvroSerializer, serialize Python dictionary data into Avro bytes, and then deserialize it back. It uses `DefaultAzureCredential` for authentication, requiring the `SCHEMA_REGISTRY_ENDPOINT` and `SCHEMA_REGISTRY_GROUP_NAME` environment variables to be set. The `auto_register_schemas` flag is set to `True` for convenience, but it is recommended to pre-register schemas in production environments.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# NOTE: This package is deprecated. Use azure-schemaregistry-avroencoder instead.

# Configure environment variables or replace placeholders
SCHEMA_REGISTRY_ENDPOINT = os.environ.get(
    'SCHEMA_REGISTRY_ENDPOINT',
    'https://<your-namespace>.servicebus.windows.net'
)
SCHEMA_REGISTRY_GROUP_NAME = os.environ.get(
    'SCHEMA_REGISTRY_GROUP_NAME',
    'my-schema-group'
)

# Define an Avro schema
AVRO_SCHEMA = '''
{
    "type": "record",
    "name": "TestMessage",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "value", "type": "int"}
    ]
}
'''

# Example data to serialize
message_data = {"name": "example", "value": 123}

def main():
    print("Initializing Schema Registry and Avro Serializer...")
    # Authenticate using DefaultAzureCredential
    credential = DefaultAzureCredential()

    # Create a SchemaRegistryClient
    schema_registry_client = SchemaRegistryClient(
        fully_qualified_namespace=SCHEMA_REGISTRY_ENDPOINT,
        credential=credential
    )

    # Create the AvroSerializer
    # auto_register_schemas=True will automatically register the schema if not found
    avro_serializer = AvroSerializer(
        client=schema_registry_client,
        group_name=SCHEMA_REGISTRY_GROUP_NAME,
        auto_register_schemas=True # Consider disabling in production for performance
    )

    try:
        # Serialize the data
        print(f"Serializing data: {message_data}")
        # The `schema` parameter is required for serialize in 1.0.0b4
        encoded_data = avro_serializer.serialize(value=message_data, schema=AVRO_SCHEMA)
        print(f"Serialized data (bytes): {encoded_data}")

        # Deserialize the data
        print(f"Deserializing data: {encoded_data}")
        decoded_data = avro_serializer.deserialize(value=encoded_data)
        print(f"Deserialized data: {decoded_data}")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # It's good practice to close clients, especially in async scenarios
        if hasattr(schema_registry_client, 'close'):
            schema_registry_client.close()
        if hasattr(avro_serializer, 'close'):
            avro_serializer.close()

if __name__ == "__main__":
    main()

view raw JSON →