Apache Airflow Common IO Provider
The Apache Airflow Common IO Provider (apache-airflow-providers-common-io, current version 1.7.2) offers a unified interface for interacting with various file systems within Airflow tasks, abstracting away the underlying storage details. It aims to simplify DAG development by building on the generic object-storage path abstraction and providing operators, such as `FileTransferOperator`, that work across different storage backends (e.g., local, S3, GCS, Azure Blob Storage), with the scheme-specific implementations supplied by other Airflow provider packages. This provider follows the regular Apache Airflow provider release cadence, receiving updates in the periodic provider release waves independently of core Airflow releases.
Warnings
- gotcha The `common-io` provider offers a generic interface, but it does NOT provide the concrete implementations for specific cloud storage services (e.g., S3, GCS, Azure Blob Storage). To use `common-io` operators with these services, you must also install the respective cloud provider packages (e.g., `apache-airflow-providers-amazon`, `apache-airflow-providers-google`, `apache-airflow-providers-microsoft-azure`). Without them, `common-io` will not be able to interact with those backends.
- gotcha `FileTransferOperator` relies on Airflow Connection IDs (its `source_conn_id` and `dest_conn_id` arguments) whenever the source or destination scheme requires credentials. Misconfigured connections, incorrect connection types, or missing connections will lead to runtime errors when the task attempts to interact with the file system.
- gotcha The `apache-airflow-providers-common-io` package requires Python >=3.10. Users running Airflow on older Python versions will encounter installation or runtime errors due to this dependency.
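As a minimal illustration of the connection requirement above, a connection can be supplied without the UI via an environment variable named `AIRFLOW_CONN_<CONN_ID in uppercase>`; the connection id and URI below are assumptions for an S3-backed transfer:

```shell
# Hypothetical connection id "my_s3"; the aws:// URI form is resolved by
# the Amazon provider when the task runs.
export AIRFLOW_CONN_MY_S3='aws://?region_name=us-east-1'
echo "$AIRFLOW_CONN_MY_S3"
```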
Install
- pip install apache-airflow-providers-common-io
Imports
- ObjectStoragePath
from airflow.io.path import ObjectStoragePath
- FileTransferOperator
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
Quickstart
from __future__ import annotations

import tempfile
from pathlib import Path

import pendulum

from airflow.io.path import ObjectStoragePath
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def _demonstrate_object_storage_path():
    # Create a temporary local directory and files to demonstrate listing.
    with tempfile.TemporaryDirectory() as tmpdir:
        test_dir_path = Path(tmpdir) / "common_io_test_data"
        test_dir_path.mkdir(exist_ok=True)
        (test_dir_path / "file_a.txt").write_text("Content A")
        (test_dir_path / "file_b.txt").write_text("Content B")
        print(f"Created dummy files in: {test_dir_path}")

        # ObjectStoragePath is the unified path interface. The scheme
        # (`file://` here) selects the backend; for remote stores you would
        # use e.g. `s3://bucket/prefix` plus a `conn_id` argument naming an
        # Airflow Connection. Using `file://` keeps this quickstart runnable
        # without any prior Airflow UI connection setup.
        base = ObjectStoragePath(f"file://{test_dir_path}")

        print(f"Listing files under: {base}")
        listed_items = list(base.iterdir())
        if listed_items:
            print("Files found:")
            for item in listed_items:
                print(f"- {item.name}")
        else:
            print("No files found.")


with DAG(
    dag_id="common_io_quickstart",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["common_io", "example", "quickstart"],
) as dag:
    list_files_task = PythonOperator(
        task_id="list_files_with_object_storage",
        python_callable=_demonstrate_object_storage_path,
    )