Apache Flink Python API (PyFlink)
PyFlink is the Python API for Apache Flink, a powerful open-source stream-processing framework. It enables users to write Flink jobs using Python's DataStream and Table APIs, leveraging Flink's robust capabilities for stateful computations, fault tolerance, and high throughput. Version 2.2.0 is compatible with Flink 1.18+ and requires Python 3.9+. New releases generally track major Flink core releases on a consistent cadence.
Common errors
- `ModuleNotFoundError: No module named 'pyflink'`
  Cause: the package was installed with `pip install apache-flink`, but the code attempts `import apache_flink`.
  Fix: the importable module is named `pyflink`. Use `import pyflink` or `from pyflink.table import TableEnvironment`, etc.
- `java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer`
  Cause: the job uses a Flink connector (e.g., Kafka, JDBC) whose JAR is not present on the Flink runtime classpath.
  Fix: download the required connector JAR (e.g., `flink-connector-kafka_2.12-1.18.jar`) from Maven Central or Flink's download page and pass it when submitting: `bin/flink run -py your_job.py -j path/to/flink-connector-kafka.jar`. Note that `-j`/`--jarfile` adds a JAR, while `-pyfs` is for extra Python files, not the entry script.
- `py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. : org.apache.flink.table.api.TableException: Invalid source table.`
  Cause: a generic error surfaced from the Flink Java backend, usually a misdefined table source/sink or a misconfigured TableEnvironment.
  Fix: review the `TableDescriptor` and `Schema` definitions, ensure all required connector and format options are set and match the data, and check the JobManager logs for the underlying cause.
- `ValueError: Unsupported Python version. PyFlink only supports Python 3.9, 3.10, 3.11.`
  Cause: running PyFlink 2.x (or later) with a Python interpreter outside the supported range (e.g., older than 3.9).
  Fix: switch to a supported Python version; if using a virtual environment, recreate it with a compatible interpreter.
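The Kafka `ClassNotFoundException` above typically surfaces when a source table declared with `'connector' = 'kafka'` is resolved at runtime. As an illustration of what such a definition looks like, here is a minimal sketch that builds the Flink SQL DDL string in plain Python (the table name, topic, and bootstrap servers are hypothetical; executing the DDL still requires the Kafka connector JAR on the classpath):

```python
def kafka_source_ddl(table: str, topic: str, servers: str) -> str:
    """Build a Flink SQL DDL string for a hypothetical Kafka-backed source.

    The string itself is plain Python; running it via
    table_env.execute_sql(...) needs flink-connector-kafka on the classpath.
    """
    return f"""
    CREATE TABLE {table} (
        name STRING,
        score INT
    ) WITH (
        'connector' = 'kafka',
        'topic' = '{topic}',
        'properties.bootstrap.servers' = '{servers}',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )"""


print(kafka_source_ddl("scores_src", "scores", "localhost:9092"))
```

If this DDL fails with the `ClassNotFoundException`, the connector JAR, not the SQL, is what is missing.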
Warnings
- breaking PyFlink version 2.x and later requires Python 3.9 or higher. Older PyFlink versions (e.g., 1.x) supported earlier Python versions.
- breaking The Table API has undergone significant evolution. The `TableDescriptor`, `Schema`, and `FormatDescriptor` classes in `pyflink.table` have replaced the older generic `pyflink.table.descriptors` API for defining sources and sinks.
- gotcha The Python package name is `apache-flink`, but the importable module within Python is `pyflink`. Attempting to `import apache_flink` will result in a `ModuleNotFoundError`.
- gotcha Many Flink connectors (e.g., Kafka, JDBC, Hive) require additional JAR files to be present in the Flink classpath. The Python package only provides the API, not all runtime dependencies.
- gotcha PyFlink requires a Java Runtime Environment (JRE) to be installed and accessible. It also necessitates a Flink distribution for local execution or a running Flink cluster for deployment.
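The package-name vs. module-name gotcha above can be caught early, before a job is submitted. A small standard-library sketch (the helper name is ours, not part of PyFlink):

```python
import importlib.util


def module_available(name: str) -> bool:
    # True if `import name` would succeed, without actually importing it.
    return importlib.util.find_spec(name) is not None


# The pip package is "apache-flink", but the module to check is "pyflink":
if not module_available("pyflink"):
    print("pyflink not found - install with: pip install apache-flink")
```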
Install
-
pip install apache-flink
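To confirm the install worked, a defensive version check can help; this is a sketch, hedged with `getattr` because a top-level `__version__` attribute may not be exposed by every release:

```python
def pyflink_version_or_hint() -> str:
    """Return the installed PyFlink version, or an install hint."""
    try:
        import pyflink
    except ImportError:
        return "pyflink missing - run: pip install apache-flink"
    return getattr(pyflink, "__version__", "installed (version attribute not exposed)")


print(pyflink_version_or_hint())
```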
Imports
- StreamExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
- TableEnvironment
from pyflink.table import TableEnvironment
- EnvironmentSettings
from pyflink.table import EnvironmentSettings
- DataTypes
from pyflink.table.types import DataTypes
- TableDescriptor
from pyflink.table import TableDescriptor
- Schema
from pyflink.table import Schema
- FormatDescriptor (formats such as CSV)
from pyflink.table import FormatDescriptor
- PyFlink root import
import pyflink
from pyflink.table import ...
Quickstart
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.types import DataTypes


def quickstart_pyflink_table_api():
    # Set up the Table Environment in streaming mode
    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)

    # Optional: set parallelism for local execution
    t_env.get_config().set("parallelism.default", "1")

    # Define some in-memory data
    data = [
        ("Alice", 100),
        ("Bob", 200),
        ("Charlie", 150),
        ("Alice", 50),
    ]

    # Create a temporary view from the collection with an explicit schema
    t_env.create_temporary_view(
        "input_table",
        t_env.from_elements(
            data,
            schema=DataTypes.ROW([
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("score", DataTypes.INT()),
            ])
        )
    )

    # Perform a simple aggregation. Use column expressions (col);
    # string expressions like "SUM(score)" are not supported here.
    result = t_env.from_path("input_table") \
        .group_by(col("name")) \
        .select(col("name"), col("score").sum.alias("total_score")) \
        .execute()

    # Collect and print the results (for local execution)
    print("\n--- PyFlink Quickstart Results ---")
    with result.collect() as rows:
        for row in rows:
            print(row)
    print("----------------------------------")


if __name__ == '__main__':
    quickstart_pyflink_table_api()
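For reference, the grouped sum the quickstart computes can be checked against a plain-Python equivalent (no Flink involved; in streaming mode the Flink job emits these totals as changelog rows rather than a single final dict):

```python
from collections import defaultdict


def sum_scores(rows):
    # Same per-name SUM(score) the Flink job performs, computed in-process.
    totals = defaultdict(int)
    for name, score in rows:
        totals[name] += score
    return dict(totals)


data = [("Alice", 100), ("Bob", 200), ("Charlie", 150), ("Alice", 50)]
print(sum_scores(data))  # → {'Alice': 150, 'Bob': 200, 'Charlie': 150}
```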