Dagster Airbyte Integration
The `dagster-airbyte` library connects Airbyte data synchronization jobs to Dagster's asset-based orchestration. It lets users represent Airbyte connections and their syncs as Dagster assets, so lineage tracking, scheduling, and monitoring happen in the Dagster UI. Because it lives in the Dagster monorepo, its releases are tightly coupled to those of the core `dagster` library. The current version is 0.29.0.
Common errors
- ModuleNotFoundError: No module named 'dagster_airbyte'
  cause: The `dagster-airbyte` package is not installed in the Python environment where Dagster is running.
  fix: Install it into that same environment: `pip install dagster-airbyte`
- dagster.core.errors.DagsterInvariantViolationError: Could not connect to Airbyte at http://localhost:8000. Please ensure that Airbyte is running and accessible.
  cause: Dagster's `AirbyteResource` could not reach the configured Airbyte API endpoint. Usually Airbyte is not running, or the network path between Dagster and Airbyte is blocked.
  fix: 1. Verify that the Airbyte instance is running. 2. Check that the `host` and `port` configured on your `AirbyteResource` are correct and reachable from where Dagster runs. 3. Check for firewall rules or other network issues.
- airbyte_api.errors.UnprocessableEntity: Authentication failed. Please check your username and password.
  cause: The `username` and `password` provided to `AirbyteResource` are not valid for the Airbyte API.
  fix: Double-check the `AIRBYTE_USERNAME` and `AIRBYTE_PASSWORD` environment variables (or the values passed directly to the resource) against your Airbyte instance's authentication settings. For a local Airbyte deployment, the default credentials are often `airbyte`/`password`.
- dagster._core.errors.DagsterInvalidConfigError: Error validating config for resource 'airbyte': Missing required config field 'host'.
  cause: `AirbyteResource` was instantiated without a required parameter. `host` and `port` are required; `username` and `password` are optional.
  fix: Always provide `host` and `port` when instantiating `AirbyteResource`, and add `username` and `password` if your Airbyte instance uses basic authentication. Supply credentials through environment variables (for example with `dagster.EnvVar`) rather than hard-coding them.
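Most of the connection and configuration errors above can be caught before Dagster loads the code location. The sketch below is a hypothetical, stdlib-only preflight check; the helpers `airbyte_settings_from_env` and `is_reachable` are illustrative names, not part of `dagster-airbyte`. It assembles `AirbyteResource` keyword arguments from the Quickstart's environment variables, fails with a readable message when `host` is missing, and tests whether anything is listening on the configured host and port.

```python
import os
import socket
from typing import Mapping


def airbyte_settings_from_env(env: Mapping[str, str]) -> dict:
    """Build AirbyteResource keyword arguments from an environment mapping.

    The variable names follow the Quickstart convention (an assumption of this
    sketch); dagster-airbyte does not read them on its own.
    """
    host = env.get("AIRBYTE_HOST", "").strip()
    port = env.get("AIRBYTE_PORT", "8000").strip()
    if not host:
        # Fail early instead of hitting "Missing required config field 'host'".
        raise ValueError("AIRBYTE_HOST is not set; AirbyteResource requires 'host'")
    if not port.isdigit():
        raise ValueError(f"AIRBYTE_PORT must be numeric, got {port!r}")
    settings = {"host": host, "port": port}  # port stays a string
    username = env.get("AIRBYTE_USERNAME")
    password = env.get("AIRBYTE_PASSWORD")
    # Credentials are optional; include them only when both are configured.
    if username and password:
        settings["username"] = username
        settings["password"] = password
    return settings


def is_reachable(host: str, port: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    This only proves that something is listening; it does not check credentials.
    """
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    try:
        settings = airbyte_settings_from_env(os.environ)
    except ValueError as err:
        print(f"Config problem: {err}")
    else:
        ok = is_reachable(settings["host"], settings["port"])
        status = "reachable" if ok else "NOT reachable"
        print(f"Airbyte at {settings['host']}:{settings['port']} is {status}")
```

If both checks pass, the settings can be passed straight through, e.g. `AirbyteResource(**airbyte_settings_from_env(os.environ))`. A TCP check does not validate credentials, so authentication errors can still surface when a sync runs.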
Warnings
- breaking Dagster integration library versions are tightly coupled to core `dagster` versions. Upgrading `dagster` to a new `1.x` release usually requires upgrading `dagster-airbyte` to the `0.y` release published alongside it. Check the official Dagster release notes for the compatible pair, and upgrade both packages in the same step.
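- gotcha The integration requires a running Airbyte instance that is reachable from wherever Dagster runs: `load_assets_from_airbyte_instance` queries it when the code location loads, and syncs call its API at run time. The `AirbyteResource`'s `host` and `port` must match your deployment, and `username` and `password` must match its credentials if basic authentication is enabled.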
- gotcha The `load_assets_from_airbyte_instance` utility generates Dagster asset keys automatically from the streams of each Airbyte connection it discovers. To customize keys, pass a `key_prefix` or a `connection_to_asset_key_fn` callable; for fine-grained control over individual connections, define the assets explicitly with `build_airbyte_assets` instead.
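The version coupling in the first warning can be checked mechanically. The sketch below assumes the pairing Dagster has followed since 1.0, where core release `1.N.P` ships alongside integration libraries `0.(N+16).P`; under that assumption `dagster-airbyte` 0.29.0 pairs with `dagster` 1.13.0. Treat the offset as something to confirm against the release notes; the helper names are hypothetical.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# ASSUMPTION: since dagster 1.0 / integrations 0.16, core release 1.N.P has
# shipped alongside integration libraries 0.(N+16).P. Confirm against the
# official Dagster release notes before relying on this offset.
LIBRARY_MINOR_OFFSET = 16


def parse_version(text: str) -> tuple:
    """Return (major, minor, patch) from a string like '1.9.3' or '1.9.3rc0'."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"unrecognized version string: {text!r}")
    return tuple(int(part) for part in match.groups())


def expected_library_version(core_version: str) -> str:
    """Map a dagster 1.x version to the library version assumed to pair with it."""
    major, minor, patch = parse_version(core_version)
    if major != 1:
        raise ValueError("this sketch only models dagster 1.x releases")
    return f"0.{minor + LIBRARY_MINOR_OFFSET}.{patch}"


def versions_match(core_version: str, library_version: str) -> bool:
    """True if library_version equals the pairing predicted for core_version."""
    expected = expected_library_version(core_version)
    return parse_version(library_version) == parse_version(expected)


if __name__ == "__main__":
    try:
        core, lib = version("dagster"), version("dagster-airbyte")
        verdict = "match" if versions_match(core, lib) else "MISMATCH"
    except (PackageNotFoundError, ValueError) as err:
        print(f"Could not compare versions: {err}")
    else:
        print(f"dagster {core} / dagster-airbyte {lib}: {verdict}")
```

Running it in the environment where Dagster is installed reads both installed versions through `importlib.metadata` and reports whether they follow the assumed pairing.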
Install
-
pip install dagster-airbyte
Imports
- AirbyteResource
from dagster_airbyte import AirbyteResource
- load_assets_from_airbyte_instance
from dagster_airbyte import load_assets_from_airbyte_instance
- build_airbyte_assets
from dagster_airbyte import build_airbyte_assets
Quickstart
import os
from dagster import AssetKey, Definitions, asset
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
# Configure the Airbyte connection via environment variables so credentials stay out of code
AIRBYTE_HOST = os.environ.get('AIRBYTE_HOST', 'localhost')
AIRBYTE_PORT = os.environ.get('AIRBYTE_PORT', '8000')  # AirbyteResource takes the port as a string
AIRBYTE_USERNAME = os.environ.get('AIRBYTE_USERNAME', 'airbyte')  # Default for local Airbyte
AIRBYTE_PASSWORD = os.environ.get('AIRBYTE_PASSWORD', 'password')  # Default for local Airbyte
# Instantiate the Airbyte resource
airbyte_resource = AirbyteResource(
    host=AIRBYTE_HOST,
    port=AIRBYTE_PORT,
    username=AIRBYTE_USERNAME,
    password=AIRBYTE_PASSWORD,
)
# Load assets from your Airbyte instance.
# By default this creates assets for every connection in the instance, which is
# queried when Dagster loads this code location.
# The return value is a single (cacheable) assets definition, not a list.
airbyte_assets = load_assets_from_airbyte_instance(airbyte_resource)
# Example of a downstream Dagster asset that depends on an Airbyte-produced table.
# "my_airbyte_table" is a placeholder: replace it with the asset key of a stream
# your connection actually produces (check the asset graph in the Dagster UI).
@asset(deps=[AssetKey(["my_airbyte_table"])])
def my_downstream_asset():
    # deps= declares lineage and run ordering only; read the synced table from
    # your destination (e.g. with a database client) inside this function.
    print("Processing data synced by Airbyte into my_airbyte_table")
    return 'processed_data'
# Combine assets into Dagster Definitions
defs = Definitions(
    assets=[airbyte_assets, my_downstream_asset],
    resources={
        "airbyte": airbyte_resource
    },
)
# To run this, save it as a Python file (e.g., 'airbyte_pipeline.py').
# Then, from your terminal, navigate to that directory and run:
# dagster dev -f airbyte_pipeline.py
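The `my_airbyte_table` placeholder in the Quickstart has to match a real asset key. One way to make keys predictable is to derive them from the connection and table names. The sketch below is a hypothetical naming convention (the `airbyte` prefix, the sanitizing rule, and the connection name `Postgres -> Snowflake` are assumptions, not the library's default behavior), written in plain Python so it can be tested without Dagster installed.

```python
import re
from typing import List

# Hypothetical naming convention for this sketch: ["airbyte", <connection>, <table>].
DEFAULT_PREFIX = "airbyte"


def sanitize_key_part(name: str) -> str:
    """Lowercase `name` and collapse each run of non-alphanumeric characters into '_'."""
    cleaned = re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")
    if not cleaned:
        raise ValueError(f"{name!r} contains no letters or digits")
    return cleaned


def connection_to_key_path(
    connection_name: str, table_name: str, prefix: str = DEFAULT_PREFIX
) -> List[str]:
    """Build asset-key components for one table produced by one Airbyte connection."""
    return [prefix, sanitize_key_part(connection_name), sanitize_key_part(table_name)]


# Possible wiring in Dagster (not executed here; assumes the
# connection_to_asset_key_fn callback receives the connection metadata,
# whose .name is the connection name, plus the table name):
#
#   from dagster import AssetKey
#   airbyte_assets = load_assets_from_airbyte_instance(
#       airbyte_resource,
#       connection_to_asset_key_fn=lambda meta, table: AssetKey(
#           connection_to_key_path(meta.name, table)
#       ),
#   )
#
#   @asset(deps=[AssetKey(connection_to_key_path("Postgres -> Snowflake", "orders"))])
#   def my_downstream_asset(): ...

if __name__ == "__main__":
    print(connection_to_key_path("Postgres -> Snowflake", "orders"))
```

Because the same function produces both the generated key and the downstream `deps` key, the two cannot drift apart when a connection or table is renamed.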