Implementation:Bentoml BentoML GrpcClient
| Knowledge Sources | |
|---|---|
| Domains | Client, gRPC, Networking |
| Last Updated | 2026-02-13 15:00 GMT |
Overview
Implements gRPC client classes (GrpcClient, AsyncGrpcClient, SyncGrpcClient) for communicating with BentoML services over the gRPC protocol, supporting SSL/TLS, health checks, service metadata discovery, and automatic API endpoint binding.
Description
This module provides concrete gRPC transport implementations of the abstract client base classes defined in bentoml._internal.client. It includes three client classes:
- GrpcClient: A deprecated wrapper that holds both a sync and an async gRPC client and delegates to them. Extends the deprecated Client base class.
- AsyncGrpcClient: The full-featured async gRPC client. Key capabilities:
  - Channel management: Creates either a secure (aio.secure_channel) or an insecure (aio.insecure_channel) gRPC channel. Supports custom channel options, interceptors, and compression settings.
  - SSL/TLS support: Accepts ClientCredentials (root certificates, private key, certificate chain) for secure connections. Credentials can be provided as file paths or raw bytes.
  - Health checking: wait_until_server_ready() performs gRPC health checks against the /grpc.health.v1.Health/Check endpoint, polling with a configurable timeout and interval.
  - Service metadata discovery: from_url() calls the server's ServiceMetadata RPC to retrieve API definitions, then constructs InferenceAPI objects from the returned IO descriptor specs. This enables dynamic client method creation without client-side service definitions.
  - RPC method caching: Uses @cached_property for _rpc_metadata and _rpc_methods, lazily building unary-unary RPC stubs with the proper serializers and deserializers.
  - API invocation: _call() serializes input via the API's IO descriptor to_proto() method, sends the request, and deserializes the response via from_proto().
  - Protocol version handling: Supports multiple gRPC protocol versions. Raises an informative error with example code if the protocol version is older than v1.
- SyncGrpcClient: Mirrors AsyncGrpcClient with synchronous, blocking semantics. Uses grpc.Channel instead of aio.Channel, and uses asyncio.run() to bridge the async IO descriptor serialization into a synchronous context.
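The health-checking behaviour described above (poll until ready, with a timeout and an interval) can be illustrated with a minimal, standard-library-only sketch. This is not the module's implementation: `wait_until_ready`, `probe`, and `make_flaky_probe` are hypothetical names, and the probe is a stand-in for the real gRPC health-check RPC.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_ready(
    probe: Callable[[], Awaitable[bool]],
    timeout: float = 30.0,
    check_interval: float = 1.0,
) -> None:
    """Poll `probe` until it reports ready or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            if await probe():
                return
        except ConnectionError:
            # Server is not accepting connections yet; retry after the interval.
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError(f"server not ready after {timeout} seconds")
        await asyncio.sleep(check_interval)


def make_flaky_probe(failures: int) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a health-check RPC: fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def probe() -> bool:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("connection refused")
        return True

    return probe


# The probe fails twice, then the third attempt succeeds well within the timeout.
asyncio.run(wait_until_ready(make_flaky_probe(2), timeout=5, check_interval=0.01))
```

Using a monotonic clock for the deadline keeps the loop robust against wall-clock adjustments while the client is waiting.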
Both AsyncGrpcClient and SyncGrpcClient implement:
- health(service_name, timeout): Performs a gRPC health check on a named service.
- _split_channel_args(**kwargs): Separates gRPC channel kwargs (timeout, metadata, credentials, wait_for_ready, compression) from other kwargs.
- close(): Closes the underlying gRPC channel.
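As a rough illustration of the kwargs separation performed by _split_channel_args, the sketch below partitions keyword arguments using the five option names listed above. The function name `split_channel_args` and its return shape are assumptions made for this example, not the module's actual code.

```python
from typing import Any, Dict, Tuple

# Per-call gRPC options named in the text above; anything else is passed through.
CHANNEL_KWARGS = ("timeout", "metadata", "credentials", "wait_for_ready", "compression")


def split_channel_args(**kwargs: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (channel_kwargs, other_kwargs) without mutating the input."""
    channel_kwargs = {k: v for k, v in kwargs.items() if k in CHANNEL_KWARGS}
    other_kwargs = {k: v for k, v in kwargs.items() if k not in CHANNEL_KWARGS}
    return channel_kwargs, other_kwargs


channel_kwargs, other_kwargs = split_channel_args(timeout=5, wait_for_ready=True, text="hello")
```

Here `channel_kwargs` would be forwarded to the RPC call itself, while `other_kwargs` would be treated as API input.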
Usage
gRPC clients are typically created via AsyncGrpcClient.from_url() or SyncGrpcClient.from_url(). They are also selected automatically when SyncClient.from_url() or AsyncClient.from_url() encounters a gRPC server, which is either detected through a BadStatusLine error raised by the initial HTTP attempt or requested explicitly with kind="grpc". The bentoml[grpc] extra must be installed.
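The selection rule can be sketched as follows. This is a simplified model under stated assumptions: `pick_transport` and `probe_http` are hypothetical names, and the probe stands in for whatever HTTP request the generic client attempts first. A gRPC server presumably answers with data that an HTTP/1.x client cannot parse as a status line, which is why BadStatusLine serves as the signal.

```python
import http.client
from typing import Callable, Optional


def pick_transport(probe_http: Callable[[], None], kind: Optional[str] = None) -> str:
    """Choose "grpc" or "http" for a server, honouring an explicit `kind`."""
    if kind is not None:
        # An explicit choice skips detection entirely.
        return kind
    try:
        probe_http()
    except http.client.BadStatusLine:
        # The server did not answer with a valid HTTP status line.
        return "grpc"
    return "http"
```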
Code Reference
Source Location
- Repository: Bentoml_BentoML
- File: src/bentoml/_internal/client/grpc.py
- Lines: 1-776
Signature
class GrpcClient(Client):
    def __init__(self, svc: Service, server_url: str): ...

class AsyncGrpcClient(AsyncClient):
    def __init__(
        self,
        server_url: str,
        svc: Service,
        ssl: bool = False,
        channel_options: aio.ChannelArgumentType | None = None,
        interceptors: t.Sequence[aio.ClientInterceptor] | None = None,
        compression: grpc.Compression | None = None,
        ssl_client_credentials: ClientCredentials | None = None,
        *,
        protocol_version: str = LATEST_PROTOCOL_VERSION,
        **kwargs: t.Any,
    ): ...

    @classmethod
    async def from_url(cls, server_url: str, **kwargs: t.Any) -> AsyncGrpcClient: ...

    async def _call(self, inp: t.Any = None, *, _bentoml_api: InferenceAPI[t.Any], **attrs: t.Any) -> t.Any: ...

    async def health(self, service_name: str, *, timeout: int = 30) -> t.Any: ...

    @staticmethod
    async def wait_until_server_ready(host: str, port: int, timeout: float = 30, check_interval: int = 1, **kwargs: t.Any) -> None: ...

class SyncGrpcClient(SyncClient):
    def __init__(self, server_url: str, svc: Service, ssl: bool = False, ...): ...

    @classmethod
    def from_url(cls, server_url: str, **kwargs: t.Any) -> SyncGrpcClient: ...

    def _call(self, inp: t.Any = None, *, _bentoml_api: InferenceAPI[t.Any], **attrs: t.Any): ...

    def health(self, service_name: str, *, timeout: int = 30) -> t.Any: ...
Import
from bentoml._internal.client.grpc import GrpcClient, AsyncGrpcClient, SyncGrpcClient
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| server_url | str | Yes | gRPC server address (e.g., "localhost:3000") |
| svc | Service | Yes | BentoML Service with API definitions (populated from server metadata when the client is created via from_url()) |
| ssl | bool | No (default: False) | Whether to use SSL/TLS for the connection |
| ssl_client_credentials | ClientCredentials | No | TLS credentials (root_certificates, private_key, certificate_chain) |
| channel_options | ChannelArgumentType | No | gRPC channel configuration options |
| interceptors | Sequence[ClientInterceptor] | No | gRPC client interceptors |
| compression | grpc.Compression | No | gRPC compression algorithm |
| protocol_version | str | No (default: LATEST) | BentoML gRPC protocol version string |
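Because the TLS credentials may be given as file paths or as raw bytes, a client has to normalise both forms to bytes before building channel credentials. The helper below is a minimal sketch of that normalisation; `read_credential` is a hypothetical name and not part of the module.

```python
from pathlib import Path
from typing import Optional, Union


def read_credential(value: Optional[Union[str, bytes]]) -> Optional[bytes]:
    """Return PEM bytes for a credential given as a file path, raw bytes, or None."""
    if value is None:
        return None
    if isinstance(value, bytes):
        # Already raw PEM content.
        return value
    # Otherwise treat the string as a filesystem path.
    return Path(value).read_bytes()
```

The resulting bytes are the form expected by grpc.ssl_channel_credentials(root_certificates=..., private_key=..., certificate_chain=...), whose result can then be passed to a secure channel.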
Outputs
| Name | Type | Description |
|---|---|---|
| result | Any | Deserialized output from the gRPC service API call (via IO descriptor from_proto) |
| health response | HealthCheckResponse | gRPC health check status |
| client instance | AsyncGrpcClient / SyncGrpcClient | Connected gRPC client with bound API methods |
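The result row above reflects a serialize, send, deserialize round trip. The toy model below shows that flow, plus the asyncio.run() bridge that a synchronous client can use around async descriptors. Everything in it is illustrative: `JsonDescriptor`, `call`, `call_sync`, and `echo_doubled` are hypothetical, and JSON bytes stand in for protobuf messages.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


class JsonDescriptor:
    """Toy stand-in for an IO descriptor; bytes play the role of protobuf messages."""

    async def to_proto(self, obj: Any) -> bytes:
        return json.dumps(obj).encode("utf-8")

    async def from_proto(self, message: bytes) -> Any:
        return json.loads(message.decode("utf-8"))


async def call(
    inp: Any,
    *,
    input_desc: JsonDescriptor,
    output_desc: JsonDescriptor,
    send: Callable[[bytes], Awaitable[bytes]],
) -> Any:
    request = await input_desc.to_proto(inp)        # serialize the input
    response = await send(request)                  # transport (an RPC stub in practice)
    return await output_desc.from_proto(response)   # deserialize the output


def call_sync(inp: Any, **kwargs: Any) -> Any:
    """Blocking wrapper: run the async flow to completion on a fresh event loop."""
    return asyncio.run(call(inp, **kwargs))


async def echo_doubled(request: bytes) -> bytes:
    """Fake server: doubles every number in the request list."""
    values = json.loads(request)
    return json.dumps([v * 2 for v in values]).encode("utf-8")


result = call_sync(
    [1, 2, 3],
    input_desc=JsonDescriptor(),
    output_desc=JsonDescriptor(),
    send=echo_doubled,
)
```

Note that asyncio.run() cannot be called from a thread that already has a running event loop, so a wrapper like `call_sync` only suits genuinely synchronous callers.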
Usage Examples
from bentoml._internal.client.grpc import AsyncGrpcClient, SyncGrpcClient

# Async gRPC client (the await expressions below must run inside a coroutine)
client = await AsyncGrpcClient.from_url("localhost:3000")
result = await client.predict(input_data)
await client.close()

# Sync gRPC client
client = SyncGrpcClient.from_url("localhost:3000")
result = client.predict(input_data)
client.close()

# With SSL
client = await AsyncGrpcClient.from_url(
    "localhost:3000",
    ssl=True,
    ssl_client_credentials={
        "root_certificates": "/path/to/ca.pem",
        "private_key": "/path/to/key.pem",
        "certificate_chain": "/path/to/cert.pem",
    },
)

# Health check: wait up to 60 seconds for the server to become ready
await AsyncGrpcClient.wait_until_server_ready("localhost", 3000, timeout=60)