Apache Airflow gRPC Provider
The `apache-airflow-providers-grpc` package extends Apache Airflow, enabling interaction with gRPC-based services. It provides operators and hooks to execute gRPC commands and manage connections within Airflow workflows. Currently at version 3.9.4, this provider is actively maintained with frequent releases aligning with the broader Apache Airflow provider release schedule, ensuring compatibility and introducing new features.
Common errors
- `TypeError: 'dict' object is not callable` (or "expecting protobuf message")
  - Cause: a raw Python dictionary was passed directly to a gRPC method via `GrpcOperator` or `GrpcHook` when the method expects a protobuf message. The `data` parameter needs the actual protobuf object.
  - Fix: construct the correct protobuf request object (e.g., `YourService_pb2.YourRequest(field='value')`) and pass it within the `data` dictionary, typically under a 'request' key: `data={'request': YourService_pb2.YourRequest(...)}`.
- `grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "Connect Failed" debug_error_string = "...">`
  - Cause: Airflow's `GrpcHook` or `GrpcOperator` failed to establish a connection to the gRPC server. This can be due to an incorrect host/port, the server not running, network issues, or authentication failures.
  - Fix: verify the gRPC connection details in the Airflow UI (Admin -> Connections -> `grpc_default` or your custom connection ID). Ensure 'Host' and 'Port' are correct, the gRPC server is running and reachable from the Airflow worker, and the 'Auth Type' and associated credentials (e.g., 'Credential Pem File' or 'Scopes') are correctly configured and match the server's requirements.
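The first error comes down to how the hook invokes the stub: it resolves `call_func` by name and calls it with `**data`, so every value in `data` must already be a protobuf message object. A minimal sketch of that mechanic, using stand-in classes (the `PingRequest` and `DummyServiceStub` names here are illustrative, not from a real generated module):

```python
# Stand-ins for generated protobuf classes (hypothetical names; real ones
# come from `grpc_tools.protoc` output).
class PingRequest:
    def __init__(self, message=""):
        self.message = message


class DummyServiceStub:
    def __init__(self, channel):
        self.channel = channel

    def Ping(self, request):
        # A real stub serializes `request` here; a plain dict would raise.
        return f"pong: {request.message}"


# This mirrors what the hook does internally: resolve the method by name,
# then expand the `data` dict as keyword arguments.
stub = DummyServiceStub(channel=None)
data = {"request": PingRequest(message="hello")}  # a message object, not a dict
rpc_func = getattr(stub, "Ping")
print(rpc_func(**data))  # -> pong: hello
```

Passing `data={'request': {'message': 'hello'}}` instead would hand the stub a plain dict, which is exactly what triggers the `TypeError` once the real stub tries to serialize it.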
Warnings
- breaking Provider version 2.0.0+ requires Apache Airflow 2.1.0+ due to the removal of the `apply_default` decorator. Installing it on older Airflow versions may trigger an unintended automatic upgrade of Airflow core or cause database migration issues.
- breaking Minimum supported Airflow version has incrementally increased. Provider version 3.0.0+ requires Airflow 2.2+, version 3.1.0+ requires Airflow 2.3+, and version 3.9.0+ requires Airflow 2.11+.
- breaking Python 3.9 support was dropped in provider version 3.8.1.
- gotcha The `data` parameter in `GrpcOperator` expects a dictionary where keys map to keyword arguments of the gRPC method, and values are the corresponding protobuf request objects. Passing incorrect types or structures will lead to serialization errors.
Install
```shell
pip install apache-airflow-providers-grpc
```
Imports
- GrpcOperator

  ```python
  from airflow.providers.grpc.operators.grpc import GrpcOperator
  ```

- GrpcHook

  ```python
  from airflow.providers.grpc.hooks.grpc import GrpcHook
  ```
Quickstart
```python
from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.grpc.hooks.grpc import GrpcHook
from airflow.providers.grpc.operators.grpc import GrpcOperator

# NOTE: In a real scenario, you would generate protobuf stub classes with
# `python -m grpc_tools.protoc`. This example assumes 'dummy_pb2_grpc' and
# 'dummy_pb2' exist and define a 'DummyService' with a 'Ping' method that
# takes a 'PingRequest' and returns a 'PingResponse'.
# Replace these with your actual generated modules and service/method names.
try:
    from dummy_pb2 import PingRequest
    from dummy_pb2_grpc import DummyServiceStub
except ImportError:
    # Placeholders so the quickstart parses without real protobuf stubs.
    class DummyServiceStub:
        def __init__(self, channel):
            pass

        def Ping(self, request):
            print(f"Mock Ping called with: {request.message}")
            return type('PingResponse', (object,), {'message': 'Pong from mock!'})()

    class PingRequest:
        def __init__(self, message=None):
            self.message = message

    print("Warning: Protobuf stubs (dummy_pb2_grpc, dummy_pb2) not found. Using mock classes.")

with DAG(
    dag_id='grpc_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['grpc', 'example'],
) as dag:
    # Define a gRPC connection in the Airflow UI (Admin -> Connections):
    #   Conn Id:   grpc_default
    #   Conn Type: gRPC Connection
    #   Host:      localhost
    #   Port:      50051
    #   Auth Type: 'NO_AUTH' for an insecure channel, or 'SSL'/'TLS' with a
    #   'Credential Pem File'. For 'JWT_GOOGLE' or 'OATH_GOOGLE', ensure
    #   'google-auth' is installed.

    # Example of using GrpcOperator.
    # This assumes the 'grpc_default' connection is configured in Airflow.
    call_grpc_service = GrpcOperator(
        task_id='call_grpc_service',
        grpc_conn_id='grpc_default',
        stub_class=DummyServiceStub,  # your generated gRPC stub class
        call_func='Ping',  # the method on the stub class to call
        # 'data' maps to the keyword arguments of the gRPC method, so the
        # request object goes under the 'request' key.
        data={'request': PingRequest(message='Hello gRPC from Airflow!')},
    )

    # Example of using GrpcHook directly inside a PythonOperator
    # (or any custom logic).
    def _interact_with_grpc_hook(**kwargs):
        hook = GrpcHook(grpc_conn_id='grpc_default')
        # GrpcHook.run() is a generator: it yields the single response of a
        # unary call, or every message of the stream when streaming=True.
        responses = hook.run(
            stub_class=DummyServiceStub,
            call_func='Ping',
            data={'request': PingRequest(message='Hello from Hook!')},
        )
        for response in responses:
            print(f"gRPC Hook Response: {response.message}")

    interact_with_hook = PythonOperator(
        task_id='interact_with_grpc_hook',
        python_callable=_interact_with_grpc_hook,
    )

    call_grpc_service >> interact_with_hook
```