Implementation:Bentoml BentoML GRPC OpenTelemetry Interceptor
| Knowledge Sources | |
|---|---|
| Domains | gRPC, OpenTelemetry, Observability, Tracing |
| Last Updated | 2026-02-13 15:00 GMT |
Overview
Implements an async gRPC server interceptor for OpenTelemetry distributed tracing, providing span creation, context propagation, and a wrapped servicer context that records trace attributes.
Description
This module provides two main classes. _OpenTelemetryServicerContext is a wrapper around aio.ServicerContext that intercepts set_code, set_details, and abort calls to automatically record gRPC status codes and error details on the active OpenTelemetry span. It delegates all other methods to the underlying servicer context.
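The wrapping pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the BentoML implementation: `FakeSpan`, `FakeServicerContext`, and `RecordingServicerContext` are hypothetical stand-ins, status codes are plain integers, the `error.details` attribute key is invented for the example, and `__getattr__` is used as a shortcut for delegation where the real class may forward each method explicitly.

```python
class FakeSpan:
    """Hypothetical stand-in for an OpenTelemetry span; only stores attributes."""

    def __init__(self):
        self.attributes = {}

    def set_attribute(self, key, value):
        self.attributes[key] = value


class FakeServicerContext:
    """Hypothetical stand-in for a gRPC servicer context."""

    def __init__(self):
        self._code = 0  # 0 is the numeric value of gRPC OK
        self._details = ""

    def set_code(self, code):
        self._code = code

    def code(self):
        return self._code

    def set_details(self, details):
        self._details = details

    def details(self):
        return self._details

    def peer(self):
        return "ipv4:127.0.0.1:50051"


class RecordingServicerContext:
    """Records status information on the span, then forwards to the real context."""

    def __init__(self, servicer_context, active_span):
        self._servicer_context = servicer_context
        self._active_span = active_span

    def set_code(self, code):
        # Mirror the status code onto the span before delegating.
        self._active_span.set_attribute("rpc.grpc.status_code", code)
        self._servicer_context.set_code(code)

    def set_details(self, details):
        # "error.details" is an assumed key chosen for this sketch.
        self._active_span.set_attribute("error.details", details)
        self._servicer_context.set_details(details)

    def __getattr__(self, name):
        # Only called for names not defined above: delegate everything else.
        return getattr(self._servicer_context, name)
```

A handler that receives the wrapper keeps calling `set_code`, `set_details`, or `peer` as usual; the span is updated without any change to the handler's code. An `abort` override would follow the same record-then-delegate pattern.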
AsyncOpenTelemetryServerInterceptor extends aio.ServerInterceptor and is the main interceptor class. It creates an OpenTelemetry span for each unary-unary RPC and annotates it with the RPC system (grpc), status code, method name, service name, user-agent, content-type, and network peer information (IP address, port, and whether the peer is localhost). The interceptor extracts the distributed tracing context from incoming gRPC metadata via opentelemetry.propagate.extract and attaches it to the current context. The tracer provider is supplied through dependency injection from BentoMLContainer.tracer_provider. Streaming RPCs are not currently instrumented; their handlers are passed through unchanged.
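To make the span attributes concrete, the sketch below derives the service, method, and peer attributes from the two strings gRPC exposes: the fully qualified method name and the peer address. It is an assumption-laden illustration rather than the module's code: `rpc_attributes` is a hypothetical helper, the attribute keys follow older OpenTelemetry semantic-convention names, and the method name `/example.Greeter/SayHello` is made up.

```python
def rpc_attributes(full_method: str, peer: str) -> dict:
    """Build span attributes from a gRPC method name and peer string."""
    attrs = {"rpc.system": "grpc"}

    # gRPC method names look like "/package.Service/Method".
    service, method = full_method.lstrip("/").split("/", 1)
    attrs["rpc.service"] = service
    attrs["rpc.method"] = method

    # Peer strings look like "ipv4:127.0.0.1:57284" or "ipv6:[::1]:57284".
    try:
        ip, port = peer.split(",")[0].split(":", 1)[1].rsplit(":", 1)
    except (IndexError, ValueError):
        # Unparseable peers (for example Unix sockets) get no network attributes.
        return attrs

    attrs["net.peer.ip"] = ip
    attrs["net.peer.port"] = port
    if ip in ("[::1]", "127.0.0.1"):
        attrs["net.peer.name"] = "localhost"
    return attrs
```

Parsing from the right with `rsplit` keeps IPv6 literals intact, since they contain colons of their own.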
Usage
Use this interceptor to add OpenTelemetry distributed tracing to BentoML gRPC services. It should be the first interceptor in the chain so that other interceptors benefit from the trace context.
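Why ordering matters can be shown with a small standard-library model. The sketch assumes that the first interceptor in the list ends up outermost, and it models trace context with a `contextvars.ContextVar`; `tracing`, `audit`, and `build_chain` are hypothetical names, and no gRPC or OpenTelemetry API is involved.

```python
import asyncio
import contextvars

# Stand-in for the active trace context.
trace_id = contextvars.ContextVar("trace_id", default=None)


def tracing(handler):
    """Wrapper that sets a trace id for the duration of the call."""
    async def wrapped(request):
        token = trace_id.set("abc123")
        try:
            return await handler(request)
        finally:
            trace_id.reset(token)
    return wrapped


def audit(log):
    """Wrapper factory that records the trace id it can see."""
    def wrap(handler):
        async def wrapped(request):
            log.append(trace_id.get())
            return await handler(request)
        return wrapped
    return wrap


def build_chain(handler, wrappers):
    """Apply wrappers so that the first one in the list is outermost."""
    for wrap in reversed(wrappers):
        handler = wrap(handler)
    return handler


async def echo(request):
    return request


def run(order_tracing_first: bool):
    log = []
    wrappers = [tracing, audit(log)] if order_tracing_first else [audit(log), tracing]
    result = asyncio.run(build_chain(echo, wrappers)("ping"))
    return result, log
```

With tracing first, the audit wrapper observes the trace id; with the order reversed, it runs before the context is set and observes nothing.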
Code Reference
Source Location
- Repository: Bentoml_BentoML
- File: src/bentoml/grpc/interceptors/opentelemetry.py
- Lines: 1-276
Signature
class _OpenTelemetryServicerContext(aio.ServicerContext["Request", "Response"]):
    def __init__(self, servicer_context: BentoServicerContext, active_span: Span): ...
    async def abort(self, code: grpc.StatusCode, details: str = "", trailing_metadata: MetadataType = tuple()) -> None: ...
    def set_code(self, code: grpc.StatusCode) -> None: ...
    def code(self) -> grpc.StatusCode: ...
    def set_details(self, details: str) -> None: ...
    def details(self) -> str: ...
    # ... delegates remaining methods to _servicer_context

class AsyncOpenTelemetryServerInterceptor(aio.ServerInterceptor):
    @inject
    def __init__(
        self,
        *,
        tracer_provider: TracerProvider = Provide[BentoMLContainer.tracer_provider],
        schema_url: str | None = None,
    ): ...
    @asynccontextmanager
    async def set_remote_context(self, servicer_context: BentoServicerContext) -> t.AsyncGenerator[None, None]: ...
    def start_span(
        self,
        method_name: str,
        context: BentoServicerContext,
        set_status_on_exception: bool = False,
    ) -> t.ContextManager[Span]: ...
    async def intercept_service(
        self,
        continuation: t.Callable[[HandlerCallDetails], t.Awaitable[RpcMethodHandler]],
        handler_call_details: HandlerCallDetails,
    ) -> RpcMethodHandler: ...
Import
from bentoml.grpc.interceptors.opentelemetry import AsyncOpenTelemetryServerInterceptor
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tracer_provider | TracerProvider | No | OpenTelemetry TracerProvider; injected from BentoMLContainer by default |
| schema_url | str or None | No | Optional OpenTelemetry schema URL |
| continuation | Callable | Yes (intercept_service) | Callback to get the next RPC handler |
| handler_call_details | HandlerCallDetails | Yes (intercept_service) | Details about the incoming RPC |
Outputs
| Name | Type | Description |
|---|---|---|
| (return value) | RpcMethodHandler | Handler wrapped with tracing instrumentation; streaming handlers are returned unchanged |
| (side effect) | OpenTelemetry Span | Span created for each RPC with method, service, peer, and status attributes |
Usage Examples
from bentoml.grpc.interceptors.opentelemetry import AsyncOpenTelemetryServerInterceptor
from grpc import aio
# Create server with OpenTelemetry interceptor
interceptor = AsyncOpenTelemetryServerInterceptor()
server = aio.server(interceptors=[interceptor])