Airbyte Python CDK

7.17.0 · active · verified Fri Apr 10

The Airbyte Python CDK is a framework for rapidly developing production-grade Airbyte source connectors. It provides helpers for building connectors against HTTP APIs (REST, GraphQL, and others) and other generic Python data sources. It underpins Airbyte's Connector Builder and low-code CDK, and it can also be used directly in Python when an integration is too complex for those tools. The library is actively developed and released frequently; the current version is 7.17.0.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to define a minimal Airbyte source connector using the Python CDK. It defines a source class with a `check_connection` method and a `streams` method that returns a list of `Stream` instances. The stream class provides the JSON schema and a `read_records` method that yields the extracted records. A full connector typically also ships a `spec.json` or `spec.yaml` file that describes its configuration.

import sys
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.entrypoint import launch
from airbyte_cdk.models import ConnectorSpecification, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream


# Define a simple stream
class MySimpleStream(Stream):
    # This stream has no primary key.
    primary_key = None

    @property
    def name(self) -> str:
        return "my_data_stream"

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[list[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # In a real connector, you would fetch data from an API or database.
        # For this example, we return static data.
        yield {"id": 1, "name": "Alice", "value": 100}
        yield {"id": 2, "name": "Bob", "value": 200}

    def get_json_schema(self) -> Mapping[str, Any]:
        return {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "string"},
                "value": {"type": "integer"},
            },
        }


# Define the source connector.
# AbstractSource implements check/discover/read on top of the two methods below.
class MyCustomSource(AbstractSource):
    def spec(self, logger) -> ConnectorSpecification:
        # Inline spec so the example runs without a spec.yaml or spec.json file.
        return ConnectorSpecification(
            connectionSpecification={
                "type": "object",
                "properties": {"api_key": {"type": "string"}},
            }
        )

    def check_connection(self, logger, config: Mapping[str, Any]) -> tuple[bool, Any]:
        # In a real connector, this would validate credentials/connectivity.
        # For this example, we always return success.
        return True, None

    def streams(self, config: Mapping[str, Any]) -> list[Stream]:
        return [MySimpleStream()]


# Main entry point for the connector
if __name__ == "__main__":
    # `launch` parses the Airbyte command (spec, check, discover, read) from the
    # command line and runs it against the source. Assuming this file is saved
    # as main.py:
    #   python main.py spec
    #   python main.py check --config config.json
    #   python main.py read --config config.json --catalog catalog.json
    # In a full Airbyte deployment, the platform executes this script.
    source = MyCustomSource()
    launch(source, sys.argv[1:])
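
When a connector is run through the CDK entrypoint, the `read` command expects a config file and a configured catalog that selects which streams to sync. The sketch below builds both for the quickstart's `my_data_stream` using only the standard library. The helper name `build_configured_catalog` is hypothetical and not part of the CDK. The catalog layout follows the Airbyte protocol's configured-catalog shape as I understand it, and the `api_key` value is a placeholder.

```python
import json
from typing import Any, Mapping


def build_configured_catalog(stream_name: str, json_schema: Mapping[str, Any]) -> dict:
    """Return a configured catalog that selects one stream in full-refresh mode.

    Hypothetical helper for illustration; it is not a CDK function.
    """
    return {
        "streams": [
            {
                "stream": {
                    "name": stream_name,
                    "json_schema": dict(json_schema),
                    "supported_sync_modes": ["full_refresh"],
                },
                "sync_mode": "full_refresh",
                "destination_sync_mode": "overwrite",
            }
        ]
    }


# Schema copied from MySimpleStream.get_json_schema() in the quickstart.
schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "value": {"type": "integer"},
    },
}

catalog = build_configured_catalog("my_data_stream", schema)
config = {"api_key": "test"}  # placeholder value for the example spec

# Print both documents; save them as catalog.json and config.json to use them.
print(json.dumps(catalog, indent=2))
print(json.dumps(config, indent=2))
```

Assuming the quickstart is saved as `main.py` and these two documents are saved as `config.json` and `catalog.json`, a read would be started with `python main.py read --config config.json --catalog catalog.json`.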
