Apache Airflow Apache Beam Provider
The `apache-airflow-providers-apache-beam` package provides operators and hooks to seamlessly integrate Apache Airflow with Apache Beam. Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. This provider enables Airflow users to define, schedule, and monitor Beam pipelines, which can then be executed by various Beam-supported backends such as Apache Flink, Apache Spark, or Google Cloud Dataflow. The provider follows a roughly 2-3 month minor release cadence, with patch releases issued on an as-needed basis.
Common errors
- ImportError: import apache_beam as beam. Module not found
  Cause: The Apache Beam module is not installed or not available in the Python environment.
  Fix: Ensure Apache Beam is installed by running: pip install apache-beam
- ModuleNotFoundError: No module named 'airflow.providers'
  Cause: The Airflow providers package is not installed, or the Airflow version does not support providers.
  Fix: Upgrade Airflow to version 2.0.0 or later and install the necessary provider packages.
- Module 'apache_beam.io' has no attribute 'ReadFromBigQuery'
  Cause: The Apache Beam version in use does not include the 'ReadFromBigQuery' module.
  Fix: Upgrade Apache Beam to a version that includes 'ReadFromBigQuery', such as version 2.15.0 or later.
- ERROR: Could not find a version that satisfies the requirement apache-airflow-backport-providers-google
  Cause: The specified version of the 'apache-airflow-backport-providers-google' package is not available.
  Fix: Ensure compatibility between Airflow and provider versions, and install the correct version of the provider package.
- Broken DAG: No module named 'airflow.providers'
  Cause: The 'airflow.providers' module is missing, possibly due to an incomplete Airflow installation or missing provider packages.
  Fix: Install the required Airflow provider packages and verify the installation.
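Most of the errors above come down to a package missing from the environment Airflow runs in. A small stdlib-only check like the following (an illustrative helper, not part of the provider) can confirm which modules are importable before digging into DAG logs:

```python
import importlib.util


def check_modules(names):
    """Return a dict mapping each module name to True if it is importable."""
    status = {}
    for name in names:
        try:
            # find_spec returns None when the module cannot be located;
            # it raises ModuleNotFoundError if a parent package is missing.
            status[name] = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            status[name] = False
    return status


if __name__ == '__main__':
    for name, ok in check_modules(
        ['apache_beam', 'airflow.providers.apache.beam']
    ).items():
        print(f"{name}: {'OK' if ok else 'MISSING - install the package'}")
```

Run this with the same interpreter your Airflow scheduler and workers use; a module that imports fine in one virtualenv can still be missing in another.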
Warnings
- breaking The `delegate_to` parameter was removed from all Beam operators in version 5.0.0 of the provider. This primarily affected GCS and Dataflow hooks.
- gotcha Potential dependency conflicts can arise when `apache-beam[gcp]` and `apache-airflow-providers-google` are installed together, particularly if `apache-beam` does not support newer Google Python clients. This can lead to unexpected behavior or issues with BigQuery operators.
- breaking Provider versions have increasing minimum Apache Airflow core version requirements. For example, provider version 6.2.0 and higher require Airflow 2.11.0+. Installing a newer provider version on an older Airflow core might automatically upgrade Airflow, potentially requiring an `airflow db upgrade` migration.
- breaking Support for Python 3.9 was dropped in provider version 6.1.2. The provider now requires Python >=3.10.
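Given the Python floor above, a defensive interpreter check (an illustrative sketch, not provider code) at the top of shared deployment tooling can fail fast instead of producing confusing import errors later:

```python
import sys

# Provider versions >= 6.1.2 require Python 3.10 or newer.
MIN_PYTHON = (3, 10)


def require_python(minimum=MIN_PYTHON):
    """Raise RuntimeError if the running interpreter is older than `minimum`."""
    if sys.version_info[:2] < minimum:
        raise RuntimeError(
            f"Python {minimum[0]}.{minimum[1]}+ required, "
            f"found {sys.version_info.major}.{sys.version_info.minor}"
        )
```

Call `require_python()` once at startup; it raises on, e.g., a Python 3.9 interpreter and is a no-op otherwise.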
Install
- pip install apache-airflow-providers-apache-beam
- pip install apache-airflow-providers-apache-beam[google]
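To avoid the dependency conflicts noted above between `apache-beam[gcp]` and the Google provider, it is safer to install against the official Airflow constraints file for your Airflow and Python versions. A sketch, assuming Airflow 2.11.0 on Python 3.10 (substitute the versions you actually run):

```shell
AIRFLOW_VERSION=2.11.0
PYTHON_VERSION=3.10
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Install the provider (with Google extras) pinned to the tested dependency set:
pip install "apache-airflow-providers-apache-beam[google]" --constraint "${CONSTRAINT_URL}"
```

The constraints file pins every transitive dependency to a combination the Airflow release was tested with, which is the mechanism Airflow itself recommends for reproducible installs.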
Imports
- BeamHook
from airflow.providers.apache.beam.hooks.beam import BeamHook
- BeamRunPythonPipelineOperator
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
- BeamRunJavaPipelineOperator
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
- BeamRunGoPipelineOperator
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
Quickstart
from datetime import datetime
import os

from airflow.models.dag import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# NOTE: For this example to run, ensure you have a Python Beam pipeline script
# available at the specified GCS path (or a local path).
# E.g., a simple 'my_beam_pipeline.py' file in GCS:
#
#     import apache_beam as beam
#     from apache_beam.options.pipeline_options import PipelineOptions
#
#     def run_pipeline(argv=None):
#         with beam.Pipeline(options=PipelineOptions(argv)) as pipeline:
#             (pipeline
#              | 'Create' >> beam.Create(['Hello', 'Airflow', 'Beam'])
#              | 'Log' >> beam.Map(print))
#
#     if __name__ == '__main__':
#         import logging
#         logging.getLogger().setLevel(logging.INFO)
#         run_pipeline()

# Define environment variables for the example; replace with your actual values.
# Ensure 'your-gcp-project-id' and 'your-bucket' are valid and accessible.
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'your-gcp-project-id')
GCS_BEAM_PYTHON_FILE = os.environ.get('GCS_BEAM_PYTHON_FILE', 'gs://your-bucket/path/to/my_beam_pipeline.py')
GCS_TEMP_LOCATION = os.environ.get('GCS_TEMP_LOCATION', 'gs://your-bucket/tmp')

with DAG(
    dag_id='example_apache_beam_python_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['apache_beam', 'dataflow', 'python'],
) as dag:
    run_python_beam_pipeline = BeamRunPythonPipelineOperator(
        task_id='run_python_beam_pipeline',
        py_file=GCS_BEAM_PYTHON_FILE,
        runner='DataflowRunner',
        # pipeline_options is a dict; each entry is passed to the Beam
        # script as a --key=value command-line flag.
        pipeline_options={
            'tempLocation': GCS_TEMP_LOCATION,
            'stagingLocation': 'gs://your-bucket/staging',  # Optional
        },
        # Dataflow-specific settings (project, region, job name) go in
        # dataflow_config when running on the DataflowRunner.
        dataflow_config=DataflowConfiguration(
            job_name='{{ task.task_id }}',
            project_id=GCP_PROJECT_ID,
            location='us-central1',
        ),
        # If not using the default 'google_cloud_default' Airflow connection,
        # specify gcp_conn_id='your_gcp_connection_id'
    )
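The operator ultimately serializes its pipeline options into `--key=value` command-line flags for the Beam script. A stdlib-only sketch of that mapping (a hypothetical helper for illustration, not the provider's actual implementation):

```python
def options_to_flags(pipeline_options):
    """Render a pipeline-options dict as Beam-style command-line flags."""
    flags = []
    for key, value in pipeline_options.items():
        if value is True:
            # Bare boolean flags are emitted without a value.
            flags.append(f'--{key}')
        else:
            flags.append(f'--{key}={value}')
    return flags


flags = options_to_flags({
    'project': 'your-gcp-project-id',
    'temp_location': 'gs://your-bucket/tmp',
    'streaming': True,
})
print(flags)
# ['--project=your-gcp-project-id', '--temp_location=gs://your-bucket/tmp', '--streaming']
```

This is why the same dict works across runners: DirectRunner, Flink, Spark, and Dataflow all consume the standard Beam `PipelineOptions` flag syntax.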