gRPC Channelz

1.80.0 · active · verified Fri Apr 17

grpcio-channelz is the Python implementation of gRPC's Channelz service, which exposes live debugging and introspection data (connectivity state, call counts, and trace events) for gRPC channels, subchannels, sockets, and servers. It is an add-on to the core grpcio library and follows the grpcio release cycle, so its version (currently 1.80.0) tracks the grpcio version.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to enable the Channelz service on a gRPC server and then query it from a client. On the server side, the Channelz servicer must be registered explicitly with `channelz.add_channelz_servicer(server)` from the `grpc_channelz.v1` package; importing the module alone does not register it. The client then uses the generated stub and message definitions in `grpc_channelz.v1.channelz_pb2_grpc` and `grpc_channelz.v1.channelz_pb2` to make RPC calls to the Channelz service.

from concurrent import futures

import grpc
from grpc_channelz.v1 import channelz

# For client-side querying
from grpc_channelz.v1.channelz_pb2 import GetTopChannelsRequest
from grpc_channelz.v1.channelz_pb2_grpc import ChannelzStub

# --- Server-side: register the Channelz servicer ---
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # Importing the module is not enough; the servicer must be added explicitly.
    channelz.add_channelz_servicer(server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051. Channelz enabled.")
    return server

# --- Client-side: Query Channelz ---
def query_channelz(server_address='localhost:50051'):
    with grpc.insecure_channel(server_address) as channel:
        stub = ChannelzStub(channel)
        try:
            # Query for top channels
            response = stub.GetTopChannels(GetTopChannelsRequest(start_channel_id=0, max_results=10))
            print("\n--- Channelz Top Channels ---")
            for channel_info in response.channel: # Use .channel, not .top_channel
                print(f"ID: {channel_info.ref.channel_id}, State: {channel_info.data.state.state}, Target: {channel_info.data.target}")
        except grpc.RpcError as e:
            print(f"Error querying Channelz: {e.code().name} - {e.details()}")

if __name__ == '__main__':
    server = serve()

    # Open a client channel so that at least one channel is visible to Channelz,
    # and keep it open while querying. No sleep is needed: the ready future
    # below waits until the connection is up.
    with grpc.insecure_channel('localhost:50051') as client_channel:
        try:
            # Waiting for readiness forces a connection; no RPC is needed.
            grpc.channel_ready_future(client_channel).result(timeout=1)
            print("Client channel connected.")
        except grpc.FutureTimeoutError:
            print("Client channel did not become ready in time; it may still be listed.")

        query_channelz()

    server.stop(0)  # Stop immediately (grace period of 0 seconds)
