Apache Flink Python API (PyFlink)

2.2.0 · active · verified Fri Apr 17

PyFlink is the Python API for Apache Flink, an open-source framework for stateful computations over data streams. With PyFlink you write Flink jobs in Python using Flink's DataStream API or Table API. Flink's runtime handles state, fault tolerance, and high-throughput execution. PyFlink is released alongside Flink and shares its version number, so PyFlink 2.2.0 corresponds to Flink 2.2.0. It requires Python 3.9+.

Install

pip install apache-flink==2.2.0

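Imports

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.datastream import StreamExecutionEnvironment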

Quickstart

This quickstart demonstrates a simple PyFlink Table API job. It creates a streaming table environment, defines an in-memory source, performs a group-by aggregation, and prints the results to the console. This showcases basic setup, data ingestion, transformation, and execution for local development.

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

def quickstart_pyflink_table_api():
    # Set up the Table Environment in streaming mode
    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)

    # Optional: use a single parallel instance for local execution
    t_env.get_config().set("parallelism.default", "1")

    # Define some in-memory data
    data = [
        ("Alice", 100),
        ("Bob", 200),
        ("Charlie", 150),
        ("Alice", 50)
    ]

    # Create a temporary view from the collection with an explicit schema
    t_env.create_temporary_view(
        "input_table",
        t_env.from_elements(
            data,
            schema=DataTypes.ROW([
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("score", DataTypes.INT())
            ])
        )
    )

    # Sum scores per name. Table API expression objects (col(...)) are used
    # instead of the deprecated string-expression syntax ("name, SUM(score)").
    result = t_env.from_path("input_table") \
        .group_by(col("name")) \
        .select(col("name"), col("score").sum.alias("total_score")) \
        .execute()

    # Collect and print the results (for local execution). In streaming mode
    # the aggregation emits a changelog, so a name whose total changes (Alice)
    # appears more than once; get_row_kind() shows insert vs. update rows.
    print("\n--- PyFlink Quickstart Results ---")
    with result.collect() as results:
        for row in results:
            print(row.get_row_kind(), row)
    print("----------------------------------")

if __name__ == '__main__':
    quickstart_pyflink_table_api()
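The quickstart runs in streaming mode, so the group-by does not wait for the input to end. Each incoming row updates the running total for its key, and Flink emits that change as a changelog row. Flink tags changelog rows with a RowKind: +I (insert), -U (update before), +U (update after), and -D (delete).

The plain-Python sketch below is a hypothetical illustration of that behaviour on the quickstart's data. It does not use PyFlink, and the function name `simulate_streaming_sum` is made up for this example. In a real Flink run, the order of rows for different keys can vary with parallelism and configuration.

```python
# Hypothetical illustration in plain Python (not PyFlink): mimics how a
# non-windowed streaming GROUP BY ... SUM(...) turns input rows into
# changelog entries, labelled with Flink's RowKind short strings:
# "+I" = INSERT, "-U" = UPDATE_BEFORE, "+U" = UPDATE_AFTER.

def simulate_streaming_sum(rows):
    totals = {}      # running total per key (the aggregation "state")
    changelog = []   # emitted (row_kind, key, total) entries
    for name, score in rows:
        if name not in totals:
            # First row for this key: emit a fresh result
            totals[name] = score
            changelog.append(("+I", name, totals[name]))
        else:
            # Retract the previous result, then emit the updated one
            changelog.append(("-U", name, totals[name]))
            totals[name] += score
            changelog.append(("+U", name, totals[name]))
    return changelog


data = [("Alice", 100), ("Bob", 200), ("Charlie", 150), ("Alice", 50)]
for kind, name, total in simulate_streaming_sum(data):
    print(kind, name, total)
```

If the quickstart used `EnvironmentSettings.in_batch_mode()` instead, the aggregation would emit only the final totals, one row per name.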

view raw JSON →