Apache Beam SDK for Python

2.71.0 · active · verified Sat Mar 28

Apache Beam is an open-source, unified programming model for defining and executing data processing pipelines for both batch and streaming data. It offers language-specific SDKs, including Python, to construct pipelines that can run on various distributed processing backends such as Apache Flink, Apache Spark, and Google Cloud Dataflow. The library maintains an active development pace with minor releases approximately every 6 weeks, and its current version is 2.71.0.

Warnings

Install
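The SDK is published to PyPI as apache-beam. The base package includes the core SDK and the local DirectRunner; optional extras pull in additional dependencies, for example the `gcp` extra for Google Cloud I/O connectors and Dataflow:

```shell
# Core SDK (includes the local DirectRunner)
pip install apache-beam

# With Google Cloud I/O connectors and Dataflow runner dependencies
pip install 'apache-beam[gcp]'
```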

Imports

Quickstart

This classic WordCount example demonstrates basic Apache Beam concepts: reading data from a source (local file or GCS), applying transformations like splitting, mapping, and combining, and writing the results to an output file. Run it locally using the DirectRunner.

import re
import argparse
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        default='output.txt',
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file into a PCollection
        lines = p | ReadFromText(known_args.input)

        # Count the occurrences of each word
        counts = (
            lines
            | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
        )

        # Format the counts into strings
        output = counts | 'Format' >> beam.Map(
            lambda word_count: '%s: %s' % (word_count[0], word_count[1]))

        # Write the output
        output | WriteToText(known_args.output)

if __name__ == '__main__':
    print("Running Beam WordCount pipeline locally...")
    main()
    print("Pipeline finished. Check the 'output.txt-*' shard files for results.")
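Assuming the snippet above is saved as wordcount.py (a hypothetical filename), it can be run locally like so; with no `--runner` flag, the DirectRunner is used:

```shell
# Run locally against a local input file; both flags are optional and
# fall back to the defaults defined in the argument parser.
python wordcount.py --input kinglear.txt --output counts.txt
```

Note that reading the default `gs://` input requires the `gcp` extras to be installed.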
