lakeFS Airflow Provider

0.48.0 verified Thu Apr 16 auth: no python

The `airflow-provider-lakefs` library, currently at version 0.48.0, integrates Apache Airflow with lakeFS. It lets users run Git-like operations on a data lake (branching, committing, merging, sensing) directly from Airflow DAGs. The provider is maintained by Treeverse, with new releases published as changes to lakeFS and Airflow require.

pip install airflow-provider-lakefs
error DAG 'my_dag_id' failed to import: ModuleNotFoundError: No module named 'lakefs_provider'
cause The `airflow-provider-lakefs` package is not installed in the Airflow environment where the scheduler or worker is running.
fix
Run `pip install airflow-provider-lakefs` in your Airflow environment. Make sure it is installed on every component (scheduler, worker, webserver) that parses or runs DAGs using this provider.
error airflow.exceptions.AirflowException: The connection 'lakefs_default' is not defined. Please define it in Airflow UI or environment variables.
cause The Airflow connection ID specified in the operator (e.g., `lakefs_conn_id='lakefs_default'`) does not exist in your Airflow environment.
fix
Create an Airflow connection with the ID `lakefs_default` (or whichever ID your operators reference). It should be of type 'HTTP', with `conn_host` set to your lakeFS endpoint and `conn_extra` containing `access_key_id` and `secret_access_key` as JSON.
error DAG 'lakefs_task_group_issue' did not appear in the Airflow UI/scheduler.
cause Using `airflow-provider-lakefs` operators directly within an Airflow TaskGroup can prevent the DAG from being parsed and displayed by the Airflow scheduler.
fix
Refactor your DAG so that `airflow-provider-lakefs` operators are not placed inside TaskGroups. Define them as regular DAG-level tasks and manage dependencies explicitly. This is a known issue (as of version 0.44.0) and is easy to miss, because the DAG simply does not appear.
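For the `ModuleNotFoundError` above, note that the distribution is named `airflow-provider-lakefs` while the importable module is `lakefs_provider`. A minimal sketch of a pre-flight check, using only the standard library; the helper name `provider_importable` is hypothetical and not part of the provider:

```python
import importlib.util


def provider_importable(module_name: str = "lakefs_provider") -> bool:
    """Return True if the top-level module can be located by this interpreter."""
    # find_spec returns None when a top-level module is not installed.
    return importlib.util.find_spec(module_name) is not None


# Run this with the same interpreter the scheduler and workers use.
print("lakefs_provider importable:", provider_importable())
```

Running it on each Airflow component shows which environment is missing the package.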
gotcha LakeFS operators within Airflow TaskGroups can cause DAGs to not appear in the scheduler. This behavior has been observed with `LakeFSMergeOperator` and likely affects others.
fix Avoid using `airflow-provider-lakefs` operators directly inside TaskGroups. Define them at the DAG level and manage dependencies explicitly. `SubDagOperator` is deprecated since Airflow 2.0 and is not a recommended substitute for grouping.
gotcha The Airflow connection for lakeFS must be configured correctly. An incorrect `conn_type`, `conn_host`, or `conn_extra` (especially the `access_key_id` and `secret_access_key` entries) leads to authentication failures.
fix Verify that the Airflow connection of type 'HTTP' is correctly configured with your lakeFS endpoint as `conn_host` and the `access_key_id` and `secret_access_key` in the 'Extra' JSON field, e.g., `'{"access_key_id":"<YOUR_ACCESS_KEY>","secret_access_key":"<YOUR_SECRET_KEY>"}'`. Use environment variables or Airflow Variables for sensitive credentials.
breaking As a pre-1.0 release (0.x.x), `airflow-provider-lakefs` may introduce breaking changes in minor versions rather than strictly following semantic versioning, although major API changes are usually documented. Consult the project's CHANGELOG.md on GitHub before upgrading across minor versions.
fix Pin the `airflow-provider-lakefs` version in your `requirements.txt` (e.g., `airflow-provider-lakefs==0.48.0`) and review the project's GitHub repository or PyPI changelog for any breaking changes between specific minor versions before updating.
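A minimal sketch of checking the installed version against a pin, using only `importlib.metadata` from the standard library. The helper names are hypothetical, and the minor-version comparison assumes plain numeric versions such as `0.48.0`:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def minor_changed(old: str, new: str) -> bool:
    """Return True if the major.minor part differs (assumes numeric components)."""
    def major_minor(v: str) -> tuple:
        return tuple(int(part) for part in v.split(".")[:2])
    return major_minor(old) != major_minor(new)


PINNED = "0.48.0"  # the version pinned in requirements.txt
current = installed_version("airflow-provider-lakefs")
if current is None:
    print("airflow-provider-lakefs is not installed")
elif minor_changed(PINNED, current):
    print(f"installed {current} differs from pin {PINNED}; review the changelog")
```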

This quickstart DAG demonstrates the core capabilities of `airflow-provider-lakefs`. It creates a new lakeFS branch, simulates writing data, commits to the branch, waits for a file to be present on the branch, and then merges the changes back into `main`. Before running, make sure an Airflow connection named `lakefs_default` exists (or set the `LAKEFS_CONN_ID` environment variable to the ID you use), configured as an HTTP connection pointing to your lakeFS endpoint, with `access_key_id` and `secret_access_key` in the 'Extra' field. Example `airflow connections add` command: `airflow connections add lakefs_default --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> --conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'`
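As an alternative to the CLI command, the same connection can be supplied through an environment variable. The sketch below builds the JSON value, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables). The helper name and the `LAKEFS_ENDPOINT`, `LAKEFS_ACCESS_KEY_ID`, and `LAKEFS_SECRET_ACCESS_KEY` variable names are hypothetical:

```python
import json
import os


def lakefs_connection_json(endpoint: str, access_key_id: str, secret_access_key: str) -> str:
    """Serialize an HTTP-type connection with lakeFS credentials in `extra`."""
    return json.dumps(
        {
            "conn_type": "http",
            "host": endpoint,
            "extra": {
                "access_key_id": access_key_id,
                "secret_access_key": secret_access_key,
            },
        }
    )


# Placeholder defaults keep the sketch runnable; supply real values in practice.
value = lakefs_connection_json(
    os.environ.get("LAKEFS_ENDPOINT", "http://<LAKEFS_ENDPOINT>"),
    os.environ.get("LAKEFS_ACCESS_KEY_ID", "<LAKEFS_ACCESS_KEY_ID>"),
    os.environ.get("LAKEFS_SECRET_ACCESS_KEY", "<LAKEFS_SECRET_ACCESS_KEY>"),
)
# The connection ID `lakefs_default` maps to this variable name.
print(f"export AIRFLOW_CONN_LAKEFS_DEFAULT='{value}'")
```

Connections defined this way do not appear in the Airflow UI, so the variable must be set on every component that runs the DAG.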

import os
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

LAKEFS_REPOSITORY = os.environ.get('LAKEFS_REPOSITORY', 'example-repo')
LAKEFS_BRANCH = os.environ.get('LAKEFS_BRANCH', 'example-branch-{{ ds_nodash }}')
LAKEFS_SOURCE_BRANCH = os.environ.get('LAKEFS_SOURCE_BRANCH', 'main')
LAKEFS_CONN_ID = os.environ.get('LAKEFS_CONN_ID', 'lakefs_default')

with DAG(
    dag_id='lakefs_quickstart_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    schedule=None,
    catchup=False,
    tags=['lakefs', 'example'],
) as dag:
    # Create a new branch for isolated work
    create_branch_task = LakeFSCreateBranchOperator(
        task_id='create_lakefs_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        source_branch=LAKEFS_SOURCE_BRANCH,
    )

    # Simulate data generation. This placeholder only writes to local /tmp and uploads nothing to lakeFS.
    # Replace with your actual data processing logic writing to lakeFS via S3-compatible API
    write_data_task = BashOperator(
        task_id='write_simulated_data',
        bash_command=f"echo 'test data' > /tmp/data_on_{LAKEFS_BRANCH}.txt && echo 'Simulated data written for branch {LAKEFS_BRANCH}'",
    )

    # Commit the changes to the lakeFS branch
    # In an f-string, Jinja braces must be doubled: {{{{ ds }}}} renders as {{ ds }}
    commit_changes_task = LakeFSCommitOperator(
        task_id='commit_to_lakefs',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        msg=f'Committed data for DAG run {{{{ ds }}}} on branch {LAKEFS_BRANCH}',
        metadata={'airflow_dag_id': '{{ dag.dag_id }}', 'airflow_run_id': '{{ run_id }}'},
    )

    # Optionally, wait for a file to appear on the branch (e.g., from an external process)
    # This sensor is illustrative; in a real DAG, 'write_data_task' would likely be writing to lakeFS directly.
    # The sensor would then check for that file within lakeFS.
    sense_file_task = LakeFSFileSensor(
        task_id='sense_committed_file',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        path=f'data_on_{LAKEFS_BRANCH}.txt',  # Assuming data is written to the root of the branch
        poke_interval=5,
        timeout=60,
        # On timeout, mark this task as skipped instead of failed.
        # Downstream tasks using the default trigger rule are then skipped as well.
        soft_fail=True,
    )

    # Merge the changes from the feature branch back to the main branch
    merge_branch_task = LakeFSMergeOperator(
        task_id='merge_to_main',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        source_ref=LAKEFS_BRANCH,
        destination_branch=LAKEFS_SOURCE_BRANCH,
        msg=f'Merged changes from {LAKEFS_BRANCH} by Airflow DAG {{{{ dag.dag_id }}}}',
    )

    create_branch_task >> write_data_task >> commit_changes_task >> sense_file_task >> merge_branch_task
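
A note on mixing Python f-strings with Airflow's Jinja templates, as the commit and merge messages in this DAG do: inside an f-string, each literal brace must be doubled, so a Jinja expression needs four braces on each side. The short sketch below uses plain strings only and the illustrative variable name `branch`:

```python
branch = "example-branch"

# Two doubled pairs separated by spaces collapse to "{ { ds } }", which Jinja does not render.
spaced = f"run {{ {{ ds }} }} on {branch}"

# Four braces on each side collapse to "{{ ds }}", which Jinja renders at runtime.
escaped = f"run {{{{ ds }}}} on {branch}"

print(spaced)   # run { { ds } } on example-branch
print(escaped)  # run {{ ds }} on example-branch
```

Keeping the Jinja part in a plain (non-f) string and concatenating it avoids the escaping altogether.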