Airbyte Protocol Dataclass Models
This library defines the Airbyte Protocol models as Python dataclasses. It targets scenarios where speed and memory usage are critical, and it carries less overhead than the Pydantic-based models. The library is actively maintained, with releases typically published monthly in step with the broader Airbyte platform. The current version is 0.18.0.
Warnings
- breaking: Protocol V1 models were removed as of version 0.17.0. Code that relies on `v1` namespaces or structures will break.
- breaking: The field name 'schema' was renamed to 'json-schema' in version 0.16.0. This changes how catalog objects are structured and accessed.
- gotcha: This library uses Python's native dataclasses for its protocol models, specifically to avoid the performance overhead of Pydantic. Pydantic features such as automatic type coercion, runtime validation, and custom validators are not available out of the box.
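To make the last warning concrete, here is a minimal sketch of what "no coercion, no validation" means in practice. It does not use the library: `RecordStub` is a hypothetical stand-in for a protocol model, and `check_types` is a hypothetical helper that performs only a shallow `isinstance` check.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass
class RecordStub:
    """Hypothetical stand-in for a protocol model; not the library's class."""
    stream: str
    data: Dict[str, Any]
    emitted_at: int


# Basic runtime types to check each field against (assumption: shallow checks suffice).
_EXPECTED = {"stream": str, "data": dict, "emitted_at": int}


def check_types(obj: RecordStub) -> None:
    """Raise TypeError if a field holds a value of the wrong basic type."""
    for f in fields(obj):
        value = getattr(obj, f.name)
        expected = _EXPECTED[f.name]
        if not isinstance(value, expected):
            raise TypeError(
                f"{f.name}: expected {expected.__name__}, got {type(value).__name__}"
            )


# A plain dataclass accepts the wrong type silently: the string stays a string.
bad = RecordStub(stream="users", data={"id": 1}, emitted_at="1700000000000")
print(type(bad.emitted_at).__name__)

# Validation has to be requested explicitly.
try:
    check_types(bad)
except TypeError as exc:
    print(f"rejected: {exc}")
```

Code that previously relied on Pydantic to reject or convert such values would need an explicit check of this kind at the points where untrusted data enters.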
Install
pip install airbyte-protocol-models-dataclasses
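Because the warnings tie breaking changes to specific releases, it can help to check which version is actually installed. The following is a rough sketch using only the standard library's `importlib.metadata`; the helper names `parse_version` and `installed_version` are hypothetical, and the parser assumes purely numeric dotted versions such as `0.18.0`.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

PACKAGE = "airbyte-protocol-models-dataclasses"


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn '0.18.0' into (0, 18, 0); assumes purely numeric dotted versions."""
    return tuple(int(part) for part in text.split("."))


def installed_version() -> Optional[Tuple[int, ...]]:
    """Return the installed version as a tuple, or None if the package is absent."""
    try:
        return parse_version(version(PACKAGE))
    except PackageNotFoundError:
        return None


current = installed_version()
if current is None:
    print(f"{PACKAGE} is not installed")
elif current >= (0, 17, 0):
    print("Protocol V1 models are not available in this version")
else:
    print("Protocol V1 models may still be present")
```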
Imports
- AirbyteMessage
from airbyte_protocol_models_dataclasses.v0.models import AirbyteMessage
- AirbyteRecordMessage
from airbyte_protocol_models_dataclasses.v0.models import AirbyteRecordMessage
- Type
from airbyte_protocol_models_dataclasses.v0.models import Type
Quickstart
import json
from dataclasses import asdict
from datetime import datetime
from enum import Enum

from airbyte_protocol_models_dataclasses.v0.models import AirbyteMessage, AirbyteRecordMessage, Type


def to_json(message) -> str:
    """Serialize a dataclass message to an indented JSON string."""
    # json.dumps cannot serialize plain Enum members (such as Type.RECORD)
    # unless the enum also subclasses str, so fall back to the member's value.
    return json.dumps(
        asdict(message),
        indent=2,
        default=lambda o: o.value if isinstance(o, Enum) else str(o),
    )


# Create an AirbyteRecordMessage representing a data record
record_data = {
    "id": 1,
    "name": "Test User",
    "email": "test.user@example.com",
    "created_at": datetime.now().isoformat(),
}
record_message = AirbyteRecordMessage(
    stream="users",
    data=record_data,
    emitted_at=int(datetime.now().timestamp() * 1000),  # epoch milliseconds
)

# Wrap the record in a top-level AirbyteMessage
airbyte_message = AirbyteMessage(
    type=Type.RECORD,
    record=record_message,
)

# Convert the dataclass instance to a dictionary and then to a JSON string
json_output = to_json(airbyte_message)
print("\n--- Airbyte Record Message ---")
print(json_output)

# Example of a STATE message. A plain dict is passed as `state` here for brevity.
# Dataclasses do not validate field types, so this serializes without error,
# but a real connector would normally wrap the payload in an AirbyteStateMessage.
state_data = {"users_sync_progress": {"last_id": 100, "updated_at": datetime.now().isoformat()}}
state_message = AirbyteMessage(
    type=Type.STATE,
    state=state_data,
)

# Serialize the state message
json_state_output = to_json(state_message)
print("\n--- Airbyte State Message ---")
print(json_state_output)
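The Quickstart serializes with `asdict`, which emits every unset optional field as `null` and pretty-prints the result. Airbyte connectors normally emit one compact JSON object per line, so the sketch below shows one possible way to drop `None` fields and unwrap enum members through the `dict_factory` parameter of `asdict`. It is self-contained and does not import the library: `KindStub`, `RecordStub`, `MessageStub`, and `compact` are hypothetical stand-ins, not the library's API.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple


class KindStub(Enum):
    """Hypothetical stand-in for the message type enum."""
    RECORD = "RECORD"
    STATE = "STATE"


@dataclass
class RecordStub:
    """Hypothetical stand-in for a record message."""
    stream: str
    data: Dict[str, Any]
    emitted_at: int
    namespace: Optional[str] = None


@dataclass
class MessageStub:
    """Hypothetical stand-in for the top-level message."""
    type: KindStub
    record: Optional[RecordStub] = None
    state: Optional[Dict[str, Any]] = None


def compact(pairs: List[Tuple[str, Any]]) -> Dict[str, Any]:
    """dict_factory for asdict: drop None fields and unwrap Enum members."""
    return {
        key: (value.value if isinstance(value, Enum) else value)
        for key, value in pairs
        if value is not None
    }


message = MessageStub(
    type=KindStub.RECORD,
    record=RecordStub(stream="users", data={"id": 1}, emitted_at=1700000000000),
)

# One compact JSON object on a single line, with unset fields omitted.
line = json.dumps(asdict(message, dict_factory=compact), separators=(",", ":"))
print(line)
# {"type":"RECORD","record":{"stream":"users","data":{"id":1},"emitted_at":1700000000000}}
```

The factory is applied to each dataclass level, including nested ones, but not to ordinary dictionaries such as `data`, so `None` values inside the record payload itself are preserved.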