{"id":7914,"library":"airflow-provider-lakefs","title":"lakeFS Airflow Provider","description":"The `airflow-provider-lakefs` library, currently at version 0.48.0, provides a seamless integration between Apache Airflow and lakeFS. It enables users to incorporate Git-like operations (branching, committing, merging, sensing) on data lakes directly within Airflow DAGs. The provider is actively maintained by Treeverse and follows a regular release cadence as updates to lakeFS and Airflow necessitate.","status":"active","version":"0.48.0","language":"en","source_language":"en","source_url":"https://github.com/treeverse/airflow-provider-lakeFS","tags":["airflow","lakefs","data lake","version control","ETL","data governance","provider"],"install":[{"cmd":"pip install airflow-provider-lakefs","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"This is an Airflow provider and requires an existing Airflow installation.","package":"apache-airflow","optional":false},{"reason":"While not a direct install dependency for the provider itself, interacting with lakeFS often requires the client library for more advanced operations or local testing.","package":"lakefs-client","optional":true}],"imports":[{"symbol":"CreateBranchOperator","correct":"from lakefs_provider.operators.create_branch import CreateBranchOperator"},{"symbol":"CommitOperator","correct":"from lakefs_provider.operators.commit import CommitOperator"},{"symbol":"MergeOperator","correct":"from lakefs_provider.operators.merge import MergeOperator"},{"symbol":"FileSensor","correct":"from lakefs_provider.sensors.file import FileSensor"},{"symbol":"CommitSensor","correct":"from lakefs_provider.sensors.commit import CommitSensor"}],"quickstart":{"code":"import os\nimport pendulum\n\nfrom airflow.models.dag import DAG\nfrom airflow.operators.bash import BashOperator\n\nfrom lakefs_provider.operators.create_branch import CreateBranchOperator\nfrom lakefs_provider.operators.commit import 
CommitOperator\nfrom lakefs_provider.operators.merge import MergeOperator\nfrom lakefs_provider.sensors.file import FileSensor\n\nLAKEFS_REPOSITORY = os.environ.get('LAKEFS_REPOSITORY', 'example-repo')\nLAKEFS_BRANCH = os.environ.get('LAKEFS_BRANCH', 'example-branch-{{ ds_nodash }}')\nLAKEFS_SOURCE_BRANCH = os.environ.get('LAKEFS_SOURCE_BRANCH', 'main')\nLAKEFS_CONN_ID = os.environ.get('LAKEFS_CONN_ID', 'lakefs_default')\n\nwith DAG(\n    dag_id='lakefs_quickstart_dag',\n    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),\n    schedule=None,\n    catchup=False,\n    tags=['lakefs', 'example'],\n) as dag:\n    # Create a new branch for isolated work\n    create_branch_task = CreateBranchOperator(\n        task_id='create_lakefs_branch',\n        lakefs_conn_id=LAKEFS_CONN_ID,\n        repo=LAKEFS_REPOSITORY,\n        branch=LAKEFS_BRANCH,\n        source_branch=LAKEFS_SOURCE_BRANCH,\n    )\n\n    # Simulate data generation and writing to the lakeFS branch\n    # Replace with your actual data processing logic writing to lakeFS via S3-compatible API\n    write_data_task = BashOperator(\n        task_id='write_simulated_data',\n        bash_command=f\"echo 'test data' > /tmp/data_on_{LAKEFS_BRANCH}.txt && echo 'Simulated data written for branch {LAKEFS_BRANCH}'\",\n    )\n\n    # Commit the changes to the lakeFS branch\n    commit_changes_task = CommitOperator(\n        task_id='commit_to_lakefs',\n        lakefs_conn_id=LAKEFS_CONN_ID,\n        repo=LAKEFS_REPOSITORY,\n        branch=LAKEFS_BRANCH,\n        msg=f'Committed data for DAG run {{ {{ ds }} }} on branch {LAKEFS_BRANCH}',\n        metadata={'airflow_dag_id': '{{ dag.dag_id }}', 'airflow_run_id': '{{ run_id }}'}\n    )\n\n    # Optionally, wait for a file to appear on the branch (e.g., from an external process)\n    # This operator is illustrative; in a real DAG, 'write_data_task' would likely be writing to lakeFS directly.\n    # The FileSensor would then check for that file within lakeFS.\n    
sense_file_task = FileSensor(\n        task_id='sense_committed_file',\n        lakefs_conn_id=LAKEFS_CONN_ID,\n        repo=LAKEFS_REPOSITORY,\n        branch=LAKEFS_BRANCH,\n        path=f'/data_on_{LAKEFS_BRANCH}.txt', # Assuming data is written to the root of the branch\n        poke_interval=5,\n        timeout=60,\n        soft_fail=True # Mark as soft_fail to allow DAG to continue if file not present in this example\n    )\n\n    # Merge the changes from the feature branch back to the main branch\n    merge_branch_task = MergeOperator(\n        task_id='merge_to_main',\n        lakefs_conn_id=LAKEFS_CONN_ID,\n        repo=LAKEFS_REPOSITORY,\n        source_ref=LAKEFS_BRANCH,\n        destination_branch=LAKEFS_SOURCE_BRANCH,\n        msg=f'Merged changes from {LAKEFS_BRANCH} by Airflow DAG {{ {{ dag.dag_id }} }}',\n    )\n\n    create_branch_task >> write_data_task >> commit_changes_task >> sense_file_task >> merge_branch_task","description":"This quickstart DAG demonstrates the core capabilities of the `airflow-provider-lakefs`. It creates a new lakeFS branch, simulates data being written and committed to it, waits for a file to be present on the branch, and then merges the changes back to a `main` branch. Before running, ensure you have an Airflow connection named `lakefs_default` (or set `LAKEFS_CONN_ID` environment variable) configured as an HTTP connection pointing to your lakeFS endpoint, with `access_key_id` and `secret_access_key` in the 'Extra' field.\n\nExample `airflow connections add` command:\n`airflow connections add conn_lakefs --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> --conn-extra='{\"access_key_id\":\"<LAKEFS_ACCESS_KEY_ID>\",\"secret_access_key\":\"<LAKEFS_SECRET_ACCESS_KEY>\"}'`"},"warnings":[{"fix":"Avoid using `airflow-provider-lakefs` operators directly inside TaskGroups. 
Instead, define them at the top level of the DAG and manage dependencies explicitly. Do not reach for `SubDagOperator` as a workaround; it is deprecated in Airflow 2, and direct operator usage is recommended.","message":"LakeFS operators within Airflow TaskGroups can prevent affected DAGs from appearing in the scheduler. This behavior has been observed with `LakeFSMergeOperator` and likely affects others.","severity":"gotcha","affected_versions":"0.44.0+"},{"fix":"Verify that the Airflow connection of type 'HTTP' is correctly configured with your lakeFS endpoint as `conn_host` and the `access_key_id` and `secret_access_key` in the 'Extra' JSON field, e.g., `'{\"access_key_id\":\"<YOUR_ACCESS_KEY>\",\"secret_access_key\":\"<YOUR_SECRET_KEY>\"}'`. Use environment variables or Airflow Variables for sensitive credentials.","message":"Proper Airflow connection setup for lakeFS is crucial. Incorrect `conn_type`, `conn_host`, or `conn_extra` parameters (especially `access_key_id` and `secret_access_key`) will lead to authentication failures.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Pin the `airflow-provider-lakefs` version in your `requirements.txt` (e.g., `airflow-provider-lakefs==0.48.0`) and review the project's GitHub repository or PyPI changelog for breaking changes between minor versions before updating.","message":"As a pre-1.0 release (0.x.x), minor versions of `airflow-provider-lakefs` may introduce breaking changes without strict adherence to semantic versioning, although major API changes are usually documented. Always consult the GitHub CHANGELOG.md before upgrading minor versions.","severity":"breaking","affected_versions":"All 0.x.x versions"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"`pip install airflow-provider-lakefs` in your Airflow environment. 
Ensure it is installed on all components (scheduler, worker, webserver) that interact with DAGs using this provider.","cause":"The `airflow-provider-lakefs` package is not installed in the Airflow environment where the scheduler or worker is running.","error":"DAG 'my_dag_id' failed to import: ModuleNotFoundError: No module named 'lakefs_provider'"},{"fix":"Create an Airflow connection with the ID `lakefs_default` (or whichever ID your operators reference). It should be of type 'HTTP', with `conn_host` set to your lakeFS endpoint, and `conn_extra` containing `access_key_id` and `secret_access_key` in JSON format.","cause":"The Airflow connection ID specified in the operator (e.g., `lakefs_conn_id='lakefs_default'`) does not exist in your Airflow environment.","error":"airflow.exceptions.AirflowException: The connection 'lakefs_default' is not defined. Please define it in Airflow UI or environment variables."},{"fix":"Refactor your DAG to avoid placing `airflow-provider-lakefs` operators inside TaskGroups. Define them as regular tasks and manage dependencies explicitly. This is a known issue observed since version 0.44.0.","cause":"Using `airflow-provider-lakefs` operators directly within an Airflow TaskGroup can prevent the DAG from being parsed and displayed by the Airflow scheduler.","error":"DAG 'lakefs_task_group_issue' did not appear in the Airflow UI/scheduler."}]}