Implementation:Bentoml BentoML Runner Utils
| Knowledge Sources | |
|---|---|
| Domains | Runner, Utilities, Batching |
| Last Updated | 2026-02-13 15:00 GMT |
Overview
Provides the Params container class and utility functions for managing runner parameters, payload batching, and async iteration in BentoML's runner infrastructure.
Description
This module defines Params, a generic container for positional and keyword arguments that lets callers apply one operation to every stored value at once. Its key operations are:
- map: apply a function to every value.
- map_enumerate: apply a two-argument function, pairing each value with an item from a second iterable.
- iter: turn a Params of iterables into an iterator of Params.
- agg: combine a sequence of Params into one by aggregating the values at each position.
- sample: return the first value.
The module also provides payload_paramss_to_batch_params, which converts a sequence of Params[Payload] into batched parameters using AutoContainer, and the PAYLOAD_META_HEADER constant. In addition, iterate_in_threadpool wraps a synchronous iterator as an async iterator using anyio, and a helper _StopIteration exception carries the end-of-iteration signal safely across thread boundaries.
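The container semantics described above can be illustrated without importing BentoML. The following is a minimal sketch, not the library's implementation: MiniParams and concat are hypothetical names introduced here, and the code only assumes the behavior stated in this section (map over every value, agg over the values at each position, iter over a container of iterables).

```python
from __future__ import annotations

from typing import Any, Callable, Iterator, Sequence


class MiniParams:
    """Hypothetical stand-in for Params; not the BentoML class."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs

    def map(self, function: Callable[[Any], Any]) -> MiniParams:
        # Apply the function to every value, keeping positions and names.
        return MiniParams(
            *(function(a) for a in self.args),
            **{k: function(v) for k, v in self.kwargs.items()},
        )

    def iter(self) -> Iterator[MiniParams]:
        # Transpose a container of iterables into a stream of containers.
        keys = list(self.kwargs)
        columns = list(self.args) + [self.kwargs[k] for k in keys]
        n_args = len(self.args)
        for row in zip(*columns):
            yield MiniParams(*row[:n_args], **dict(zip(keys, row[n_args:])))

    @classmethod
    def agg(
        cls,
        params_list: Sequence[MiniParams],
        agg_func: Callable[[tuple], Any] = tuple,
    ) -> MiniParams:
        # Collect the values found at the same position (or under the same
        # keyword) in every container, then reduce each group with agg_func.
        if not params_list:
            return cls()
        first = params_list[0]
        args = tuple(
            agg_func(tuple(p.args[i] for p in params_list))
            for i in range(len(first.args))
        )
        kwargs = {
            k: agg_func(tuple(p.kwargs[k] for p in params_list))
            for k in first.kwargs
        }
        return cls(*args, **kwargs)


def concat(parts: tuple) -> list:
    # Hypothetical aggregation function: flatten several lists into one.
    return [x for part in parts for x in part]


# Two "requests", each carrying one positional and one keyword list.
requests = [MiniParams([1, 2], scale=[10, 20]), MiniParams([3], scale=[30])]
merged = MiniParams.agg(requests, agg_func=concat)
lengths = merged.map(len)
rows = list(merged.iter())
```

Here agg merges the per-request lists position by position, map inspects every merged value in one call, and iter walks the merged container row by row.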
Usage
Use the Params class when you need to manipulate sets of arguments uniformly, especially in BentoML's batching and runner dispatch logic. Use payload_paramss_to_batch_params for combining multiple request payloads into a single batched inference call. Use iterate_in_threadpool to convert synchronous generator outputs into async iterables.
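The idea behind wrapping a synchronous iterator as an async one can be sketched with the standard library alone. This is an illustration under stated assumptions, not BentoML's code: the module itself uses anyio, whereas this sketch swaps in asyncio's default executor, and the names iterate_in_worker_thread, _next_or_signal, and _StopIterationSignal are hypothetical.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


class _StopIterationSignal(Exception):
    """Hypothetical marker raised in place of StopIteration."""


def _next_or_signal(iterator: Iterator[T]) -> T:
    # asyncio refuses to set StopIteration on a Future, so translate it
    # into an ordinary exception before it crosses the thread boundary.
    try:
        return next(iterator)
    except StopIteration:
        raise _StopIterationSignal from None


async def iterate_in_worker_thread(iterator: Iterator[T]) -> AsyncIterator[T]:
    loop = asyncio.get_running_loop()
    while True:
        try:
            # Each blocking next() call runs in the default thread pool,
            # so the event loop stays free while the iterator works.
            item = await loop.run_in_executor(None, _next_or_signal, iterator)
        except _StopIterationSignal:
            break
        yield item


async def collect() -> list[int]:
    return [x async for x in iterate_in_worker_thread(iter(range(3)))]


result = asyncio.run(collect())
```

The dedicated exception is the important design choice: it lets the worker thread report exhaustion as a normal error that the async side can catch and turn into a clean end of iteration.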
Code Reference
Source Location
- Repository: Bentoml_BentoML
- File: src/bentoml/_internal/runner/utils.py
- Lines: 1-179
Signature
class Params(t.Generic[T]):
    args: tuple[T, ...]
    kwargs: dict[str, T]

    def __init__(self, *args: T, **kwargs: T): ...
    def items(self) -> t.Iterator[t.Tuple[t.Union[int, str], T]]: ...
    @classmethod
    def from_dict(cls, data: dict[str | int, T]) -> Params[T]: ...
    def all_equal(self) -> bool: ...
    def map(self, function: t.Callable[[T], To]) -> Params[To]: ...
    def map_enumerate(self, function: t.Callable[[T, Ti], To], iterable: t.Iterable[Ti]) -> Params[To]: ...
    def iter(self: Params[t.Iterable[Ti]]) -> t.Iterator[Params[Ti]]: ...
    @classmethod
    def agg(cls, params_list: t.Sequence[Params[Ti]], agg_func: t.Callable[...] = pass_through) -> Params[To]: ...
    @property
    def sample(self) -> T: ...

def payload_paramss_to_batch_params(
    paramss: t.Sequence[Params[Payload]],
    batch_dim: int,
) -> tuple[Params[t.Any], list[int]]: ...

async def iterate_in_threadpool(iterator: t.Iterator[T]) -> t.AsyncIterator[T]: ...
Import
from bentoml._internal.runner.utils import Params, payload_paramss_to_batch_params, iterate_in_threadpool
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| *args | T | No | Positional arguments stored in Params.args |
| **kwargs | T | No | Keyword arguments stored in Params.kwargs |
| paramss | Sequence[Params[Payload]] | Yes (payload_paramss_to_batch_params) | Sequence of Params containing Payload objects to batch |
| batch_dim | int | Yes (payload_paramss_to_batch_params) | The dimension along which to batch inputs |
| iterator | Iterator[T] | Yes (iterate_in_threadpool) | A synchronous iterator to wrap as async |
Outputs
| Name | Type | Description |
|---|---|---|
| Params.map / Params.agg result | Params[To] | Transformed Params container after map/agg operations |
| payload_paramss_to_batch_params result | tuple[Params[Any], list[int]] | Batched parameters and the index list used to split results back per request |
| iterate_in_threadpool result | AsyncIterator[T] | Async iterator wrapping the synchronous iterator |
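How a batch and its index list work together can be shown with plain Python lists. This is a hedged sketch rather than the library's code: it assumes the index list holds cumulative offsets into the batch, it only handles batching along the first dimension, and batch_lists and split_results are hypothetical helpers. The real function delegates batching to AutoContainer and honors batch_dim.

```python
from __future__ import annotations

from itertools import accumulate
from typing import Any, Sequence


def batch_lists(payloads: Sequence[list]) -> tuple[list, list[int]]:
    # Concatenate every request's items into one batch.
    batch = [item for payload in payloads for item in payload]
    # Assumed index format: cumulative offsets, starting at 0.
    indices = list(accumulate((len(p) for p in payloads), initial=0))
    return batch, indices


def split_results(results: Sequence[Any], indices: Sequence[int]) -> list[list]:
    # Slice the batched results back into one chunk per original request.
    return [list(results[start:end]) for start, end in zip(indices, indices[1:])]


batch, indices = batch_lists([[1, 2], [3], [4, 5, 6]])
outputs = [x * 10 for x in batch]  # stand-in for a batched inference call
per_request = split_results(outputs, indices)
```

Keeping the offsets next to the batch means the caller can run a single inference call and still hand each request exactly its own slice of the output.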
Usage Examples
from bentoml._internal.runner.utils import Params
# Create Params and apply a transformation
params = Params(1, 2, 3, x=4, y=5)
doubled = params.map(lambda v: v * 2)
# doubled.args == (2, 4, 6), doubled.kwargs == {"x": 8, "y": 10}
# Aggregate multiple Params: agg_func receives the values found at each position
p1 = Params(1, 2)
p2 = Params(3, 4)
agg = Params.agg([p1, p2], agg_func=sum)
# agg.args == (sum((1, 3)), sum((2, 4))) == (4, 6)
# Get a sample value (the first stored value)
sample = params.sample  # returns 1