{"id":7930,"library":"apache-airflow-providers-apache-flink","title":"Apache Airflow Apache Flink Provider","description":"The Apache Airflow Apache Flink Provider package extends Apache Airflow's capabilities by integrating with Apache Flink. It enables users to programmatically author, schedule, and monitor workflows that submit Flink applications to Kubernetes clusters, via the Flink Kubernetes Operator, directly from Airflow DAGs. This provider is actively maintained, with version 1.8.4 released on March 28, 2026, and follows a regular release cadence aligned with the broader Airflow ecosystem.","status":"active","version":"1.8.4","language":"en","source_language":"en","source_url":"https://github.com/apache/airflow","tags":["Apache Airflow","Apache Flink","ETL","Workflow Orchestration","Data Streaming","Provider","Kubernetes"],"install":[{"cmd":"pip install apache-airflow-providers-apache-flink","lang":"bash","label":"Install core provider"},{"cmd":"pip install apache-airflow-providers-apache-flink[cncf.kubernetes]","lang":"bash","label":"Install with Kubernetes support"}],"dependencies":[{"reason":"Core Airflow functionality is required to use this provider.","package":"apache-airflow","optional":false},{"reason":"Provides common compatibility features across providers.","package":"apache-airflow-providers-common-compat","optional":true},{"reason":"Required for secure operations, such as credential encryption in the underlying Airflow connections.","package":"cryptography","optional":false},{"reason":"Needed for the FlinkKubernetesOperator to deploy Flink applications on Kubernetes.","package":"apache-airflow-providers-cncf-kubernetes","optional":true}],"imports":[{"symbol":"FlinkKubernetesOperator","correct":"from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator"},{"symbol":"FlinkKubernetesSensor","correct":"from airflow.providers.apache.flink.sensors.flink_kubernetes import FlinkKubernetesSensor"}],"quickstart":{"code":"from __future__ import annotations\n\nimport pendulum\n\nfrom airflow.models.dag import DAG\nfrom airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator\n\nwith DAG(\n    dag_id=\"flink_example_dag\",\n    start_date=pendulum.datetime(2023, 1, 1, tz=\"UTC\"),\n    schedule=None,\n    catchup=False,\n    tags=[\"flink\", \"example\"],\n) as dag:\n    # Submits a FlinkDeployment custom resource; requires the Flink Kubernetes\n    # Operator to be installed in the target cluster.\n    submit_flink_job = FlinkKubernetesOperator(\n        task_id=\"submit_example_flink_job\",\n        namespace=\"default\",\n        # Path to a FlinkDeployment manifest, or the YAML/JSON content as a string.\n        application_file=\"flink_deployment.yaml\",\n        kubernetes_conn_id=\"kubernetes_default\",\n    )","lang":"python","description":"This quickstart demonstrates a basic Airflow DAG that uses the `FlinkKubernetesOperator` to submit a FlinkDeployment custom resource to a Kubernetes cluster. It assumes the Flink Kubernetes Operator is installed in the cluster, that a Kubernetes connection with ID 'kubernetes_default' is configured in Airflow, and that `flink_deployment.yaml` is a valid FlinkDeployment manifest. The `application_file` parameter also accepts the YAML/JSON content directly as a string."},"warnings":[{"fix":"Always check the `Requirements` section of the provider's official documentation or changelog to verify the minimum supported Airflow version before upgrading, and ensure your Airflow environment meets these requirements.","message":"Airflow provider packages, including apache-airflow-providers-apache-flink, have minimum Apache Airflow core version requirements that change with new provider releases. For instance, provider version 1.8.0 requires Airflow 2.11.0+, while older provider versions may support earlier Airflow versions. Installing a newer provider with an older Airflow core can lead to unexpected errors or incompatibility.","severity":"breaking","affected_versions":"<1.8.0 with Airflow >=2.11.0, or >=1.8.0 with Airflow <2.11.0"},{"fix":"Include `pip install apache-airflow-providers-apache-flink` in your Airflow Dockerfile, or ensure it is part of the `requirements.txt` used for building Airflow images or installing dependencies on all nodes.","message":"When deploying Airflow in distributed environments (e.g., Docker, Kubernetes), the `apache-airflow-providers-apache-flink` package must be installed on all relevant Airflow components, including the scheduler, webserver, and workers. Failing to install the provider consistently across all components can result in errors such as DAGs not appearing or tasks failing due to missing imports.","severity":"gotcha","affected_versions":"All versions in distributed Airflow setups."},{"fix":"If encountering issues with `application_file` as a path, try reading the file content into a string and passing the string directly to the `application_file` parameter. For example: `application_file=open('flink_deployment.yaml').read()`.","message":"The `FlinkKubernetesOperator`'s `application_file` parameter accepts either a path to a `.yaml` or `.json` file or a YAML/JSON string directly. There have been reported issues where passing a file path does not work as expected, while passing the content as a string resolves the problem. This can be a subtle parsing issue.","severity":"gotcha","affected_versions":"Potentially all versions using `FlinkKubernetesOperator`."}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Ensure `apache-airflow-providers-apache-flink` is installed in the Python environment of all Airflow components (for example: `pip install apache-airflow-providers-apache-flink`). If using Docker, rebuild your Airflow image after adding the provider to `requirements.txt`.","cause":"The Apache Flink provider package is not installed or not accessible by the Airflow component (scheduler, webserver, or worker) attempting to parse or execute the DAG.","error":"Broken DAG: No module named 'airflow.providers.apache.flink.operators'"},{"fix":"Thoroughly review the content of your `application_file` for syntax errors or invalid Flink deployment configurations. As a workaround, if file path parsing is problematic, convert the file content to a string and pass it directly to the `application_file` parameter.","cause":"This error can occur when using `FlinkKubernetesOperator` and typically indicates an issue with the FlinkDeployment Kubernetes custom resource, often due to malformed YAML/JSON in the `application_file` or an incorrect configuration for the Flink application.","error":"HTTP response headers: HTTPHeaderDict({'Audit-Id': ...})\",\"reason\":\"BadRequest\",\"code\":400}"},{"fix":"Verify the DAG file is in the configured DAGs folder (e.g., `~/airflow/dags/`). Check the scheduler logs (by default under `$AIRFLOW_HOME/logs/scheduler/`) for parsing errors. Validate the Python code for syntax issues, and ensure `apache-airflow-providers-apache-flink` is installed across all Airflow components in distributed setups.","cause":"While not specific to Flink, this is a common Airflow issue that can arise if DAG files using the Flink provider have syntax errors, are not in the configured DAGs folder, or if the scheduler has issues (e.g., database connectivity or misconfiguration).","error":"DAGs Not Appearing in the UI / Scheduler Not Picking Up DAGs"}]}