Airbyte Protocol Models (Pydantic v2)
This library provides Pydantic v2 models that implement the Airbyte Protocol. It defines the structured messages (such as records, state, logs, and catalogs) exchanged between Airbyte sources, destinations, and the platform. At the time of writing it is at version 0.19.0; releases are frequent and often add new features and protocol extensions.
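Airbyte connectors typically exchange these messages as newline-delimited JSON, one envelope per line, where the `type` field says which payload key (`record`, `log`, `state`, and so on) is populated. The stdlib-only sketch below illustrates that wire shape with plain dicts rather than the library's models. `to_json_line` and `iter_records` are hypothetical helper names, and the `LOG` payload fields (`level`, `message`) are assumed from the protocol's log message.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def to_json_line(message: Dict[str, Any]) -> str:
    """Serialize one message envelope as a single compact JSON line (hypothetical helper)."""
    return json.dumps(message, separators=(",", ":"))


def iter_records(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the payload of every RECORD envelope and skip all other message types."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        message = json.loads(line)
        if message.get("type") == "RECORD":
            yield message["record"]


# Two envelopes as a source might print them: a log line and a record.
output = "\n".join([
    to_json_line({"type": "LOG", "log": {"level": "INFO", "message": "starting sync"}}),
    to_json_line({"type": "RECORD", "record": {"stream": "users", "data": {"id": 1}, "emitted_at": 1678886400000}}),
])

for record in iter_records(output.splitlines()):
    print(record["stream"], record["data"])  # → users {'id': 1}
```

Dispatching on `type` keeps the consumer tolerant of message kinds it does not care about, which matters because new message types can appear as the protocol evolves.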
Warnings
- breaking This library (`-pdv2`) is built exclusively on Pydantic v2. If your project or any of its dependencies still relies on Pydantic v1, mixing the two can cause runtime errors or unexpected behavior, because the v1 and v2 APIs are incompatible.
- breaking Protocol v1 models have been removed. Versions v0.14.4, v0.14.5, and v0.17.0 include changes that remove or relocate the V1 models.
- breaking The field `schema` was renamed to `json_schema` in the protocol models.
- gotcha The Airbyte Protocol is versioned independently of the main Airbyte Platform, so upgrading one does not guarantee compatibility with the other.
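Because the package requires Pydantic v2, it can help to fail fast when an environment resolves to Pydantic v1. The sketch below reads the installed distribution's version through the standard library's `importlib.metadata` without importing Pydantic itself. `installed_pydantic_major` and `require_pydantic_v2` are hypothetical names, and treating any major version other than 2 as unsupported is an assumption, not a documented rule of the library.

```python
from importlib import metadata
from typing import Optional


def installed_pydantic_major() -> Optional[int]:
    """Return the major version of the installed pydantic distribution, or None if absent."""
    try:
        version = metadata.version("pydantic")
    except metadata.PackageNotFoundError:
        return None
    # Version strings look like "2.7.1" or "1.10.15"; keep only the leading number.
    return int(version.split(".")[0])


def require_pydantic_v2() -> None:
    """Hypothetical guard: raise if the environment does not provide Pydantic 2.x."""
    major = installed_pydantic_major()
    if major != 2:
        raise RuntimeError(
            f"airbyte-protocol-models-pdv2 expects Pydantic 2.x, found major version {major!r}"
        )
```

Calling `require_pydantic_v2()` once at connector start-up surfaces a version conflict as a clear error instead of an obscure validation failure later on.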
Install
pip install airbyte-protocol-models-pdv2
Imports
- AirbyteMessage
from airbyte_protocol_models_pdv2.airbyte_protocol import AirbyteMessage
- AirbyteRecordMessage
from airbyte_protocol_models_pdv2.airbyte_protocol import AirbyteRecordMessage
- Type
from airbyte_protocol_models_pdv2.airbyte_protocol import Type
Quickstart
import json
from airbyte_protocol_models_pdv2.airbyte_protocol import AirbyteMessage, AirbyteRecordMessage, Type
# Create a sample record message
record_data = {"id": 1, "name": "Alice", "age": 30}
record_message = AirbyteRecordMessage(
    stream="users",
    data=record_data,
    emitted_at=1678886400000,  # Unix timestamp in milliseconds
)
# Wrap it in an AirbyteMessage envelope
airbyte_message = AirbyteMessage(
    type=Type.RECORD,
    record=record_message,
)
# Serialize the message to JSON (as expected by the Airbyte Protocol).
# exclude_unset=True omits optional fields that were never set, such as the
# unused log/state/spec payload keys of the envelope.
json_output = airbyte_message.model_dump_json(by_alias=True, exclude_unset=True)
print("Serialized JSON message:")
print(json_output)
# Deserialize the JSON back into an AirbyteMessage object
deserialized_message = AirbyteMessage.model_validate_json(json_output)
print("\nDeserialized message type:", deserialized_message.type)
if deserialized_message.record is not None:
    print("Deserialized record data:", deserialized_message.record.data)
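The quickstart hardcodes `emitted_at` as Unix epoch milliseconds. A real connector would normally stamp each record with the current time, so here is a minimal stdlib sketch of that conversion. `to_emitted_at` and `now_emitted_at` are hypothetical helper names, not part of the library.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_emitted_at(moment: datetime) -> int:
    """Convert a timezone-aware datetime to Unix epoch milliseconds (hypothetical helper)."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Floor-dividing one timedelta by another yields an exact int (no float rounding).
    return (moment - EPOCH) // timedelta(milliseconds=1)


def now_emitted_at() -> int:
    """Current time in epoch milliseconds, e.g. for a freshly emitted record."""
    return to_emitted_at(datetime.now(timezone.utc))


print(to_emitted_at(datetime(2023, 3, 15, 13, 20, tzinfo=timezone.utc)))  # → 1678886400000
```

Rejecting naive datetimes is a deliberate choice: a naive value would otherwise be interpreted in the machine's local time zone, silently shifting `emitted_at`.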