{"id":6515,"library":"apache-airflow-providers-apache-beam","title":"Apache Airflow Apache Beam Provider","description":"The `apache-airflow-providers-apache-beam` package provides operators and hooks to seamlessly integrate Apache Airflow with Apache Beam. Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. This provider enables Airflow users to define, schedule, and monitor Beam pipelines, which can then be executed by various Beam-supported backends such as Apache Flink, Apache Spark, or Google Cloud Dataflow. The provider follows a roughly 2-3 month minor release cadence, with patch releases issued on an as-needed basis.","status":"active","version":"6.2.3","language":"en","source_language":"en","source_url":"https://github.com/apache/airflow/tree/main/airflow/providers/apache/beam","tags":["airflow","apache-beam","etl","data-processing","pipeline","gcp","dataflow"],"install":[{"cmd":"pip install apache-airflow-providers-apache-beam","lang":"bash","label":"Install core provider"},{"cmd":"pip install apache-airflow-providers-apache-beam[google]","lang":"bash","label":"Install with Google Cloud (Dataflow) support"}],"dependencies":[{"reason":"Core Apache Airflow installation; provider version 6.2.0+ requires Airflow >=2.11.0.","package":"apache-airflow","optional":false},{"reason":"Required for defining and executing Apache Beam pipelines; provider version 6.1.1+ requires Apache Beam >=2.60.0.","package":"apache-beam","optional":false},{"reason":"Required for running Apache Beam pipelines on Google Cloud Dataflow, which is a common use case. 
Install with the `google` extra.","package":"apache-airflow-providers-google","optional":true}],"imports":[{"symbol":"BeamHook","correct":"from airflow.providers.apache.beam.hooks.beam import BeamHook"},{"symbol":"BeamRunPythonPipelineOperator","correct":"from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator"},{"symbol":"BeamRunJavaPipelineOperator","correct":"from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator"},{"symbol":"BeamRunGoPipelineOperator","correct":"from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator"}],"quickstart":{"code":"from airflow.models.dag import DAG\nfrom airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator\nfrom datetime import datetime\nimport os\n\n# NOTE: For this example to run, ensure you have a Python Beam pipeline script\n# available at the specified GCS path (or a local path).\n# E.g., a simple 'my_beam_pipeline.py' file in GCS:\n# import apache_beam as beam\n# from apache_beam.options.pipeline_options import PipelineOptions\n# def run_pipeline(argv=None):\n#     with beam.Pipeline(options=PipelineOptions(argv)) as pipeline:\n#         (pipeline | 'Create' >> beam.Create(['Hello', 'Airflow', 'Beam'])\n#                   | 'Log' >> beam.Map(print))\n# if __name__ == '__main__':\n#     import logging\n#     logging.getLogger().setLevel(logging.INFO)\n#     run_pipeline()\n\n# Define environment variables for the example; replace with your actual values.\n# Ensure 'your-gcp-project-id' and 'your-bucket' are valid and accessible.\nGCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'your-gcp-project-id')\nGCS_BEAM_PYTHON_FILE = os.environ.get('GCS_BEAM_PYTHON_FILE', 'gs://your-bucket/path/to/my_beam_pipeline.py')\nGCS_TEMP_LOCATION = os.environ.get('GCS_TEMP_LOCATION', 'gs://your-bucket/tmp')\n\nwith DAG(\n    dag_id='example_apache_beam_python_pipeline',\n    start_date=datetime(2023, 1, 1),\n    
schedule=None,\n    catchup=False,\n    tags=['apache_beam', 'dataflow', 'python'],\n) as dag:\n    run_python_beam_pipeline = BeamRunPythonPipelineOperator(\n        task_id='run_python_beam_pipeline',\n        py_file=GCS_BEAM_PYTHON_FILE,\n        runner='DataflowRunner',\n        # pipeline_options is a dict; the operator converts each entry to a\n        # --key=value flag for the Beam pipeline.\n        pipeline_options={\n            'project': GCP_PROJECT_ID,\n            'temp_location': GCS_TEMP_LOCATION,\n            'staging_location': 'gs://your-bucket/staging',  # Optional\n        },\n        # Dataflow-specific settings such as the region go in dataflow_config.\n        dataflow_config={'location': 'us-central1'},\n        # If not using the default 'google_cloud_default' Airflow connection,\n        # specify gcp_conn_id='your_gcp_connection_id'\n    )","lang":"python","description":"This quickstart demonstrates how to use the `BeamRunPythonPipelineOperator` to execute an Apache Beam Python pipeline via Google Cloud Dataflow. It assumes an existing Beam Python script is stored in a Google Cloud Storage (GCS) bucket and uses environment variables for project and bucket configuration."},"warnings":[{"fix":"Use the `impersonation_chain` parameter instead to achieve impersonation for your Beam operators.","message":"The `delegate_to` parameter was removed from all Beam operators in version 5.0.0 of the provider. This primarily affected GCS and Dataflow hooks.","severity":"breaking","affected_versions":">=5.0.0"},{"fix":"Ensure `apache-beam` and `apache-airflow-providers-google` are on compatible versions. Installing the `apache-airflow-providers-apache-beam[google]` extra helps manage these cross-provider dependencies. For Dataflow, verify that your `apache-beam` version supports the Google Python clients used by the `google` provider. Upgrading both providers to their latest versions is often the simplest solution.","message":"Potential dependency conflicts can arise when `apache-beam[gcp]` and `apache-airflow-providers-google` are installed together, particularly if `apache-beam` does not support newer Google Python clients. 
This can lead to unexpected behavior or issues with BigQuery operators.","severity":"gotcha","affected_versions":"<6.2.3 (older provider versions and incompatible `apache-beam` versions)"},{"fix":"Before upgrading the provider, verify your Airflow core version meets the minimum requirement (`apache-airflow >=2.11.0` for provider 6.2.0+). Upgrade Airflow first if necessary, and be prepared to run `airflow db migrate`.","message":"Provider versions have increasing minimum Apache Airflow core version requirements. For example, provider version 6.2.0 and higher require Airflow 2.11.0+. Installing a newer provider version on an older Airflow core might automatically upgrade Airflow, potentially requiring an `airflow db migrate` run.","severity":"breaking","affected_versions":">=6.2.0"},{"fix":"Ensure your Airflow environment is running Python 3.10 or newer before upgrading to provider versions 6.1.2 or later.","message":"Support for Python 3.9 was dropped in provider version 6.1.2. The provider now requires Python >=3.10.","severity":"breaking","affected_versions":">=6.1.2"}],"env_vars":null,"last_verified":"2026-04-15T00:00:00.000Z","next_check":"2026-07-14T00:00:00.000Z","problems":[{"fix":"Ensure Apache Beam is installed by running: pip install apache-beam","cause":"The Apache Beam module is not installed or not available in the Python environment.","error":"ModuleNotFoundError: No module named 'apache_beam'"},{"fix":"Upgrade Airflow to version 2.0.0 or later and install the necessary provider packages.","cause":"The Airflow providers package is not installed or the Airflow version does not support providers.","error":"ModuleNotFoundError: No module named 'airflow.providers'"},{"fix":"Upgrade Apache Beam to a version that includes 'ReadFromBigQuery'; any release satisfying the provider's `apache-beam >=2.60.0` requirement includes it.","cause":"The Apache Beam version in use does not include the 'ReadFromBigQuery' transform.","error":"AttributeError: module 'apache_beam.io' has no attribute 'ReadFromBigQuery'"},{"fix":"The backport provider packages were published only for Airflow 1.10 and are no longer maintained. On Airflow 2.0+, install `apache-airflow-providers-google` instead.","cause":"The specified version of the 'apache-airflow-backport-providers-google' package is not available.","error":"ERROR: Could not find a version that satisfies the requirement apache-airflow-backport-providers-google"},{"fix":"Install the required Airflow provider packages and verify the installation.","cause":"The 'airflow.providers' module is missing, possibly due to an incomplete Airflow installation or missing provider packages.","error":"Broken DAG: No module named 'airflow.providers'"}]}