{"id":9475,"library":"apache-flink","title":"Apache Flink Python API (PyFlink)","description":"PyFlink is the Python API for Apache Flink, a powerful open-source stream-processing framework. It enables users to write Flink jobs using Python's DataStream and Table APIs, leveraging Flink's robust capabilities for stateful computations, fault tolerance, and high throughput. Version 2.2.0 is compatible with Flink 1.18+ and requires Python 3.9+. New releases generally align with major Flink core updates and maintain a consistent release cadence.","status":"active","version":"2.2.0","language":"en","source_language":"en","source_url":"https://github.com/apache/flink","tags":["apache flink","pyflink","data streaming","stream processing","big data","etl","distributed systems"],"install":[{"cmd":"pip install apache-flink","lang":"bash","label":"Install PyFlink"}],"dependencies":[{"reason":"Required for communication between Python and the Flink (Java) runtime.","package":"py4j","optional":false}],"imports":[{"symbol":"StreamExecutionEnvironment","correct":"from pyflink.datastream import StreamExecutionEnvironment"},{"symbol":"TableEnvironment","correct":"from pyflink.table import TableEnvironment"},{"symbol":"EnvironmentSettings","correct":"from pyflink.table import EnvironmentSettings"},{"symbol":"DataTypes","correct":"from pyflink.table.types import DataTypes"},{"note":"For PyFlink 2.x and later, TableDescriptor is directly under `pyflink.table`.","wrong":"from pyflink.table.descriptors import TableDescriptor","symbol":"TableDescriptor","correct":"from pyflink.table import TableDescriptor"},{"symbol":"Schema","correct":"from pyflink.table import Schema"},{"note":"Modern Table API uses `pyflink.table.formats` for specific format implementations instead of generic descriptors.","wrong":"from pyflink.table.descriptors import Csv","symbol":"Csv (Format)","correct":"from pyflink.table.formats.csv import Csv"},{"note":"The PyPI package name is 'apache-flink', but the importable 
module name is 'pyflink'.","wrong":"import apache_flink","symbol":"PyFlink root import","correct":"from pyflink.table import ..."}],"quickstart":{"code":"from pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.table.expressions import col\nfrom pyflink.table.types import DataTypes\n\ndef quickstart_pyflink_table_api():\n    # Set up the Table Environment in streaming mode\n    settings = EnvironmentSettings.in_streaming_mode()\n    t_env = TableEnvironment.create(settings)\n\n    # Optional: set parallelism for local execution\n    t_env.get_config().set(\"parallelism.default\", \"1\")\n\n    # Define some in-memory data\n    data = [\n        (\"Alice\", 100),\n        (\"Bob\", 200),\n        (\"Charlie\", 150),\n        (\"Alice\", 50)\n    ]\n\n    # Create a temporary view from the collection with an explicit schema\n    t_env.create_temporary_view(\n        \"input_table\",\n        t_env.from_elements(\n            data,\n            schema=DataTypes.ROW([\n                DataTypes.FIELD(\"name\", DataTypes.STRING()),\n                DataTypes.FIELD(\"score\", DataTypes.INT())\n            ])\n        )\n    )\n\n    # Perform a simple aggregation using the expression API\n    # (string expressions like \"name, SUM(score)\" are no longer supported)\n    result = t_env.from_path(\"input_table\") \\\n        .group_by(col(\"name\")) \\\n        .select(col(\"name\"), col(\"score\").sum.alias(\"total_score\")) \\\n        .execute()\n\n    # Collect and print the results (for local execution)\n    print(\"\\n--- PyFlink Quickstart Results ---\")\n    with result.collect() as rows:\n        for row in rows:\n            print(row)\n    print(\"----------------------------------\")\n\nif __name__ == '__main__':\n    quickstart_pyflink_table_api()","lang":"python","description":"This quickstart demonstrates a simple PyFlink Table API job. It creates a streaming table environment, defines an in-memory source, performs a group-by aggregation using the expression API, and prints the results to the console. 
This showcases basic setup, data ingestion, transformation, and execution for local development."},"warnings":[{"fix":"Upgrade your Python environment to 3.9+ or use an older PyFlink version compatible with your Python interpreter.","message":"PyFlink version 2.x and later requires Python 3.9 or higher. Older PyFlink versions (e.g., 1.x) supported earlier Python versions.","severity":"breaking","affected_versions":"2.0.0+"},{"fix":"Refer to the official PyFlink documentation for the current Table API patterns, specifically for defining `TableDescriptor` and `Schema` directly under `pyflink.table` and describing formats with `FormatDescriptor`.","message":"The Table API has undergone significant evolution. `TableDescriptor`, `Schema`, and `FormatDescriptor` have replaced the older generic `pyflink.table.descriptors` API for defining sources and sinks.","severity":"breaking","affected_versions":"2.0.0+"},{"fix":"Always use `import pyflink` or `from pyflink import ...` when importing components from the library.","message":"The Python package name is `apache-flink`, but the importable module within Python is `pyflink`. Attempting to `import apache_flink` will result in a `ModuleNotFoundError`.","severity":"gotcha","affected_versions":"All"},{"fix":"Download the necessary Flink connector JARs from Maven Central or the official Flink downloads page and make them available to your Flink cluster (e.g., via `bin/flink run -py ... -j ...` or by placing them in the `lib` directory of your Flink installation).","message":"Many Flink connectors (e.g., Kafka, JDBC, Hive) require additional JAR files to be present in the Flink classpath. The Python package only provides the API, not all runtime dependencies.","severity":"gotcha","affected_versions":"All"},{"fix":"Ensure `java` is in your system's PATH. 
For local development, no separate Flink download is usually needed: the `apache-flink` pip package bundles the Flink runtime JARs, and PyFlink starts an embedded mini-cluster automatically when no cluster is configured. Set the `FLINK_HOME` environment variable only to run against a separately installed Flink distribution.","message":"PyFlink requires a Java Runtime Environment (JRE) to be installed and accessible. A running Flink cluster is additionally required for remote deployment.","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-17T00:00:00.000Z","next_check":"2026-07-16T00:00:00.000Z","problems":[{"fix":"The correct import name is `pyflink`. Use `import pyflink` or `from pyflink.table import TableEnvironment` etc.","cause":"You likely installed the package using `pip install apache-flink` but tried to `import apache_flink`.","error":"No module named 'pyflink'"},{"fix":"Download the required Flink connector JAR (e.g., the `flink-sql-connector-kafka` JAR matching your Flink version; since Flink 1.15, connector artifacts no longer carry a Scala suffix) from Maven Central or Flink's download page. When running your job, specify the JAR using the `-j` flag: `bin/flink run -py your_job.py -j path/to/flink-sql-connector-kafka.jar`.","cause":"You are trying to use a Flink connector (e.g., Kafka, JDBC) but the corresponding Flink JAR file for that connector is not present in the Flink runtime classpath.","error":"java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer"},{"fix":"Review your `TableDescriptor` and `Schema` definitions. Ensure all required properties for the source/sink format are correctly set and that the schema matches the data. Check Flink's JobManager logs for more detailed error messages.","cause":"This is a generic error from the Flink Java backend, often indicating an issue with how a table source/sink is defined, or a misconfiguration of the TableEnvironment.","error":"py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. 
: org.apache.flink.table.api.TableException: Invalid source table."},{"fix":"Upgrade your Python environment to 3.9 or higher. If using a virtual environment, ensure it's created with a compatible Python version.","cause":"You are attempting to run PyFlink 2.x (or later) with a Python version older than 3.9.","error":"ValueError: Unsupported Python version. PyFlink only supports Python 3.9, 3.10, 3.11."}]}