Airbyte Protocol Dataclass Models

0.18.0 · active · verified Sun Apr 12

This library declares the Airbyte Protocol using Python Dataclasses. It is designed for scenarios where speed and memory usage are critical, offering less performance overhead compared to Pydantic models. The library is actively maintained with frequent updates, typically released monthly, in alignment with the broader Airbyte platform. The current version is 0.18.0.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to construct basic Airbyte Protocol messages (Record and State) using the dataclass models. It shows how to create a `AirbyteRecordMessage` and `AirbyteStateMessage`, wrap them in a generic `AirbyteMessage`, and then serialize them to JSON format for inter-process communication, which is standard for the Airbyte Protocol. All fields are expected to be correctly typed according to the protocol's JSON schema.

import json
from dataclasses import asdict
from datetime import datetime
from airbyte_protocol_models_dataclasses.v0.models import AirbyteMessage, AirbyteRecordMessage, Type

# Create an AirbyteRecordMessage representing a data record
record_data = {
    "id": 1,
    "name": "Test User",
    "email": "test.user@example.com",
    "created_at": datetime.now().isoformat()
}
record_message = AirbyteRecordMessage(
    stream="users",
    data=record_data,
    emitted_at=int(datetime.now().timestamp() * 1000),
)

# Wrap the record in a top-level AirbyteMessage
airbyte_message = AirbyteMessage(
    type=Type.RECORD,
    record=record_message,
)

# Convert the dataclass instance to a dictionary and then to a JSON string
json_output = json.dumps(asdict(airbyte_message), indent=2)
print("\n--- Airbyte Record Message ---")
print(json_output)

# Example of creating an AirbyteStateMessage
state_data = {"users_sync_progress": {"last_id": 100, "updated_at": datetime.now().isoformat()}}
state_message = AirbyteMessage(
    type=Type.STATE,
    state=state_data,
)

# Serialize the state message
json_state_output = json.dumps(asdict(state_message), indent=2)
print("\n--- Airbyte State Message ---")
print(json_state_output)

view raw JSON →