lakeFS Airflow Provider

0.48.0 · active · verified Thu Apr 16

The `airflow-provider-lakefs` library, currently at version 0.48.0, integrates Apache Airflow with lakeFS. It lets you perform Git-like operations on your data lake (branching, committing, merging, sensing) directly within Airflow DAGs. The provider is actively maintained by Treeverse and is released regularly as lakeFS and Airflow evolve.

Install

Install the provider into your Airflow environment with pip:

pip install airflow-provider-lakefs

Imports

The operators and sensors used in the quickstart below are imported from the `lakefs_provider` package:

from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

Quickstart

This quickstart DAG demonstrates the core capabilities of `airflow-provider-lakefs`: it creates a new lakeFS branch, simulates data being written and committed to it, waits for a file to be present on the branch, and then merges the changes back into the `main` branch. Before running it, configure an Airflow HTTP connection named `lakefs_default` (or set the `LAKEFS_CONN_ID` environment variable to a different connection name) pointing at your lakeFS endpoint, with `access_key_id` and `secret_access_key` in the 'Extra' field. For example: `airflow connections add lakefs_default --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> --conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'`
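Instead of `airflow connections add`, the same connection can be supplied as an environment variable, which Airflow resolves from the `AIRFLOW_CONN_<CONN_ID>` naming convention. The host and credentials below are placeholders, not values from this provider:

```shell
# Define the lakeFS connection via Airflow's env-var mechanism.
# Query parameters in the connection URI land in the connection's Extra field.
# Host and keys here are placeholders; substitute your own.
export AIRFLOW_CONN_LAKEFS_DEFAULT='http://lakefs.example.com:8000?access_key_id=EXAMPLE_KEY&secret_access_key=EXAMPLE_SECRET'
```

This is convenient for containerized deployments where connections are injected at deploy time rather than stored in the metadata database.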

import os
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator
from lakefs_provider.sensors.file_sensor import LakeFSFileSensor

LAKEFS_REPOSITORY = os.environ.get('LAKEFS_REPOSITORY', 'example-repo')
LAKEFS_BRANCH = os.environ.get('LAKEFS_BRANCH', 'example-branch-{{ ds_nodash }}')
LAKEFS_SOURCE_BRANCH = os.environ.get('LAKEFS_SOURCE_BRANCH', 'main')
LAKEFS_CONN_ID = os.environ.get('LAKEFS_CONN_ID', 'lakefs_default')

with DAG(
    dag_id='lakefs_quickstart_dag',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    schedule=None,
    catchup=False,
    tags=['lakefs', 'example'],
) as dag:
    # Create a new branch for isolated work
    create_branch_task = LakeFSCreateBranchOperator(
        task_id='create_lakefs_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        source_branch=LAKEFS_SOURCE_BRANCH,
    )

    # Simulate data generation and writing to the lakeFS branch.
    # Replace with your actual data processing logic writing to lakeFS via its S3-compatible API.
    write_data_task = BashOperator(
        task_id='write_simulated_data',
        bash_command=f"echo 'test data' > /tmp/data_on_{LAKEFS_BRANCH}.txt && echo 'Simulated data written for branch {LAKEFS_BRANCH}'",
    )

    # Commit the changes to the lakeFS branch.
    # Doubled braces in the f-string render as a literal {{ ds }} for Airflow's Jinja templating.
    commit_changes_task = LakeFSCommitOperator(
        task_id='commit_to_lakefs',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        msg=f'Committed data for DAG run {{{{ ds }}}} on branch {LAKEFS_BRANCH}',
        metadata={'airflow_dag_id': '{{ dag.dag_id }}', 'airflow_run_id': '{{ run_id }}'},
    )

    # Optionally, wait for a file to appear on the branch (e.g., from an external process).
    # This sensor is illustrative; in a real DAG, 'write_data_task' would write to lakeFS
    # directly, and the sensor would then check for that file within lakeFS.
    sense_file_task = LakeFSFileSensor(
        task_id='sense_committed_file',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        branch=LAKEFS_BRANCH,
        path=f'data_on_{LAKEFS_BRANCH}.txt',  # lakeFS object paths are relative to the branch root
        poke_interval=5,
        timeout=60,
        soft_fail=True,  # allow the DAG to continue if the file is absent in this example
    )

    # Merge the changes from the feature branch back to the main branch
    merge_branch_task = LakeFSMergeOperator(
        task_id='merge_to_main',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=LAKEFS_REPOSITORY,
        source_ref=LAKEFS_BRANCH,
        destination_branch=LAKEFS_SOURCE_BRANCH,
        msg=f'Merged changes from {LAKEFS_BRANCH} by Airflow DAG {{{{ dag.dag_id }}}}',
    )

    create_branch_task >> write_data_task >> commit_changes_task >> sense_file_task >> merge_branch_task
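The `write_simulated_data` task above only writes to the local filesystem. In a real pipeline it would upload objects to the branch through lakeFS's S3-compatible gateway, where the repository maps to the bucket name and the branch is the first component of the object key. The sketch below illustrates that pattern with boto3; the `lakefs_object_key` and `upload_to_lakefs` helpers, endpoint, and credential parameters are illustrative assumptions, not part of the provider.

```python
def lakefs_object_key(branch: str, path: str) -> str:
    """Build the S3-gateway object key for a lakeFS branch.

    Through the gateway, the repository is addressed as the bucket and the
    branch becomes the first path component of the key.
    """
    return f"{branch}/{path.lstrip('/')}"


def upload_to_lakefs(repo: str, branch: str, path: str, body: bytes,
                     endpoint_url: str, access_key: str, secret_key: str) -> None:
    """Illustrative sketch: upload one object to a lakeFS branch via boto3."""
    import boto3  # the lakeFS gateway is S3-compatible, so plain boto3 works

    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,          # e.g. http://<LAKEFS_ENDPOINT>
        aws_access_key_id=access_key,       # lakeFS access key id
        aws_secret_access_key=secret_key,   # lakeFS secret access key
    )
    s3.put_object(Bucket=repo, Key=lakefs_object_key(branch, path), Body=body)
```

In the DAG, the `BashOperator` could then be swapped for a `PythonOperator` whose callable invokes `upload_to_lakefs(...)`, so the file the `LakeFSFileSensor` polls for actually exists on the branch before the commit step.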
