Dagster Airbyte Integration

0.29.0 · active · verified Thu Apr 16

The `dagster-airbyte` library provides a robust integration layer to connect Airbyte data synchronization jobs with Dagster's asset-based orchestration. It allows users to define Airbyte connections and syncs as Dagster assets, enabling data lineage tracking, scheduling, and monitoring within the Dagster UI. As part of the Dagster monorepo, its versions are tightly coupled with the core `dagster` library releases. The current version is 0.29.0.

Install

pip install dagster-airbyte

Imports

from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

Quickstart

This quickstart demonstrates how to define Airbyte connections as a Dagster resource and load all available Airbyte syncs as Dagster assets using `load_assets_from_airbyte_instance`. It also shows how a Dagster asset can depend on an Airbyte-managed asset. Ensure your Airbyte instance is running and accessible from where Dagster is executed, and configure credentials via environment variables.

import os
from dagster import Definitions, asset
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

# Configure Airbyte connection via environment variables for security
AIRBYTE_HOST = os.environ.get('AIRBYTE_HOST', 'localhost')
AIRBYTE_PORT = os.environ.get('AIRBYTE_PORT', '8000')
AIRBYTE_USERNAME = os.environ.get('AIRBYTE_USERNAME', 'airbyte') # Default for local Airbyte
AIRBYTE_PASSWORD = os.environ.get('AIRBYTE_PASSWORD', 'password') # Default for local Airbyte

# Instantiate the Airbyte resource
airbyte_resource = AirbyteResource(
    host=AIRBYTE_HOST,
    port=AIRBYTE_PORT,
    username=AIRBYTE_USERNAME,
    password=AIRBYTE_PASSWORD
)

# Load assets from your Airbyte instance
# By default, this will load assets for all connections in Airbyte
airbyte_assets = load_assets_from_airbyte_instance(airbyte_resource)

# Example of a downstream Dagster asset that depends on an Airbyte asset.
# "my_airbyte_table" is a placeholder: replace it with the asset key of a
# table that one of your Airbyte connections actually produces.
@asset(deps=["my_airbyte_table"])
def my_downstream_asset():
    # The dep above establishes ordering and lineage only; it does not pass
    # data. Read the synced table from your destination (warehouse, etc.) here.
    print("Processing data from my_airbyte_table")
    return 'processed_data'

# Combine assets into Dagster Definitions
defs = Definitions(
    assets=[*airbyte_assets, my_downstream_asset],
    resources={
        "airbyte": airbyte_resource
    }
)

# To run this, save as a Python file (e.g., 'airbyte_pipeline.py')
# Then, from your terminal, navigate to the directory and run:
# dagster dev -f airbyte_pipeline.py
