lakeFS Airflow Provider
Version: 0.48.0 | Verified: Thu Apr 16 | Auth: no | Language: Python
The `airflow-provider-lakefs` library (version 0.48.0) integrates Apache Airflow with lakeFS. It lets Airflow DAGs perform Git-like operations on a data lake, such as creating branches, committing, and merging. It also provides sensors that wait on lakeFS state. The provider is maintained by Treeverse and is released as changes in lakeFS and Airflow require.
pip install airflow-provider-lakefs
Common errors
error DAG 'my_dag_id' failed to import: ModuleNotFoundError: No module named 'lakefs_provider'
cause The `airflow-provider-lakefs` package is not installed in the Airflow environment where the scheduler or worker is running.
fix
Run `pip install airflow-provider-lakefs` in your Airflow environment. Install it on every component (scheduler, workers, webserver) that parses or runs DAGs using this provider.
error airflow.exceptions.AirflowNotFoundException: The conn_id `lakefs_default` isn't defined
cause The Airflow connection ID specified in the operator (e.g., `lakefs_conn_id='lakefs_default'`) does not exist in your Airflow environment.
fix
Create an Airflow connection with the ID `lakefs_default`, or whichever ID your operators pass as `lakefs_conn_id`. Configure it as follows:
- Type: HTTP.
- Host (`conn_host`): your lakeFS endpoint.
- Extra (`conn_extra`): a JSON object containing `access_key_id` and `secret_access_key`.
error DAG 'lakefs_task_group_issue' did not appear in the Airflow UI/scheduler.
cause Using `airflow-provider-lakefs` operators directly within an Airflow TaskGroup can prevent the DAG from being parsed and displayed by the Airflow scheduler.
fix
Refactor your DAG so that `airflow-provider-lakefs` operators are not placed inside TaskGroups. Define them as regular tasks and set their dependencies explicitly. This was reported as a known issue as of version 0.44.0.
Warnings
gotcha lakeFS operators inside Airflow TaskGroups can prevent DAGs from appearing in the scheduler. This has been observed with `LakeFSMergeOperator` and may affect the other operators too.
fix Do not place `airflow-provider-lakefs` operators directly inside TaskGroups. Define them at the DAG level and set dependencies explicitly. `SubDagOperator` is not a good substitute: it is deprecated in Airflow 2 and removed in Airflow 3.
gotcha The Airflow connection for lakeFS must be configured correctly. A wrong `conn_type`, `conn_host`, or `conn_extra` leads to connection or authentication failures. A missing or misspelled `access_key_id` or `secret_access_key` is a common cause.
fix Verify three things:
- The connection is of type 'HTTP'.
- `conn_host` is your lakeFS endpoint.
- The 'Extra' JSON field holds the keys, e.g. `{"access_key_id":"<YOUR_ACCESS_KEY>","secret_access_key":"<YOUR_SECRET_KEY>"}`.
Keep credentials out of DAG code by supplying the connection through an environment variable or a secrets backend.
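As an alternative to the UI, Airflow 2.3 and later can read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (the conn ID in upper case) whose value is a JSON document. The sketch below builds such a value for `lakefs_default` using only the standard library. The endpoint is a placeholder. Reading the keys from `LAKEFS_ACCESS_KEY_ID` / `LAKEFS_SECRET_ACCESS_KEY` is a hypothetical convention for illustration, not something the provider defines.

```python
import json
import os
import shlex
from typing import Tuple


def lakefs_connection_env(
    conn_id: str, endpoint: str, access_key_id: str, secret_access_key: str
) -> Tuple[str, str]:
    """Return (env var name, JSON value) describing an Airflow HTTP connection to lakeFS."""
    # Airflow resolves conn_id 'lakefs_default' from the variable AIRFLOW_CONN_LAKEFS_DEFAULT
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "http",
            "host": endpoint,
            # The lakeFS credentials live in the connection's Extra field
            "extra": {
                "access_key_id": access_key_id,
                "secret_access_key": secret_access_key,
            },
        }
    )
    return name, value


if __name__ == "__main__":
    # LAKEFS_ACCESS_KEY_ID / LAKEFS_SECRET_ACCESS_KEY are hypothetical variable names
    env_name, env_value = lakefs_connection_env(
        "lakefs_default",
        "https://lakefs.example.com",  # placeholder endpoint
        os.environ.get("LAKEFS_ACCESS_KEY_ID", "<YOUR_ACCESS_KEY>"),
        os.environ.get("LAKEFS_SECRET_ACCESS_KEY", "<YOUR_SECRET_KEY>"),
    )
    # Shell-quote the JSON so it survives being pasted into a shell or env file
    print(f"export {env_name}={shlex.quote(env_value)}")
```

The printed line contains the secret key, so treat it like any other credential. Set the variable on every Airflow component, just as the package itself must be installed everywhere.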
breaking `airflow-provider-lakefs` is a pre-1.0 (0.x) package, so minor releases may introduce breaking changes. Semantic versioning gives no stability guarantee before 1.0, although major API changes are usually documented. Consult the CHANGELOG.md in the GitHub repository before upgrading to a new minor version.
fix Pin the `airflow-provider-lakefs` version in your `requirements.txt` (e.g., `airflow-provider-lakefs==0.48.0`) and review the project's GitHub repository or PyPI changelog for any breaking changes between specific minor versions before updating.
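A small standard-library check can catch drift between the pin and what is actually installed on a scheduler or worker, e.g. as a deploy-time step. This is a sketch under assumptions: `PINNED_VERSION` is hard-coded here and must be kept in sync with `requirements.txt` by hand, and the exact-match policy is a choice, not a provider requirement.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

DIST_NAME = "airflow-provider-lakefs"
PINNED_VERSION = "0.48.0"  # keep in sync with the pin in requirements.txt


def installed_version(dist_name: str = DIST_NAME) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def matches_pin(installed: Optional[str], pinned: str = PINNED_VERSION) -> bool:
    """True only when the distribution is installed at exactly the pinned version."""
    return installed is not None and installed == pinned


if __name__ == "__main__":
    current = installed_version()
    if matches_pin(current):
        print(f"{DIST_NAME} {current} matches the pin")
    else:
        print(f"{DIST_NAME}: expected {PINNED_VERSION}, found {current or 'not installed'}")
```

Running it on each Airflow component also surfaces the "not installed" case behind the `ModuleNotFoundError` listed under Common errors.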
Imports
- LakeFSCreateBranchOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
- LakeFSCommitOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
- LakeFSMergeOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
- LakeFSFileSensor
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor
- LakeFSCommitSensor
from lakefs_provider.sensors.commit_sensor import LakeFSCommitSensor
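To diagnose a `ModuleNotFoundError` on a specific scheduler or worker, you can check which provider modules are locatable without running any DAG. The module paths in `PROVIDER_MODULES` are an assumption taken from the provider's published examples and may differ between versions. Edit the list to match the imports your DAGs actually use.

```python
import importlib.util
from typing import Iterable, List

# Assumed module paths; adjust to the imports used in your DAGs
PROVIDER_MODULES = [
    "lakefs_provider.operators.create_branch_operator",
    "lakefs_provider.operators.commit_operator",
    "lakefs_provider.operators.merge_operator",
    "lakefs_provider.sensors.file_sensor",
    "lakefs_provider.sensors.commit_sensor",
]


def missing_modules(module_names: Iterable[str]) -> List[str]:
    """Return the module names that cannot be located in the current environment."""
    missing = []
    for name in module_names:
        try:
            # For dotted names, find_spec imports the parent package first
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            # The parent package (e.g. lakefs_provider) is itself not installed
            spec = None
        if spec is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    absent = missing_modules(PROVIDER_MODULES)
    print("all provider modules found" if not absent else f"missing: {absent}")
```

`find_spec` only locates a module; it does not execute it. Errors raised at import time, such as an incompatible Airflow version, still surface only on a real import.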
Quickstart
import os

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

LAKEFS_REPOSITORY = os.environ.get('LAKEFS_REPOSITORY', 'example-repo')
# The default branch name is a Jinja template rendered per run (e.g. example-branch-20230101).
# This assumes `branch`, `msg`, `path` and `metadata` are templated fields of the lakeFS operators
# in your provider version; check their `template_fields` if values are not rendered.
LAKEFS_BRANCH = os.environ.get('LAKEFS_BRANCH', 'example-branch-{{ ds_nodash }}')
LAKEFS_SOURCE_BRANCH = os.environ.get('LAKEFS_SOURCE_BRANCH', 'main')
LAKEFS_CONN_ID = os.environ.get('LAKEFS_CONN_ID', 'lakefs_default')

with DAG(
    dag_id='lakefs_quickstart_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    schedule=None,  # `schedule` needs Airflow 2.4+; older versions use schedule_interval
    catchup=False,
    tags=['lakefs', 'example'],
) as dag:
    # Create a new branch for isolated work
    create_branch_task = LakeFSCreateBranchOperator(
        task_id='create_lakefs_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        source_branch=LAKEFS_SOURCE_BRANCH,
    )

    # Placeholder for data generation: this writes to the worker's local /tmp, not to lakeFS.
    # Replace it with logic that writes to the branch, e.g. via lakeFS's S3-compatible API.
    write_data_task = BashOperator(
        task_id='write_simulated_data',
        bash_command=f"echo 'test data' > /tmp/data_on_{LAKEFS_BRANCH}.txt && echo 'Simulated data written for branch {LAKEFS_BRANCH}'",
    )

    # Commit the changes on the branch. lakeFS normally rejects a commit when the branch has
    # no uncommitted changes, so this fails until the task above really writes to lakeFS.
    commit_changes_task = LakeFSCommitOperator(
        task_id='commit_to_lakefs',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        # In an f-string '{{{{' becomes '{{', leaving a Jinja expression for Airflow to render
        msg=f'Committed data for DAG run {{{{ ds }}}} on branch {LAKEFS_BRANCH}',
        metadata={'airflow_dag_id': '{{ dag.dag_id }}', 'airflow_run_id': '{{ run_id }}'},
    )

    # Optionally wait for a file to appear on the branch (e.g. one written by an external process).
    # In a real DAG, write_data_task would write the object to lakeFS and this sensor would find it.
    sense_file_task = LakeFSFileSensor(
        task_id='sense_committed_file',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        path=f'data_on_{LAKEFS_BRANCH}.txt',  # lakeFS object paths are relative to the branch root
        poke_interval=5,
        timeout=60,
        soft_fail=True,  # on timeout the sensor is marked skipped instead of failed
    )

    # Merge the feature branch back into the source branch
    merge_branch_task = LakeFSMergeOperator(
        task_id='merge_to_main',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        source_ref=LAKEFS_BRANCH,
        destination_branch=LAKEFS_SOURCE_BRANCH,
        msg=f'Merged changes from {LAKEFS_BRANCH} by Airflow DAG {{{{ dag.dag_id }}}}',
        # Run even if the soft-failed sensor was skipped; the default (all_success) would skip the merge
        trigger_rule='none_failed',
    )

    create_branch_task >> write_data_task >> commit_changes_task >> sense_file_task >> merge_branch_task
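The quickstart's `msg` arguments combine Python f-string interpolation with Airflow's Jinja templating, and both use braces. The plain-Python snippet below needs no Airflow; `branch` mirrors the quickstart's default branch name. It shows what each spelling hands to Jinja before Airflow renders it. Getting the escaping wrong, for example writing `{{ {{ ds }} }}` inside an f-string, gives Jinja `{ { ds } }`, which is not a template expression and is left as literal text.

```python
# Mirrors the quickstart's default LAKEFS_BRANCH; braces inside an interpolated value are left alone
branch = "example-branch-{{ ds_nodash }}"

# Wrong: in an f-string each '{{' collapses to '{', so Jinja receives '{ { ds } }'
broken = f"Committed data for DAG run {{ {{ ds }} }} on branch {branch}"

# Right: '{{{{' collapses to '{{', leaving the Jinja expression '{{ ds }}' intact
escaped = f"Committed data for DAG run {{{{ ds }}}} on branch {branch}"

# Equivalent without an f-string: no escaping is needed at all
concatenated = "Committed data for DAG run {{ ds }} on branch " + branch

if __name__ == "__main__":
    print(broken)
    print(escaped)
```

Plain concatenation, or a regular (non-f) string, avoids the double-escaping entirely. It is often the less error-prone choice when a templated argument only needs one Python value spliced in.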