lakeFS Airflow Provider
The `airflow-provider-lakefs` library (version 0.48.0 at the time of writing) integrates Apache Airflow with lakeFS. It lets DAGs perform Git-like operations on data lakes directly: branching, committing, merging, and sensing. The provider is maintained by Treeverse and is released as lakeFS and Airflow evolve.
Common errors
- `DAG 'my_dag_id' failed to import: ModuleNotFoundError: No module named 'lakefs_provider'`
  Cause: the `airflow-provider-lakefs` package is not installed in the environment running the scheduler or worker.
  Fix: run `pip install airflow-provider-lakefs` in your Airflow environment. Install it on every component that parses or runs DAGs using this provider (scheduler, workers, webserver).
- `airflow.exceptions.AirflowException: The connection 'lakefs_default' is not defined. Please define it in Airflow UI or environment variables.`
  Cause: the connection ID passed to the operator (e.g., `lakefs_conn_id='lakefs_default'`) does not exist in your Airflow environment.
  Fix: create an Airflow connection with that ID, of type HTTP, with the host set to your lakeFS endpoint and the extra field containing `access_key_id` and `secret_access_key` as JSON.
- DAG 'lakefs_task_group_issue' did not appear in the Airflow UI/scheduler.
  Cause: placing `airflow-provider-lakefs` operators inside an Airflow TaskGroup can prevent the DAG from being parsed and displayed by the scheduler. This is a known issue as of version 0.44.0.
  Fix: define the lakeFS operators as regular tasks outside TaskGroups and set their dependencies explicitly.
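The missing-connection error above is often easiest to avoid by defining the connection through an environment variable. Airflow 2.3+ accepts a JSON-serialized connection in an `AIRFLOW_CONN_<CONN_ID>` variable; a sketch with a hypothetical endpoint and placeholder credentials:

```python
import json
import os

# Hypothetical values -- substitute your lakeFS endpoint and keys.
lakefs_conn = {
    "conn_type": "http",
    "host": "https://lakefs.example.com",
    "extra": {
        "access_key_id": "AKIAEXAMPLE",
        "secret_access_key": "EXAMPLESECRET",
    },
}

# The env var suffix is the upper-cased connection ID ('lakefs_default').
os.environ["AIRFLOW_CONN_LAKEFS_DEFAULT"] = json.dumps(lakefs_conn)
```

Set the variable in the environment of every Airflow component (scheduler, workers, webserver) rather than in a single process, or create the same connection once via the Airflow UI.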
Warnings
- gotcha lakeFS operators inside Airflow TaskGroups can cause DAGs to not appear in the scheduler. This has been observed with `LakeFSMergeOperator` and likely affects the other operators as well.
- gotcha The lakeFS connection must be configured exactly: a wrong connection type, host, or extra JSON (especially `access_key_id` and `secret_access_key`) will cause authentication failures.
- breaking As a pre-1.0 release (0.x.x), minor versions of `airflow-provider-lakefs` may introduce breaking changes without strict adherence to semantic versioning, although major API changes are usually documented. Consult the project's CHANGELOG.md on GitHub before upgrading.
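Given the pre-1.0 caveat above, pinning the provider version and bumping it deliberately is a common safeguard; a requirements.txt sketch (the version shown is the one this document references):

```text
# requirements.txt -- pin the provider exactly; review the CHANGELOG before bumping
airflow-provider-lakefs==0.48.0
apache-airflow>=2.3,<3
```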
Install
- `pip install airflow-provider-lakefs`
Imports
- LakeFSCreateBranchOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
- LakeFSCommitOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
- LakeFSMergeOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
- LakeFSFileSensor
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
- LakeFSCommitSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor
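Both sensors poll lakeFS until a condition holds, governed by the standard Airflow sensor parameters `poke_interval`, `timeout`, and `soft_fail`. Their semantics can be sketched in plain Python (an illustrative helper, not part of the provider):

```python
import time

def poll(predicate, poke_interval, timeout, soft_fail=False):
    """Rough sketch of Airflow sensor semantics: call `predicate` every
    `poke_interval` seconds until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poke_interval)
    if soft_fail:
        return False  # the task is skipped rather than failed
    raise TimeoutError("sensor timed out")

# A predicate that succeeds on the third poke:
attempts = {"n": 0}
def ready():
    attempts["n"] += 1
    return attempts["n"] >= 3
```

Here `poll(ready, poke_interval=0.01, timeout=1.0)` returns True after three pokes, while a predicate that never succeeds either raises or, with `soft_fail=True`, returns False, mirroring an Airflow sensor's skip behavior.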
Quickstart
import os
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

LAKEFS_REPOSITORY = os.environ.get('LAKEFS_REPOSITORY', 'example-repo')
LAKEFS_BRANCH = os.environ.get('LAKEFS_BRANCH', 'example-branch-{{ ds_nodash }}')
LAKEFS_SOURCE_BRANCH = os.environ.get('LAKEFS_SOURCE_BRANCH', 'main')
LAKEFS_CONN_ID = os.environ.get('LAKEFS_CONN_ID', 'lakefs_default')

with DAG(
    dag_id='lakefs_quickstart_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    schedule=None,
    catchup=False,
    tags=['lakefs', 'example'],
) as dag:
    # Create a new branch for isolated work
    create_branch_task = LakeFSCreateBranchOperator(
        task_id='create_lakefs_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        source_branch=LAKEFS_SOURCE_BRANCH,
    )

    # Simulate data generation; replace with your actual processing logic
    # that writes to the lakeFS branch (e.g., via its S3-compatible API).
    write_data_task = BashOperator(
        task_id='write_simulated_data',
        bash_command=f"echo 'test data' > /tmp/data_on_{LAKEFS_BRANCH}.txt && echo 'Simulated data written for branch {LAKEFS_BRANCH}'",
    )

    # Commit the changes to the lakeFS branch
    commit_changes_task = LakeFSCommitOperator(
        task_id='commit_to_lakefs',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        # Not an f-string, so the Jinja {{ ds }} placeholder survives for
        # Airflow to render at run time.
        msg='Committed data for DAG run {{ ds }} on branch ' + LAKEFS_BRANCH,
        metadata={'airflow_dag_id': '{{ dag.dag_id }}', 'airflow_run_id': '{{ run_id }}'},
    )

    # Optionally, wait for a file to appear on the branch (e.g., from an external process).
    # This sensor is illustrative: in a real DAG, 'write_data_task' would write to lakeFS
    # directly, and the sensor would check for that file within lakeFS.
    sense_file_task = LakeFSFileSensor(
        task_id='sense_committed_file',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        path=f'data_on_{LAKEFS_BRANCH}.txt',  # lakeFS paths are relative to the repository root (no leading slash)
        poke_interval=5,
        timeout=60,
        soft_fail=True,  # skip rather than fail if the file never appears in this example
    )

    # Merge the changes from the feature branch back to the main branch
    merge_branch_task = LakeFSMergeOperator(
        task_id='merge_to_main',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        source_ref=LAKEFS_BRANCH,
        destination_branch=LAKEFS_SOURCE_BRANCH,
        msg='Merged changes from ' + LAKEFS_BRANCH + ' by Airflow DAG {{ dag.dag_id }}',
    )

    create_branch_task >> write_data_task >> commit_changes_task >> sense_file_task >> merge_branch_task
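The default branch name above embeds `{{ ds_nodash }}`, which Airflow resolves per run via Jinja templating (assuming the field is templated in the operator), yielding one branch per logical date. A minimal pure-Python stand-in for that rendering, using a tiny regex substitution instead of full Jinja2:

```python
import re
from datetime import date

def render(template, context):
    # Illustration only: Airflow renders templated fields with full Jinja2;
    # this substitutes simple {{ name }} placeholders from a dict.
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)

run_date = date(2023, 1, 1)
context = {"ds": run_date.isoformat(), "ds_nodash": run_date.strftime("%Y%m%d")}
branch = render("example-branch-{{ ds_nodash }}", context)
# branch == "example-branch-20230101"
```

Per-date branch names keep concurrent DAG runs isolated from each other; each run creates, commits to, and merges its own branch.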