Apache Airflow Apache Beam Provider

6.2.3 · active · verified Wed Apr 15

The `apache-airflow-providers-apache-beam` package provides operators and hooks for integrating Apache Airflow with Apache Beam. Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. This provider lets Airflow users define, schedule, and monitor Beam pipelines, which can then be executed by various Beam-supported backends such as Apache Flink, Apache Spark, or Google Cloud Dataflow. The provider follows a roughly 2-3 month minor release cadence, with patch releases issued on an as-needed basis.

Install

pip install apache-airflow-providers-apache-beam

Imports

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
Quickstart

This quickstart demonstrates how to use the `BeamRunPythonPipelineOperator` to execute an Apache Beam Python pipeline via Google Cloud Dataflow. It assumes an existing Beam Python script is stored in a Google Cloud Storage (GCS) bucket and uses environment variables for project and bucket configuration.

from airflow.models.dag import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from datetime import datetime
import os

# NOTE: For this example to run, ensure you have a Python Beam pipeline script
# available at the specified GCS path (or a local path).
# E.g., a simple 'my_beam_pipeline.py' file in GCS:
# import apache_beam as beam
# from apache_beam.options.pipeline_options import PipelineOptions
# def run_pipeline(argv=None):
#     with beam.Pipeline(options=PipelineOptions(argv)) as pipeline:
#         (pipeline | 'Create' >> beam.Create(['Hello', 'Airflow', 'Beam'])
#                   | 'Log' >> beam.Map(print))
# if __name__ == '__main__':
#     import logging
#     logging.getLogger().setLevel(logging.INFO)
#     run_pipeline()

# Define environment variables for the example; replace with your actual values.
# Ensure 'your-gcp-project-id' and 'your-bucket' are valid and accessible.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'your-gcp-project-id')
GCS_BEAM_PYTHON_FILE = os.environ.get('GCS_BEAM_PYTHON_FILE', 'gs://your-bucket/path/to/my_beam_pipeline.py')
GCS_TEMP_LOCATION = os.environ.get('GCS_TEMP_LOCATION', 'gs://your-bucket/tmp')

with DAG(
    dag_id='example_apache_beam_python_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['apache_beam', 'dataflow', 'python'],
) as dag:
    run_python_beam_pipeline = BeamRunPythonPipelineOperator(
        task_id='run_python_beam_pipeline',
        py_file=GCS_BEAM_PYTHON_FILE,
        # The runner is a dedicated operator argument, not a pipeline option.
        runner='DataflowRunner',
        # pipeline_options is a dict; the provider renders each entry as a
        # --key=value command-line flag for the Beam pipeline.
        pipeline_options={
            'project': GCP_PROJECT_ID,
            'temp_location': GCS_TEMP_LOCATION,
            'region': 'us-central1',
            'staging_location': 'gs://your-bucket/staging',  # Optional
        },
        # If not using the default 'google_cloud_default' Airflow connection,
        # pass dataflow_config=DataflowConfiguration(gcp_conn_id='your_gcp_connection_id'),
        # importing DataflowConfiguration from
        # airflow.providers.google.cloud.operators.dataflow.
    )
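Under the hood, the operator hands the Beam SDK a list of `--key=value` command-line flags built from the configured pipeline options. A minimal sketch of that mapping, using an illustrative `options_to_args` helper (our name, not part of the provider's API):

```python
# Illustrative helper (not the provider's actual implementation): shows how a
# dict of pipeline options maps onto the --key=value flags the Beam SDK parses.
def options_to_args(options):
    args = []
    for key, value in options.items():
        if value is True:
            # Boolean options become bare switches, e.g. --streaming
            args.append(f'--{key}')
        else:
            args.append(f'--{key}={value}')
    return args


if __name__ == '__main__':
    print(options_to_args({
        'project': 'your-gcp-project-id',
        'temp_location': 'gs://your-bucket/tmp',
    }))
    # → ['--project=your-gcp-project-id', '--temp_location=gs://your-bucket/tmp']
```

The same flags could be passed directly to the pipeline script on the command line, which is why a Beam script written for this operator also runs standalone.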
