Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Bentoml BentoML GRPC OpenTelemetry Interceptor

From Leeroopedia
Knowledge Sources
Domains gRPC, OpenTelemetry, Observability, Tracing
Last Updated 2026-02-13 15:00 GMT

Overview

Implements an async gRPC server interceptor for OpenTelemetry distributed tracing, providing span creation, context propagation, and a wrapped servicer context that records trace attributes.

Description

This module provides two main classes. _OpenTelemetryServicerContext is a wrapper around aio.ServicerContext that intercepts set_code, set_details, and abort calls to automatically record gRPC status codes and error details on the active OpenTelemetry span. It delegates all other methods to the underlying servicer context.

AsyncOpenTelemetryServerInterceptor extends aio.ServerInterceptor and is the main interceptor class. It creates OpenTelemetry spans for each unary-unary RPC call with comprehensive attributes: RPC system (grpc), status code, method name, service name, user-agent, content-type, and network peer information (IP, port, localhost detection). The interceptor extracts distributed tracing context from incoming gRPC metadata via opentelemetry.propagate.extract and attaches it to the current context. The tracer is configured via dependency injection from BentoMLContainer.tracer_provider. Streaming RPCs are not currently supported and are passed through without instrumentation.

Usage

Use this interceptor to add OpenTelemetry distributed tracing to BentoML gRPC services. It should be the first interceptor in the chain so that other interceptors benefit from the trace context.

Code Reference

Source Location

Signature

class _OpenTelemetryServicerContext(aio.ServicerContext["Request", "Response"]):
    def __init__(self, servicer_context: BentoServicerContext, active_span: Span): ...
    async def abort(self, code: grpc.StatusCode, details: str = "", trailing_metadata: MetadataType = tuple()) -> None: ...
    def set_code(self, code: grpc.StatusCode) -> None: ...
    def code(self) -> grpc.StatusCode: ...
    def set_details(self, details: str) -> None: ...
    def details(self) -> str: ...
    # ... delegates remaining methods to _servicer_context

class AsyncOpenTelemetryServerInterceptor(aio.ServerInterceptor):
    @inject
    def __init__(
        self,
        *,
        tracer_provider: TracerProvider = Provide[BentoMLContainer.tracer_provider],
        schema_url: str | None = None,
    ): ...

    @asynccontextmanager
    async def set_remote_context(self, servicer_context: BentoServicerContext) -> t.AsyncGenerator[None, None]: ...

    def start_span(
        self,
        method_name: str,
        context: BentoServicerContext,
        set_status_on_exception: bool = False,
    ) -> t.ContextManager[Span]: ...

    async def intercept_service(
        self,
        continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]],
        handler_call_details: HandlerCallDetails,
    ) -> RpcMethodHandler: ...

Import

from bentoml.grpc.interceptors.opentelemetry import AsyncOpenTelemetryServerInterceptor

I/O Contract

Inputs

Name Type Required Description
tracer_provider TracerProvider No OpenTelemetry TracerProvider; injected from BentoMLContainer by default
schema_url str or None No Optional OpenTelemetry schema URL
continuation Callable Yes (intercept_service) Callback to get the next RPC handler
handler_call_details HandlerCallDetails Yes (intercept_service) Details about the incoming RPC

Outputs

Name Type Description
RpcMethodHandler RpcMethodHandler Wrapped handler with tracing instrumentation
(side effect) OpenTelemetry Span Span created for each RPC with method, service, peer, and status attributes

Usage Examples

from bentoml.grpc.interceptors.opentelemetry import AsyncOpenTelemetryServerInterceptor
from grpc import aio

# Create server with OpenTelemetry interceptor
interceptor = AsyncOpenTelemetryServerInterceptor()
server = aio.server(interceptors=[interceptor])

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment