Airbyte Protocol Models (Pydantic v2)

0.19.0 · active · verified Sun Apr 12

This library provides Pydantic v2 models that declare the Airbyte Protocol. It defines the structured messages (like records, states, logs, and catalogs) used for inter-process communication between Airbyte sources, destinations, and the platform. It is currently at version 0.19.0 and receives frequent updates, often adding new features and protocol extensions.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to create an `AirbyteRecordMessage`, wrap it within an `AirbyteMessage` envelope, serialize it to a JSON string, and then deserialize it back into a Pydantic object. This is fundamental for interacting with the Airbyte Protocol, where all messages are exchanged as JSON.

import json
from airbyte_protocol_models_pdv2.airbyte_protocol import AirbyteMessage, AirbyteRecordMessage, Type

# Create a sample record message
record_data = {"id": 1, "name": "Alice", "age": 30}
record_message = AirbyteRecordMessage(
    stream="users",
    data=record_data,
    emitted_at=1678886400000 # Unix timestamp in milliseconds
)

# Wrap it in an AirbyteMessage envelope
airbyte_message = AirbyteMessage(
    type=Type.RECORD,
    record=record_message
)

# Serialize the message to JSON (as expected by the Airbyte Protocol)
json_output = airbyte_message.model_dump_json(by_alias=True)
print("Serialized JSON message:")
print(json_output)

# Deserialize the JSON back into an AirbyteMessage object
deserialized_message = AirbyteMessage.model_validate_json(json_output)
print("\nDeserialized message type:", deserialized_message.type)
if deserialized_message.record:
    print("Deserialized record data:", deserialized_message.record.data)

view raw JSON →