# Apache Airflow Apache Flink Provider

The Apache Airflow Apache Flink provider package extends Airflow with Apache Flink integration: it lets users programmatically author, schedule, and monitor workflows that submit Flink jobs and interact with Flink clusters directly from Airflow DAGs. The provider is actively maintained, with version 1.8.4 released on March 28, 2026, and follows a regular release cadence aligned with the broader Airflow ecosystem.
## Common errors
- **`Broken DAG: No module named 'airflow.providers.apache.flink.operators'`**
  - **Cause:** The Apache Flink provider package is not installed, or is not accessible to the Airflow component (scheduler, webserver, or worker) attempting to parse or execute the DAG.
  - **Fix:** Ensure `apache-airflow-providers-apache-flink` is installed in the Python environment of every Airflow component (`pip install apache-airflow-providers-apache-flink`). If using Docker, rebuild your Airflow image after adding the provider to `requirements.txt`.
- **`HTTP response headers: HTTPHeaderDict({'Audit-Id': ...})","reason":"BadRequest","code":400}`**
  - **Cause:** This error can occur when using `FlinkKubernetesOperator` and typically indicates a problem with the FlinkDeployment custom resource, often due to malformed YAML/JSON in the `application_file` or an invalid Flink application configuration.
  - **Fix:** Thoroughly review the content of your `application_file` for syntax errors or invalid deployment settings. If file-path parsing is the problem, pass the file's content to `application_file` as a YAML/JSON string instead.
- **DAGs not appearing in the UI / scheduler not picking up DAGs**
  - **Cause:** While not specific to Flink, this common Airflow issue can arise if DAG files using the Flink provider have syntax errors, are not in the configured DAGs folder, or if the scheduler itself has problems (e.g., database connectivity, misconfiguration).
  - **Fix:** Verify the DAG file is in the DAGs folder (by default `~/airflow/dags/`). Check the scheduler logs for parsing errors, validate the Python code for syntax issues, and ensure `apache-airflow-providers-apache-flink` is installed across all Airflow components if it's a distributed setup.
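For the missing-module errors above, a quick stdlib-only check (independent of Airflow itself) can confirm whether the provider is importable in a given environment. This is a small sketch using `importlib.util.find_spec`; the module name checked is the provider's top-level package:

```python
from importlib.util import find_spec

def provider_installed(module: str) -> bool:
    """Return True if *module* can be resolved in the current environment."""
    try:
        return find_spec(module) is not None
    except ModuleNotFoundError:
        # A missing parent package (e.g. airflow itself) raises instead of
        # returning None, so treat that as "not installed" too.
        return False

# Run this in the environment of each Airflow component (scheduler,
# webserver, workers) -- in containerized deployments they can differ.
print(provider_installed("airflow.providers.apache.flink"))
```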
## Warnings

- **Breaking:** Airflow provider packages, including `apache-airflow-providers-apache-flink`, have minimum Apache Airflow core version requirements that change with new provider releases. For instance, provider version 1.8.0 requires Airflow 2.11.0+, while older provider versions may support earlier Airflow versions. Installing a newer provider on an older Airflow core can lead to unexpected errors or incompatibilities.
- **Gotcha:** When deploying Airflow in distributed environments (e.g., Docker, Kubernetes), the `apache-airflow-providers-apache-flink` package must be installed on all relevant Airflow components, including the scheduler, webserver, and workers. Installing the provider inconsistently can result in errors such as DAGs not appearing or tasks failing due to missing imports.
- **Gotcha:** The `FlinkKubernetesOperator`'s `application_file` parameter accepts either a path to a `.yaml` or `.json` file, or a YAML/JSON string directly. There have been reported issues where passing a file path does not work as expected, while passing the content as a string resolves the issue. This can be a subtle parsing problem.
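The string workaround from the last gotcha can be sketched in plain Python: build the FlinkDeployment spec as a dict, serialize it, and pass the resulting string to `application_file` instead of a path. The field names below follow the Flink Kubernetes Operator's `FlinkDeployment` CRD, but the specific values (image, jar URI, resources) are illustrative placeholders:

```python
import json

def flink_deployment_manifest(name: str, image: str, jar_uri: str) -> str:
    """Build a minimal FlinkDeployment manifest and return it as a string.

    YAML is a superset of JSON, so a JSON string is also a valid YAML
    payload for the ``application_file`` parameter.
    """
    manifest = {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkDeployment",
        "metadata": {"name": name},
        "spec": {
            "image": image,               # placeholder image
            "flinkVersion": "v1_17",
            "serviceAccount": "flink",
            "jobManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "taskManager": {"resource": {"memory": "2048m", "cpu": 1}},
            "job": {
                "jarURI": jar_uri,        # placeholder jar location
                "parallelism": 2,
                "upgradeMode": "stateless",
            },
        },
    }
    return json.dumps(manifest, indent=2)

manifest_str = flink_deployment_manifest(
    "basic-example",
    "flink:1.17",
    "local:///opt/flink/examples/streaming/StateMachineExample.jar",
)
```

The resulting `manifest_str` can then be supplied as `application_file=manifest_str`, sidestepping any file-path parsing issues.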
## Install

```shell
pip install apache-airflow-providers-apache-flink
```

With the Kubernetes extra (pulls in the `cncf.kubernetes` provider that `FlinkKubernetesOperator` depends on):

```shell
pip install 'apache-airflow-providers-apache-flink[cncf.kubernetes]'
```
## Imports

```python
# FlinkOperator
from airflow.providers.apache.flink.operators.flink import FlinkOperator

# FlinkHook
from airflow.providers.apache.flink.hooks.flink import FlinkHook

# FlinkKubernetesOperator (requires the cncf.kubernetes provider)
from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator
```
## Quickstart

```python
from __future__ import annotations

import os

import pendulum

from airflow.models.dag import DAG
from airflow.providers.apache.flink.operators.flink import FlinkOperator

with DAG(
    dag_id="flink_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["flink", "example"],
) as dag:
    submit_flink_job = FlinkOperator(
        task_id="submit_example_flink_job",
        job_name="my_example_flink_job",
        main_class="com.example.flink.MyJob",
        jar="/path/to/my-flink-job.jar",
        flink_configuration={
            "taskmanager.memory.process.size": "2g",
            "kubernetes.container.image": os.environ.get("FLINK_K8S_IMAGE", "flink:latest"),
        },
        # Ensure a Flink connection is configured in the Airflow UI with ID 'flink_default'
        # flink_conn_id="flink_default",
    )
```
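For the Kubernetes path, `FlinkKubernetesOperator` takes an `application_file` pointing at (or containing) a FlinkDeployment manifest. A minimal file-based sketch, assuming the Flink Kubernetes Operator's `flink.apache.org/v1beta1` CRD; the image, jar URI, and resource values are placeholders to adapt:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.17            # placeholder image
  flinkVersion: v1_17
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar  # placeholder
    parallelism: 2
    upgradeMode: stateless
```

Save this next to your DAG and reference it via `application_file`, or (per the gotcha above) pass its content directly as a string.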