Apache Airflow gRPC Provider

3.9.4 · active · verified Thu Apr 16

The `apache-airflow-providers-grpc` package extends Apache Airflow so that workflows can call gRPC services. It provides `GrpcHook`, which opens a gRPC channel from an Airflow connection and invokes a method on a generated stub, and `GrpcOperator`, which runs such a call as a task. It also registers a gRPC connection type for storing the host, port, and authentication settings. This page covers version 3.9.4. The provider is actively maintained and is released on the regular Apache Airflow provider release schedule.
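The calling convention that the quickstart relies on (a stub class, a method name in `call_func`, and a `data` dict of keyword arguments) can be approximated in plain Python. This is a simplified sketch under that assumption, not the provider's source: `run_stub_call` and `EchoStub` are hypothetical names, and a fake stub stands in for a generated one so the code runs without Airflow or a server.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator


def run_stub_call(
    channel: Any,
    stub_class: Callable[[Any], Any],
    call_func: str,
    data: dict[str, Any] | None = None,
    streaming: bool = False,
) -> Iterator[Any]:
    """Hypothetical sketch of the hook-style dispatch: stub -> method -> **data -> yield."""
    stub = stub_class(channel)            # generated stubs are constructed from a channel
    rpc_func = getattr(stub, call_func)   # call_func='Ping' -> stub.Ping
    response = rpc_func(**(data or {}))   # data={'request': req} -> stub.Ping(request=req)
    if streaming:
        yield from response               # server streaming: one item per message
    else:
        yield response                    # unary call: exactly one item


class EchoStub:
    """Stand-in for a generated stub (hypothetical); a real stub wraps the channel."""

    def __init__(self, channel: Any) -> None:
        self.channel = channel

    def Ping(self, request: str) -> str:
        return f"pong: {request}"

    def Count(self, request: int) -> Iterator[int]:
        return iter(range(request))


unary = list(run_stub_call(None, EchoStub, "Ping", {"request": "hi"}))
stream = list(run_stub_call(None, EchoStub, "Count", {"request": 3}, streaming=True))
print(unary, stream)  # ['pong: hi'] [0, 1, 2]
```

Because the sketch is a generator, callers iterate over it even for unary calls, which is the same pattern the quickstart uses with `GrpcHook.run()`.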

Install

pip install apache-airflow-providers-grpc

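Imports

from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator
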
Quickstart

This quickstart demonstrates a basic Airflow DAG that uses `GrpcOperator` to call a gRPC service. It assumes a gRPC connection named `grpc_default` is configured in the Airflow UI (Admin -> Connections). The example imports protobuf stubs from placeholder modules (`dummy_pb2_grpc`, `dummy_pb2`) and falls back to mock classes if they are missing; in a real application, these modules are generated from your `.proto` files. `GrpcOperator` calls the named stub method (`Ping` on `DummyServiceStub`), passing each entry of `data` as a keyword argument. `GrpcHook` can also be used directly inside a `PythonOperator` when you need custom handling of the responses.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
# On Airflow 3 the preferred import path is airflow.providers.standard.operators.python.
from airflow.operators.python import PythonOperator
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator

# NOTE: In a real scenario, you would generate protobuf stub classes from your .proto files.
# For this example, we assume modules 'dummy_pb2_grpc' and 'dummy_pb2' exist and
# define a 'DummyService' with a 'Ping' method that takes a 'PingRequest'
# and returns a 'PingResponse'.
# Replace them with your actual generated modules and service/method names.

try:
    from dummy_pb2 import PingRequest, PingResponse
    from dummy_pb2_grpc import DummyServiceStub
except ImportError:
    # Fallback mock classes so the DAG file still parses without generated stubs.
    class PingRequest:
        def __init__(self, message=None):
            self.message = message

    class PingResponse:
        def __init__(self, message=None):
            self.message = message

    class DummyServiceStub:
        def __init__(self, channel):
            # A real stub binds its RPC methods to the channel; the mock only stores it.
            self.channel = channel

        def Ping(self, request):
            print(f"Mock Ping called with: {request.message}")
            return PingResponse(message='Pong from mock!')

    print("Warning: Protobuf stubs (dummy_pb2_grpc, dummy_pb2) not found. Using mock classes.")


with DAG(
    dag_id='grpc_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['grpc', 'example'],
) as dag:
    # Define a gRPC connection in the Airflow UI (Admin -> Connections):
    #   Conn Id:   'grpc_default'
    #   Conn Type: 'gRPC Connection'
    #   Host:      'localhost'
    #   Port:      '50051'
    #   Auth Type: 'NO_AUTH' for an insecure channel, or 'SSL'/'TLS' together with 'Credential Pem File'
    # For 'JWT_GOOGLE' or 'OATH_GOOGLE', the hook uses Google application default
    # credentials, so the 'google-auth' package must be installed.

    # GrpcOperator: calls DummyServiceStub.Ping through the 'grpc_default' connection.
    call_grpc_service = GrpcOperator(
        task_id='call_grpc_service',
        grpc_conn_id='grpc_default',
        stub_class=DummyServiceStub,  # Your generated gRPC stub class
        call_func='Ping',  # Name of the stub method to call
        # Each key in 'data' becomes a keyword argument of the stub method,
        # so this calls stub.Ping(request=PingRequest(...)).
        data={'request': PingRequest(message='Hello gRPC from Airflow!')},
    )

    # GrpcHook used directly inside a PythonOperator, for custom response handling.
    def _interact_with_grpc_hook(**kwargs):
        hook = GrpcHook(grpc_conn_id='grpc_default')
        # GrpcHook.run() is a generator: a unary call yields a single response,
        # and a streaming call (streaming=True) yields one response per message.
        for response in hook.run(
            stub_class=DummyServiceStub,
            call_func='Ping',
            data={'request': PingRequest(message='Hello from Hook!')},
        ):
            print(f"gRPC Hook Response: {response.message}")

    interact_with_hook = PythonOperator(
        task_id='interact_with_grpc_hook',
        python_callable=_interact_with_grpc_hook,
    )
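Instead of creating `grpc_default` in the UI, Airflow can read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (conn id upper-cased), and it accepts a JSON value there on Airflow 2.3 and later. The sketch below builds such a variable for the connection described in the DAG comments. It assumes the connection type is `grpc` and that the auth type is stored under the extra key `auth_type`; `grpc_conn_env` is a hypothetical helper, and the host and port are placeholders.

```python
from __future__ import annotations

import json
import shlex


def grpc_conn_env(conn_id: str, host: str, port: int, auth_type: str = "NO_AUTH") -> tuple[str, str]:
    """Hypothetical helper: return (env var name, JSON value) for a gRPC connection."""
    # Airflow looks up AIRFLOW_CONN_<CONN_ID>, with the conn id upper-cased.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # Assumed field names: conn_type "grpc" and the extra key "auth_type".
    value = json.dumps(
        {"conn_type": "grpc", "host": host, "port": port, "extra": {"auth_type": auth_type}}
    )
    return name, value


name, value = grpc_conn_env("grpc_default", "localhost", 50051)
# Shell line to add to the scheduler and worker environment (placeholder host and port).
print(f"export {name}={shlex.quote(value)}")
```

The variable must be visible to every Airflow component that runs the task, so it belongs in the deployment environment rather than in the DAG file.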

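`GrpcOperator` also accepts `streaming` and `response_callback` arguments for handling what the service returns. A minimal sketch, assuming the operator invokes `response_callback(response, context)` once per response: the hypothetical `store_ping_message` pushes only the response's `message` string to XCom, since protobuf message objects are generally not serializable by the default XCom backend. The fake task instance and response exist only so the callback can be exercised outside Airflow.

```python
from __future__ import annotations

from typing import Any


def store_ping_message(response: Any, context: dict[str, Any]) -> None:
    """Hypothetical response_callback for GrpcOperator.

    Assumes the operator calls response_callback(response, context) for each
    response it receives. Only the plain 'message' string is pushed to XCom.
    """
    context["ti"].xcom_push(key="ping_message", value=response.message)


# Stand-ins used only to exercise the callback outside Airflow.
class FakeTaskInstance:
    def __init__(self) -> None:
        self.pushed: dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.pushed[key] = value


class FakeResponse:
    message = "Pong from mock!"


fake_ti = FakeTaskInstance()
store_ping_message(FakeResponse(), {"ti": fake_ti})
print(fake_ti.pushed)  # {'ping_message': 'Pong from mock!'}
```

In the DAG it would be passed as `GrpcOperator(..., response_callback=store_ping_message)`. With `streaming=True`, each streamed message would trigger one call, so repeated pushes to the same key would typically leave only the last value.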