Apache Airflow gRPC Provider

3.9.4 verified Thu Apr 16 auth: no python

The `apache-airflow-providers-grpc` package extends Apache Airflow with support for gRPC-based services. It provides `GrpcHook` and `GrpcOperator`, which call methods on generated gRPC stubs using connection details stored in Airflow. The provider is currently at version 3.9.4 and is actively maintained, with releases that follow the regular Apache Airflow provider release schedule.

pip install apache-airflow-providers-grpc
error TypeError: 'dict' object is not callable or expecting protobuf message
cause Passing a plain Python dictionary where the gRPC method expects a protobuf message. `GrpcOperator` and `GrpcHook` pass the entries of `data` to the stub method as keyword arguments, so the value used as the request must be an instance of the generated request message class, not a dict of its fields.
fix
Construct the correct protobuf request object (e.g., YourService_pb2.YourRequest(field='value')) and pass it within the data dictionary, typically under a 'request' key: data={'request': YourService_pb2.YourRequest(...) }.
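To see why a plain dict fails, here is a minimal, stdlib-only sketch. It assumes that the hook ends up calling the stub method as `getattr(stub, call_func)(**data)`; `FakeStub`, `PingRequest` and `call_like_hook` are hypothetical stand-ins for generated code and for the hook, and the type check imitates what real protobuf serialization would reject.

```python
from dataclasses import dataclass


@dataclass
class PingRequest:
    """Hypothetical stand-in for a generated protobuf message class."""
    message: str = ""


class FakeStub:
    """Hypothetical stand-in for a generated stub; rejects non-message requests
    the way protobuf serialization would."""

    def Ping(self, request, timeout=None, metadata=None):
        if not isinstance(request, PingRequest):
            raise TypeError(f"expected PingRequest, got {type(request).__name__}")
        return f"pong: {request.message}"


def call_like_hook(stub, call_func, data):
    # Assumed hook behavior: each key of `data` becomes a keyword argument.
    return getattr(stub, call_func)(**data)


stub = FakeStub()

# Correct: the value under 'request' is a message object.
print(call_like_hook(stub, "Ping", {"request": PingRequest(message="hi")}))

# Incorrect: a raw dict as the request fails before anything is sent.
try:
    call_like_hook(stub, "Ping", {"request": {"message": "hi"}})
except TypeError as exc:
    print(f"TypeError: {exc}")
```

Note that `data` itself is still a dict; only the value under `request` has to be a message object.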
error grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Connect Failed" debug_error_string = "..."
cause Airflow's `GrpcHook` or `GrpcOperator` failed to establish a connection with the gRPC server. This could be due to an incorrect host/port, the server not running, network issues, or authentication failures.
fix
Verify the gRPC connection details in Airflow UI (Admin -> Connections -> grpc_default or your custom connection ID). Ensure the 'Host' and 'Port' are correct, the gRPC server is running and accessible from the Airflow worker, and the 'Auth Type' and associated credentials (e.g., 'Credential Pem File' or 'Scopes') are correctly configured and match the server's requirements.
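A quick way to separate network problems from configuration problems is to test plain TCP reachability from the Airflow worker, and to define the connection as an environment variable so that every field is explicit. The sketch below uses only the standard library. It assumes the JSON format for `AIRFLOW_CONN_<CONN_ID>` variables (available in Airflow 2.3+), a connection type of `grpc`, and an `auth_type` extra key matching the 'Auth Type' field named above; `tcp_reachable` and `grpc_conn_env_value` are hypothetical helper names.

```python
import json
import socket


def tcp_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only shows that the port is open from this machine; it does not
    check TLS, credentials, or whether a gRPC server is listening there.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def grpc_conn_env_value(host: str, port: int, auth_type: str = "NO_AUTH") -> str:
    """Build a JSON value for an AIRFLOW_CONN_<CONN_ID> environment variable.

    Assumption: the gRPC hook reads the auth type from the 'auth_type' extra.
    """
    return json.dumps(
        {
            "conn_type": "grpc",
            "host": host,
            "port": port,
            "extra": {"auth_type": auth_type},
        }
    )


if __name__ == "__main__":
    print(tcp_reachable("localhost", 50051))
    print(grpc_conn_env_value("localhost", 50051))
```

If `tcp_reachable` returns `False` on the worker, the problem is host, port, or network rather than the Airflow connection's auth settings. The printed JSON could then be exported as `AIRFLOW_CONN_GRPC_DEFAULT`.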
breaking Provider version 2.0.0+ requires Apache Airflow 2.1.0+ because the `apply_defaults` decorator was removed. Installing it on an older Airflow version upgrades Airflow automatically, after which the database migration has to be run manually.
fix Upgrade your Apache Airflow instance to at least version 2.1.0 before installing or upgrading to `apache-airflow-providers-grpc` version 2.0.0 or higher. For the latest provider version (3.9.0+), Airflow >=2.11.0 is required.
breaking Minimum supported Airflow version has incrementally increased. Provider version 3.0.0+ requires Airflow 2.2+, version 3.1.0+ requires Airflow 2.3+, and version 3.9.0+ requires Airflow 2.11+.
fix Always check the provider's changelog or documentation for the `minimum_airflow_version` compatible with the specific provider version you intend to use. Ensure your Airflow environment meets this requirement.
breaking Python 3.9 support was dropped in provider version 3.8.1.
fix Ensure your Python environment is version 3.10 or higher when using `apache-airflow-providers-grpc` versions 3.8.1 or newer, as indicated by the `requires_python: >=3.10` metadata on PyPI.
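The minimum versions listed in these breaking-change notes can be checked before upgrading. Below is a minimal sketch that encodes only the requirements stated above (it is not an official compatibility matrix, so verify against the provider changelog); `compatibility_problems` and `parse_version` are hypothetical helpers.

```python
import sys

# Requirements as stated in the notes above, newest provider release first.
MIN_AIRFLOW_BY_PROVIDER = [
    ((3, 9, 0), (2, 11, 0)),
    ((3, 1, 0), (2, 3, 0)),
    ((3, 0, 0), (2, 2, 0)),
    ((2, 0, 0), (2, 1, 0)),
]
MIN_PYTHON_BY_PROVIDER = [((3, 8, 1), (3, 10))]


def parse_version(text: str) -> tuple:
    """Parse 'X.Y.Z' into a 3-tuple of ints, ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    parts += [0] * (3 - len(parts))  # pad so (2, 11) compares like (2, 11, 0)
    return tuple(parts)


def _lookup(table, provider):
    # Return the requirement of the newest provider release <= `provider`.
    for min_provider, requirement in table:
        if provider >= min_provider:
            return requirement
    return None


def compatibility_problems(provider: str, airflow: str, python=sys.version_info[:2]) -> list:
    """Return readable problems; an empty list means no known conflict."""
    pv = parse_version(provider)
    problems = []
    need_airflow = _lookup(MIN_AIRFLOW_BY_PROVIDER, pv)
    if need_airflow and parse_version(airflow) < need_airflow:
        problems.append(f"provider {provider} needs Airflow >= {'.'.join(map(str, need_airflow))}")
    need_python = _lookup(MIN_PYTHON_BY_PROVIDER, pv)
    if need_python and tuple(python) < need_python:
        problems.append(f"provider {provider} needs Python >= {'.'.join(map(str, need_python))}")
    return problems


print(compatibility_problems("3.9.4", "2.10.5"))
```

In a real environment, the installed versions can be read with `importlib.metadata.version("apache-airflow")` and `importlib.metadata.version("apache-airflow-providers-grpc")`.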
gotcha The `data` parameter of `GrpcOperator` (and of `GrpcHook.run`) is a dictionary whose keys are keyword-argument names of the stub method and whose values are passed through unchanged. For a generated stub, the request goes under the `request` key and must be a protobuf message object; passing incorrect types or structures leads to serialization errors.
fix Always pass the gRPC method's expected request object within the `data` dictionary, e.g., `data={'request': YourProtoRequestObject(field='value')}`. Do not pass raw dictionaries directly if the gRPC method expects a protobuf message.
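Because every key in `data` becomes a keyword argument, per-call options that generated stub methods accept, such as `timeout` (seconds) and `metadata` (a sequence of key/value pairs), can sit next to `request` in the same dict. This stdlib-only sketch uses a hypothetical `RecordingCallable` that mimics that call signature and records what it receives; it is not the grpc library's class.

```python
from dataclasses import dataclass


@dataclass
class PingRequest:
    """Hypothetical stand-in for a generated protobuf message."""
    message: str = ""


class RecordingCallable:
    """Hypothetical stand-in for a stub method; records the keyword arguments."""

    def __init__(self):
        self.calls = []

    def __call__(self, request, timeout=None, metadata=None):
        self.calls.append({"request": request, "timeout": timeout, "metadata": metadata})
        return "ok"


ping = RecordingCallable()

data = {
    "request": PingRequest(message="hello"),
    "timeout": 10.0,  # deadline for the RPC, in seconds
    "metadata": [("x-request-id", "abc123")],  # (key, value) pairs; lowercase keys
}

ping(**data)  # roughly what the hook does with `data`
print(ping.calls[0])
```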

This quickstart shows a basic Airflow DAG that calls a gRPC service with `GrpcOperator`. It assumes a gRPC connection named `grpc_default` is configured in the Airflow UI (Admin -> Connections). The protobuf stub modules (`dummy_pb2_grpc`, `dummy_pb2`) are placeholders; in a real application they are generated from your `.proto` files, and if they cannot be imported the example falls back to mock classes. `GrpcOperator` calls the named method (`Ping` on `DummyServiceStub`), passing the entries of `data` as keyword arguments. For more complex interactions, `GrpcHook` can also be used directly inside a `PythonOperator`.
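When `GrpcHook` is used directly, note that, as far as we can tell from the provider's source, `GrpcHook.run` is a generator: it yields the single response of a unary call and each message of a streaming call. Its result therefore has to be iterated, not read as a response object. The sketch below is a simplified, stdlib-only model of that behavior (`run_model` and `FakeStub` are hypothetical; the real hook also opens and closes the channel around the call).

```python
from types import SimpleNamespace


def run_model(stub, call_func, streaming=False, data=None):
    """Simplified model of GrpcHook.run: always a generator."""
    data = data or {}
    response = getattr(stub, call_func)(**data)
    if streaming:
        yield from response  # one item per streamed message
    else:
        yield response  # exactly one item for a unary call


class FakeStub:
    """Hypothetical stub with one unary and one server-streaming method."""

    def Ping(self, request):
        return SimpleNamespace(message=f"pong: {request.message}")

    def Watch(self, request):
        for i in range(3):
            yield SimpleNamespace(message=f"event {i}")


stub = FakeStub()
req = SimpleNamespace(message="hi")

# Unary: unpack the single yielded response.
(unary,) = run_model(stub, "Ping", data={"request": req})
print(unary.message)

# Streaming: iterate to receive every message.
for resp in run_model(stub, "Watch", streaming=True, data={"request": req}):
    print(resp.message)
```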

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.grpc.operators.grpc import GrpcOperator
from airflow.providers.grpc.hooks.grpc import GrpcHook

# NOTE: In a real scenario, you would generate protobuf stub classes
# For this example, we assume a 'dummy_pb2_grpc' and 'dummy_pb2' exist
# and define a 'DummyService' with a 'Ping' method that takes 'PingRequest'
# and returns 'PingResponse'.
# Replace with your actual generated proto files and service/method names.

try:
    import grpc
    from dummy_pb2_grpc import DummyServiceStub
    from dummy_pb2 import PingRequest, PingResponse
except ImportError:
    # Placeholder for running the quickstart without actual protobufs
    class DummyServiceStub:
        def __init__(self, channel):
            pass
        def Ping(self, request):
            print(f"Mock Ping called with: {request.message}")
            return type('PingResponse', (object,), {'message': 'Pong from mock!'})
    class PingRequest:
        def __init__(self, message=None):
            self.message = message
    PingResponse = type('PingResponse', (object,), {'message': None})
    print("Warning: Protobuf stubs (dummy_pb2_grpc, dummy_pb2) not found. Using mock classes.")


with DAG(
    dag_id='grpc_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['grpc', 'example'],
) as dag:
    # Define a gRPC connection in Airflow UI (Admin -> Connections)
    # Conn Id: 'grpc_default'
    # Conn Type: 'gRPC Connection'
    # Host: 'localhost'
    # Port: '50051'
    # Auth Type: 'NO_AUTH' for insecure channel, or 'SSL'/'TLS' with 'Credential Pem File'
    # For JWT_GOOGLE or OATH_GOOGLE (Google credentials via 'google-auth'), set the
    # Auth Type accordingly; OATH_GOOGLE also needs comma-separated 'Scopes'

    # Example of using GrpcOperator
    # This assumes 'grpc_default' connection is configured in Airflow
    call_grpc_service = GrpcOperator(
        task_id='call_grpc_service',
        grpc_conn_id='grpc_default',
        stub_class=DummyServiceStub, # Your generated gRPC stub class
        call_func='Ping', # The method on the stub class to call
        data={'request': PingRequest(message='Hello gRPC from Airflow!')},
        # If the gRPC method expects a specific request object, pass it like this.
        # The 'data' parameter maps to the keyword arguments of the gRPC method.
    )

    # Example of using GrpcHook directly inside a PythonOperator (for custom logic).
    # Newer installs ship PythonOperator in the standard provider; fall back to
    # the Airflow 2.x module path otherwise.
    try:
        from airflow.providers.standard.operators.python import PythonOperator
    except ImportError:
        from airflow.operators.python import PythonOperator

    def _interact_with_grpc_hook(**kwargs):
        hook = GrpcHook(grpc_conn_id='grpc_default')
        # hook.run() is a generator: it yields the single response of a unary call,
        # or each message of a server-streaming call (pass streaming=True for those),
        # so iterate over it instead of using its return value directly.
        for response in hook.run(
            stub_class=DummyServiceStub,
            call_func='Ping',
            data={'request': PingRequest(message='Hello from Hook!')},
        ):
            print(f"gRPC Hook Response: {response.message}")

    interact_with_hook = PythonOperator(
        task_id='interact_with_grpc_hook',
        python_callable=_interact_with_grpc_hook,
    )
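Responses from `GrpcOperator` are discarded unless you handle them. The operator accepts a `response_callback`, which, as far as we know, is called once per yielded response with `(response, context)`. The sketch below is a hypothetical callback (`store_message` is not part of the provider) that pushes only the `message` string to XCom, since a protobuf object may not be serializable by XCom. It assumes the response has a `message` field, as `PingResponse` does in the DAG above, and checks itself locally with stand-in objects.

```python
from types import SimpleNamespace


def store_message(response, context):
    """Hypothetical response_callback for GrpcOperator.

    Pushes the response's `message` string to XCom under 'grpc_message'.
    """
    context["ti"].xcom_push(key="grpc_message", value=response.message)


# Local check with stand-ins for the task instance and the gRPC response.
class FakeTaskInstance:
    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


fake_ti = FakeTaskInstance()
store_message(SimpleNamespace(message="Pong"), {"ti": fake_ti})
print(fake_ti.pushed)  # {'grpc_message': 'Pong'}
```

In the DAG it would be attached as `GrpcOperator(..., response_callback=store_message)`. If you only need to see the responses, `log_response=True` logs each one instead.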