{"id":2377,"library":"airbyte-cdk","title":"Airbyte Python CDK","description":"The Airbyte Python CDK is a framework for rapidly developing production-grade Airbyte source connectors. It provides helpers for building connectors against HTTP APIs (REST, GraphQL, etc.) and other generic Python data sources. It underpins Airbyte's Connector Builder and low-code CDK while leaving full flexibility for complex integration scenarios. The library is actively developed with frequent releases; the current version is 7.17.0.","status":"active","version":"7.17.0","language":"en","source_language":"en","source_url":"https://github.com/airbytehq/airbyte-python-cdk","tags":["data integration","ETL","connector development","Airbyte","source connector"],"install":[{"cmd":"pip install airbyte-cdk","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"Requires Python 3.10 or newer, but less than 3.14.","package":"python","optional":false}],"imports":[{"note":"Base class for defining a custom Airbyte source connector.","symbol":"Source","correct":"from airbyte_cdk.sources import Source"},{"note":"A more opinionated base class implementing the Airbyte protocol operations; subclasses provide `check_connection` and `streams`.","symbol":"AbstractSource","correct":"from airbyte_cdk.sources import AbstractSource"},{"note":"Base class for defining a data stream within a source connector.","symbol":"Stream","correct":"from airbyte_cdk.sources.streams import Stream"},{"note":"Specialized stream for building connectors against HTTP APIs.","symbol":"HttpStream","correct":"from airbyte_cdk.sources.streams.http import HttpStream"},{"note":"Utility function to run an Airbyte connector entrypoint.","symbol":"launch","correct":"from airbyte_cdk.entrypoint import launch"}],"quickstart":{"code":"import sys\nfrom typing import Any, Iterable, Mapping\n\nfrom airbyte_cdk.entrypoint import launch\nfrom airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, AirbyteStream, AirbyteMessage, 
Type, AirbyteRecordMessage\nfrom airbyte_cdk.sources import AbstractSource\nfrom airbyte_cdk.sources.streams import Stream\n\n\n# Define a simple stream\nclass MySimpleStream(Stream):\n    # No primary key: records are not deduplicated\n    primary_key = None\n\n    @property\n    def name(self) -> str:\n        return \"my_data_stream\"\n\n    def read_records(\n        self,\n        sync_mode: SyncMode,\n        cursor_field: list[str] | None = None,\n        stream_slice: Mapping[str, Any] | None = None,\n        stream_state: Mapping[str, Any] | None = None,\n    ) -> Iterable[Mapping[str, Any]]:\n        # In a real connector, you would fetch data from an API or database.\n        # For this example, we yield static records.\n        yield {\"id\": 1, \"name\": \"Alice\", \"value\": 100}\n        yield {\"id\": 2, \"name\": \"Bob\", \"value\": 200}\n\n    def get_json_schema(self) -> Mapping[str, Any]:\n        return {\n            \"type\": \"object\",\n            \"properties\": {\n                \"id\": {\"type\": \"integer\"},\n                \"name\": {\"type\": \"string\"},\n                \"value\": {\"type\": \"integer\"}\n            }\n        }\n\n\n# Define the source connector. AbstractSource supplies check/discover/read\n# on top of the two methods implemented here.\nclass MyCustomSource(AbstractSource):\n    def check_connection(self, logger, config: Mapping[str, Any]) -> tuple[bool, Any]:\n        # In a real connector, this would validate credentials/connectivity.\n        # For this example, we always return success.\n        return True, None\n\n    def streams(self, config: Mapping[str, Any]) -> list[Stream]:\n        return [MySimpleStream()]\n\n\n# Main entry point for the connector\nif __name__ == \"__main__\":\n    # A production entrypoint hands the CLI arguments to the CDK:\n    #     launch(MyCustomSource(), sys.argv[1:])\n    # `launch` parses the spec/check/discover/read commands; a full connector\n    # also typically ships a spec.yaml or spec.json file. To stay self-contained,\n    # this example handles `spec` by hand and otherwise runs a direct connection check.\n    if len(sys.argv) > 1 and sys.argv[1] == \"spec\":\n        print('{\"connectionSpecification\": {\"type\": 
\"object\", \"properties\": {\"api_key\": {\"type\": \"string\"}}}}')\n    else:\n        source = MyCustomSource()\n        success, _ = source.check_connection(None, {'api_key': 'test'})\n        print(f\"Connection check success: {success}\")\n        print(\"Successfully defined MyCustomSource with MySimpleStream.\")\n","lang":"python","description":"This quickstart defines a minimal Airbyte source connector with the Python CDK. It includes an `AbstractSource` subclass with `check_connection` and `streams` methods; `streams` returns a list of `Stream` instances. The `Stream` subclass defines the JSON schema and the `read_records` method used for data extraction. A full connector would also typically include a `spec.json` or `spec.yaml` file defining the configuration, and would pass its CLI arguments to `launch` from `airbyte_cdk.entrypoint`."},"warnings":[{"fix":"Upgrade to `airbyte-cdk>=7.16.0` to restore the alias or adjust your code to use the underlying class if the alias is not strictly necessary.","message":"The alias `MessageRepresentationAirbyteTracedErrors` was temporarily removed and then restored in v7.16.0. 
If you were using a version between its removal and restoration, connectors relying on this alias may have broken.","severity":"breaking","affected_versions":"Potentially some versions prior to v7.16.0"},{"fix":"Ensure all data emitted by your streams is JSON-serializable. Pre-process or transform non-serializable types (e.g., datetime objects, custom classes) into serializable formats (e.g., ISO-formatted strings, dicts) before yielding records.","message":"As of v7.17.0, the CDK includes a 'fail fast' mechanism for non-JSON-serializable types during serialization fallback. Records containing types that cannot be JSON-serialized now cause the connector to fail instead of being silently converted or dropped.","severity":"gotcha","affected_versions":">=7.17.0"},{"fix":"Monitor connector memory usage, optimize data processing to reduce memory footprint (e.g., process records in smaller batches, avoid holding large datasets in memory), and review the Airbyte platform's memory allocation settings for your connector.","message":"The CDK introduces fail-fast shutdown based on memory thresholding and source-side memory monitoring. Connectors exceeding defined memory limits may be terminated, especially under concurrent processing.","severity":"gotcha","affected_versions":">=7.11.0 (logging-only trial), >=7.16.0 (fail-fast shutdown)"},{"fix":"For independent connector development, create a new Python project, install `airbyte-cdk` as a dependency, and implement your `Source` and `Stream` classes. You can then build your connector as a Docker image independently of the main Airbyte repository.","message":"Official documentation for developing Airbyte connectors sometimes recommends cloning the entire Airbyte repository and using the `airbyte-ci` tool. 
This approach can be cumbersome for developing standalone custom connectors in a separate repository.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-10T00:00:00.000Z","next_check":"2026-07-09T00:00:00.000Z"}