Apache Airflow Common IO Provider

1.7.2 · active · verified Thu Apr 09

The Apache Airflow Common IO Provider (apache-airflow-providers-common-io, current version 1.7.2) offers a unified interface for interacting with various file systems within Airflow tasks, abstracting away the underlying storage details. It simplifies DAG development by providing generic transfer functionality (notably `FileTransferOperator`) that works across storage backends (e.g., local, S3, GCS, Azure Blob Storage), with the scheme-specific filesystem implementations supplied by other Airflow provider packages. The provider follows the regular Apache Airflow provider release cadence and is updated in the periodic provider release waves rather than in lockstep with core Airflow releases.
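As a sketch of the cross-backend idea, a single `FileTransferOperator` can copy between URL schemes; the paths and bucket name below are hypothetical, and the `s3://` side assumes the amazon provider and an AWS connection are configured:

```python
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Copy a local file to S3. Swapping the URL schemes retargets the transfer
# without changing operator code; each scheme is served by its provider package.
copy_report = FileTransferOperator(
    task_id="copy_report_to_s3",
    src="file:///tmp/report.csv",          # hypothetical source path
    dst="s3://example-bucket/report.csv",  # hypothetical destination bucket
)
```

The operator resolves each URL through the filesystem registered for its scheme, so the same task definition works for local-to-local, local-to-remote, or remote-to-remote copies.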

Install
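A minimal install, pinning the version noted above; remote backends additionally need their own provider packages:

```shell
pip install "apache-airflow-providers-common-io==1.7.2"

# For a remote backend such as S3, also install the matching provider:
pip install apache-airflow-providers-amazon
```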

Imports
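Commonly used entry points, assuming Airflow 2.8+ with the provider installed: the operator ships in this provider, while the fsspec-backed `ObjectStoragePath` API it builds on lives in Airflow core.

```python
# Generic, scheme-agnostic file transfer operator from this provider
from airflow.providers.common.io.operators.file_transfer import FileTransferOperator

# Unified path API (Airflow core 2.8+) that the provider builds on
from airflow.io.path import ObjectStoragePath
```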

Quickstart

This quickstart demonstrates the `common-io` storage abstraction by creating and listing files in a local directory through `ObjectStoragePath`, the fsspec-backed, pathlib-style path API (Airflow core 2.8+) that this provider builds on. An `ObjectStoragePath` is constructed from a URL such as `file://`, `s3://`, or `gs://`; non-local schemes are served by the matching provider package and, where credentials are needed, an Airflow Connection ID. This example targets a temporary local directory via the `file://` scheme, so it runs without any prior Airflow UI connection setup. In a production DAG, you would typically point the same code at a remote URL (e.g. `s3://my-bucket/prefix`) and configure the corresponding connection in Airflow.

from __future__ import annotations

import tempfile

import pendulum

from airflow.io.path import ObjectStoragePath
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def _demonstrate_object_storage_path():
    # Create a temporary local directory and files to demonstrate listing
    with tempfile.TemporaryDirectory() as tmpdir:
        base = ObjectStoragePath(f"file://{tmpdir}") / "common_io_test_data"
        base.mkdir(exist_ok=True)
        (base / "file_a.txt").write_text("Content A")
        (base / "file_b.txt").write_text("Content B")
        print(f"Created dummy files in: {base}")

        print(f"Listing files under: {base}")
        # iterdir() yields ObjectStoragePath objects; the same call works
        # unchanged against s3://, gs://, etc. once the provider is installed.
        listed_items = list(base.iterdir())

        if listed_items:
            print("Files found:")
            for item in listed_items:
                print(f"- {item.name}")
        else:
            print("No files found.")

with DAG(
    dag_id="common_io_quickstart",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["common_io", "example", "quickstart"],
) as dag:
    list_files_task = PythonOperator(
        task_id="list_files_with_common_io",
        python_callable=_demonstrate_object_storage_path,
    )
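With Airflow installed, the quickstart DAG can be exercised once, outside the scheduler, using the `dags test` CLI command and a logical date:

```shell
airflow dags test common_io_quickstart 2023-10-26
```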
