Airbyte Python CDK
The Airbyte Python CDK is a framework designed for rapidly developing production-grade Airbyte source connectors. It provides helpers for building connectors against HTTP APIs (REST, GraphQL, etc.) and other generic Python data sources. It underpins Airbyte's Connector Builder and low-code CDK, offering full flexibility for complex integration scenarios. The library maintains an active development pace with frequent releases, currently at version 7.17.0.
Warnings
- breaking The alias `MessageRepresentationAirbyteTracedErrors` was temporarily removed and then restored in v7.16.0. Connectors that rely on this alias may break on any release between its removal and its restoration. v7.16.0 and later include it again.
- gotcha As of v7.17.0, the CDK includes a "fail fast" mechanism for non-JSON-serializable types during serialization fallback. A record that contains a value which cannot be JSON-serialized now causes the connector to fail, instead of the value being silently converted or dropped.
- gotcha The CDK includes a fail-fast shutdown driven by source-side memory monitoring. When memory usage crosses a defined threshold, the connector may be terminated. This is more likely under concurrent processing.
- gotcha Official documentation for developing Airbyte connectors sometimes recommends cloning the entire Airbyte repository and using the `airbyte-ci` tool. This approach can be cumbersome for developing standalone custom connectors in a separate repository.
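Because non-serializable values now fail the sync, one option is to convert values to JSON-friendly types inside `read_records` before yielding them. The helper below, `to_json_safe`, is a hypothetical stdlib-only sketch and is not part of the CDK. It assumes a set of conversions: `datetime`/`date` become ISO 8601 strings, `Decimal` becomes a string, `bytes` becomes base64 text, and sets and tuples become lists. Which conversion is right for each type depends on your stream's JSON schema.

```python
import base64
import json
from datetime import date, datetime
from decimal import Decimal
from typing import Any


def to_json_safe(value: Any) -> Any:
    """Recursively convert a record value into types json.dumps accepts (hypothetical helper)."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, (datetime, date)):
        # datetime is a subclass of date, so both end up as ISO 8601 strings
        return value.isoformat()
    if isinstance(value, Decimal):
        # A string keeps full precision; use float(value) if the schema declares "number"
        return str(value)
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode("ascii")
    if isinstance(value, dict):
        return {str(key): to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [to_json_safe(item) for item in value]
    # Unknown types still fail loudly instead of being guessed at
    raise TypeError(f"Cannot make value of type {type(value).__name__} JSON-safe")


record = {
    "id": 1,
    "created_at": datetime(2024, 1, 2, 3, 4, 5),
    "amount": Decimal("10.50"),
    "tags": {"a"},
    "payload": b"\x00\x01",
}
print(json.dumps(to_json_safe(record)))
```

Inside a stream this would be used as `yield to_json_safe(row)`. Raising on unknown types keeps the fail-fast spirit: a type you did not plan for surfaces as an error instead of a silently altered record.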
Install
pip install airbyte-cdk
Imports
- Source
from airbyte_cdk.sources import Source
- AbstractSource
from airbyte_cdk.sources import AbstractSource
- Stream
from airbyte_cdk.sources.streams import Stream
- HttpStream
from airbyte_cdk.sources.streams.http import HttpStream
- launch
from airbyte_cdk.entrypoint import launch
Quickstart
import logging
import sys
from typing import Any, Iterable, List, Mapping, Optional, Tuple

from airbyte_cdk.entrypoint import launch
from airbyte_cdk.models import ConnectorSpecification, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


# Define a simple stream
class MySimpleStream(Stream):
    # No primary key is declared for this example stream
    primary_key = None

    @property
    def name(self) -> str:
        return "my_data_stream"

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # In a real connector, you would fetch data from an API or database.
        # For this example, we return static data.
        yield {"id": 1, "name": "Alice", "value": 100}
        yield {"id": 2, "name": "Bob", "value": 200}

    def get_json_schema(self) -> Mapping[str, Any]:
        # Returning the schema inline avoids loading a schema file from the package
        return {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "string"},
                "value": {"type": "integer"},
            },
        }


# Define the source connector. AbstractSource (rather than the lower-level Source
# interface) implements check/discover/read on top of check_connection() and streams().
class MyCustomSource(AbstractSource):
    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        # Inline spec, so the example does not need a spec file packaged alongside it
        return ConnectorSpecification(
            connectionSpecification={
                "type": "object",
                "properties": {"api_key": {"type": "string"}},
            }
        )

    def check_connection(
        self, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Optional[Any]]:
        # In a real connector, this would validate credentials/connectivity.
        # For this example, we always return success.
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return [MySimpleStream()]


# Main entry point for the connector
if __name__ == "__main__":
    # launch() parses the Airbyte CLI arguments (spec, check, discover, read with
    # --config/--catalog/--state), runs the matching operation, and prints Airbyte
    # protocol messages to stdout. This is how the platform executes the connector.
    # Examples: python main.py spec
    #           python main.py check --config config.json
    #           python main.py read --config config.json --catalog catalog.json
    launch(MyCustomSource(), sys.argv[1:])
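The `check` and `read` commands expect JSON files on disk. The sketch below writes a `config.json` that matches the quickstart's spec, plus a configured catalog that selects `my_data_stream` in full-refresh mode. The catalog field names (`stream`, `sync_mode`, `destination_sync_mode`) follow the Airbyte protocol's `ConfiguredAirbyteCatalog`. The file names, the placeholder API key, the script name `main.py`, and the `write_run_files` helper are assumptions for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Must match MySimpleStream.get_json_schema() in the quickstart
STREAM_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "value": {"type": "integer"},
    },
}


def build_configured_catalog(stream_name: str, json_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Build a ConfiguredAirbyteCatalog that reads one stream in full-refresh mode."""
    return {
        "streams": [
            {
                "stream": {
                    "name": stream_name,
                    "json_schema": json_schema,
                    "supported_sync_modes": ["full_refresh"],
                },
                "sync_mode": "full_refresh",
                "destination_sync_mode": "overwrite",
            }
        ]
    }


def write_run_files(directory: Path) -> Dict[str, Path]:
    """Write config.json and catalog.json into directory (hypothetical helper)."""
    directory.mkdir(parents=True, exist_ok=True)
    config_path = directory / "config.json"
    catalog_path = directory / "catalog.json"
    # Placeholder value; the quickstart's check_connection() does not inspect it
    config_path.write_text(json.dumps({"api_key": "test"}, indent=2))
    catalog = build_configured_catalog("my_data_stream", STREAM_SCHEMA)
    catalog_path.write_text(json.dumps(catalog, indent=2))
    return {"config": config_path, "catalog": catalog_path}


if __name__ == "__main__":
    paths = write_run_files(Path(tempfile.mkdtemp()))
    print(f"python main.py read --config {paths['config']} --catalog {paths['catalog']}")
```

Keeping the schema in one constant makes the catalog drift-prone only in one place. In a larger connector, you would typically capture the catalog from `discover` output instead of building it by hand.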