Implementation:Bentoml BentoML Async Dependency Execution
Overview
Async Dependency Execution is a pattern for running dependency calls in parallel in BentoML multi-service compositions. It combines the .to_async property on dependency proxies with asyncio.gather() to execute independent model calls concurrently. A stage of independent calls then takes roughly as long as its slowest call instead of the sum of all of them.
This is a Pattern Doc documenting the usage of .to_async and asyncio.gather() for parallel dependency execution.
Interface Specification
Source Location
src/_bentoml_sdk/service/dependency.py:L48-102
The .to_async Property
The Dependency class implements the descriptor protocol: its __get__ method returns a proxy object for the resolved service. This proxy exposes a .to_async property whose methods return awaitable coroutines instead of blocking until the result is ready.
```python
import typing as t

T = t.TypeVar("T")


# Simplified excerpt: the Dependency descriptor returns a proxy with .to_async
class Dependency(t.Generic[T]):
    def __get__(self, obj: t.Any, owner: type) -> T:
        # Returns a proxy to the resolved service.
        # The proxy supports:
        #   proxy.method(args)          -> synchronous call, returns the result
        #   proxy.to_async.method(args) -> returns a coroutine to await
        ...
```
When .to_async is used, the returned proxy wraps each method call as an async coroutine that can be awaited or passed to asyncio.gather().
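To make the descriptor-plus-proxy mechanism concrete, here is a toy, self-contained sketch. It is not BentoML's implementation: ToyDependency, _Proxy and _AsyncView are hypothetical names, and offloading each blocking method to a worker thread with asyncio.to_thread() (Python 3.9+) is an assumption chosen so the example runs without any BentoML server.

```python
import asyncio
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class _AsyncView:
    """Hypothetical view whose methods return coroutines instead of results."""

    def __init__(self, target: Any) -> None:
        self._target = target

    def __getattr__(self, name: str) -> Callable[..., Any]:
        method = getattr(self._target, name)

        async def call(*args: Any, **kwargs: Any) -> Any:
            # Run the blocking method in a worker thread so the event loop stays free
            return await asyncio.to_thread(method, *args, **kwargs)

        return call


class _Proxy:
    """Hypothetical proxy: plain attribute access forwards to the target."""

    def __init__(self, target: Any) -> None:
        self._target = target

    @property
    def to_async(self) -> _AsyncView:
        return _AsyncView(self._target)

    def __getattr__(self, name: str) -> Any:
        # Only called for names not found on the proxy itself
        return getattr(self._target, name)


class ToyDependency(Generic[T]):
    """Hypothetical stand-in for a dependency descriptor (not BentoML's class)."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._proxy: Optional[_Proxy] = None

    def __get__(self, obj: Any, owner: type) -> Any:
        if obj is None:  # accessed on the class itself, e.g. Pipeline.model
            return self
        if self._proxy is None:  # resolve lazily, once per descriptor
            self._proxy = _Proxy(self._factory())
        return self._proxy


class Doubler:
    def predict(self, x: int) -> int:
        return x * 2


class Pipeline:
    model = ToyDependency(Doubler)


async def main() -> tuple[int, int]:
    pipeline = Pipeline()
    sync_result = pipeline.model.predict(3)  # plain call, returns 6
    async_result = await pipeline.model.to_async.predict(4)  # awaited, returns 8
    return sync_result, async_result


print(asyncio.run(main()))  # (6, 8)
```

The same attribute name serves both styles: pipeline.model.predict(...) returns a value, while pipeline.model.to_async.predict(...) returns a coroutine that can be awaited or handed to asyncio.gather().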
Example Implementations
Sequential Pattern
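In a sequential pipeline, each step depends on the previous step's output, so the calls cannot overlap and total latency is the sum of the three calls. Using .to_async here does not make a single request faster, but awaiting each call instead of blocking on it leaves the event loop free to serve other requests in the meantime, and keeps the API consistent with the async serving infrastructure.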
```python
import bentoml

# Preprocessor, InferenceModel and Postprocessor are placeholders for
# @bentoml.service classes defined elsewhere in the project.


@bentoml.service
class SequentialPipeline:
    preprocessor = bentoml.depends(Preprocessor)
    model = bentoml.depends(InferenceModel)
    postprocessor = bentoml.depends(Postprocessor)

    @bentoml.api
    async def predict(self, raw_input: str) -> dict:
        # Sequential: each step waits for the previous step's output
        features = await self.preprocessor.to_async.extract(raw_input)
        prediction = await self.model.to_async.predict(features)
        result = await self.postprocessor.to_async.format(prediction)
        return result
```
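The non-blocking benefit of the sequential pattern only shows up when several requests share one event loop. The sketch below is plain asyncio with hypothetical Stub* classes standing in for the three dependencies (no BentoML required); asyncio.sleep() simulates a 0.05 s call latency, and asyncio.gather() over two predict() calls simulates an async server handling two requests at once.

```python
import asyncio
import time

LATENCY = 0.05  # simulated per-call latency (assumption)


class StubPreprocessor:
    """Hypothetical stand-in for a dependency's .to_async view."""

    async def extract(self, raw_input: str) -> list[float]:
        await asyncio.sleep(LATENCY)
        return [float(len(word)) for word in raw_input.split()]


class StubInferenceModel:
    async def predict(self, features: list[float]) -> float:
        await asyncio.sleep(LATENCY)
        return sum(features)


class StubPostprocessor:
    async def format(self, prediction: float) -> dict:
        await asyncio.sleep(LATENCY)
        return {"score": prediction}


class SequentialPipelineSketch:
    def __init__(self) -> None:
        self.preprocessor = StubPreprocessor()
        self.model = StubInferenceModel()
        self.postprocessor = StubPostprocessor()

    async def predict(self, raw_input: str) -> dict:
        # Each await yields to the event loop until that step finishes
        features = await self.preprocessor.extract(raw_input)
        prediction = await self.model.predict(features)
        return await self.postprocessor.format(prediction)


async def serve_two_requests() -> tuple[list[dict], float]:
    pipeline = SequentialPipelineSketch()
    start = time.perf_counter()
    # Simulate an async server handling two requests at the same time
    results = await asyncio.gather(
        pipeline.predict("hello world"),
        pipeline.predict("async io"),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(serve_two_requests())
print(results)  # [{'score': 10.0}, {'score': 7.0}]
print(f"two requests took {elapsed:.2f}s")
```

Each request still spends about 0.15 s in its three sequential steps, but because no step blocks the loop, the two requests interleave and finish in roughly that time together rather than about 0.30 s.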
Parallel Pattern
When multiple models are independent, use asyncio.gather() to execute them concurrently.
```python
import asyncio

import bentoml

# ModelA, ModelB and ModelC are placeholder @bentoml.service classes whose
# predict() is assumed to return a number, so the results can be averaged.


@bentoml.service
class ParallelEnsemble:
    model_a = bentoml.depends(ModelA)
    model_b = bentoml.depends(ModelB)
    model_c = bentoml.depends(ModelC)

    @bentoml.api
    async def predict(self, data: list[float]) -> dict:
        # Parallel: all three calls are in flight at the same time
        result_a, result_b, result_c = await asyncio.gather(
            self.model_a.to_async.predict(data),
            self.model_b.to_async.predict(data),
            self.model_c.to_async.predict(data),
        )
        # Aggregate results into a simple averaged ensemble
        return {
            "model_a": result_a,
            "model_b": result_b,
            "model_c": result_c,
            "ensemble": (result_a + result_b + result_c) / 3,
        }
```
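The tuple unpacking in the ensemble is safe because asyncio.gather() returns results in the order the awaitables were passed, not the order in which they finish. The sketch below checks this with a hypothetical StubModel (plain asyncio, no BentoML) whose asyncio.sleep() delays are chosen so that model_b finishes first and model_a last; like the ensemble above, it assumes numeric predictions.

```python
import asyncio

completion_order: list[str] = []  # records the order in which stubs finish


class StubModel:
    """Hypothetical stand-in for a dependency's .to_async view."""

    def __init__(self, name: str, delay: float, offset: float) -> None:
        self.name = name
        self.delay = delay
        self.offset = offset

    async def predict(self, data: list[float]) -> float:
        await asyncio.sleep(self.delay)  # simulated model latency
        completion_order.append(self.name)
        return sum(data) + self.offset


async def ensemble(data: list[float]) -> dict:
    model_a = StubModel("model_a", delay=0.06, offset=1.0)  # slowest
    model_b = StubModel("model_b", delay=0.02, offset=2.0)  # fastest
    model_c = StubModel("model_c", delay=0.04, offset=3.0)
    result_a, result_b, result_c = await asyncio.gather(
        model_a.predict(data),
        model_b.predict(data),
        model_c.predict(data),
    )
    return {
        "model_a": result_a,
        "model_b": result_b,
        "model_c": result_c,
        "ensemble": (result_a + result_b + result_c) / 3,
    }


output = asyncio.run(ensemble([1.0, 2.0]))
print(completion_order)  # ['model_b', 'model_c', 'model_a']
print(output)  # {'model_a': 4.0, 'model_b': 5.0, 'model_c': 6.0, 'ensemble': 5.0}
```

Even though the calls complete as b, c, a, each result lands in the variable matching its position in the gather() call.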
Mixed Pattern (Sequential + Parallel)
Real-world pipelines often combine sequential and parallel stages.
```python
import asyncio

import bentoml

# Preprocessor, TextModel, ImageModel and Ranker are placeholder
# @bentoml.service classes defined elsewhere in the project.


@bentoml.service
class MixedPipeline:
    preprocessor = bentoml.depends(Preprocessor)
    text_model = bentoml.depends(TextModel)
    image_model = bentoml.depends(ImageModel)
    ranker = bentoml.depends(Ranker)

    @bentoml.api
    async def predict(self, text: str, image_url: str) -> dict:
        # Step 1: Sequential preprocessing (the two calls are independent of
        # each other, so they could also be run with asyncio.gather())
        text_features = await self.preprocessor.to_async.process_text(text)
        image_features = await self.preprocessor.to_async.process_image(image_url)
        # Step 2: Parallel model inference on the two feature sets
        text_pred, image_pred = await asyncio.gather(
            self.text_model.to_async.predict(text_features),
            self.image_model.to_async.predict(image_features),
        )
        # Step 3: Ranking needs both predictions, so it runs last
        result = await self.ranker.to_async.rank(text_pred, image_pred)
        return result
```
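In the mixed pipeline, the text and image preprocessing calls do not depend on each other, so one possible variant gathers them as well. That turns four sequential waits into three stages. The sketch below is plain asyncio with hypothetical Stub* classes and a simulated 0.1 s latency per call (assumptions, not BentoML APIs); with every call taking the same time, the variant should finish in about 0.3 s instead of about 0.4 s.

```python
import asyncio
import time

LATENCY = 0.1  # simulated latency of every dependency call (assumption)


async def simulated_call(value):
    await asyncio.sleep(LATENCY)
    return value


class StubPreprocessor:
    async def process_text(self, text: str) -> str:
        return await simulated_call(text.lower())

    async def process_image(self, image_url: str) -> str:
        return await simulated_call(image_url.rsplit("/", 1)[-1])


class StubModel:
    def __init__(self, score: float) -> None:
        self.score = score

    async def predict(self, features: str) -> float:
        return await simulated_call(self.score)  # features ignored by this stub


class StubRanker:
    async def rank(self, text_pred: float, image_pred: float) -> dict:
        best = "text" if text_pred >= image_pred else "image"
        return await simulated_call({"best": best})


class MixedPipelineSketch:
    def __init__(self) -> None:
        self.preprocessor = StubPreprocessor()
        self.text_model = StubModel(0.8)
        self.image_model = StubModel(0.6)
        self.ranker = StubRanker()

    async def predict(self, text: str, image_url: str) -> dict:
        # Stage 1: independent preprocessing calls, gathered
        text_features, image_features = await asyncio.gather(
            self.preprocessor.process_text(text),
            self.preprocessor.process_image(image_url),
        )
        # Stage 2: independent model calls, gathered
        text_pred, image_pred = await asyncio.gather(
            self.text_model.predict(text_features),
            self.image_model.predict(image_features),
        )
        # Stage 3: ranking needs both predictions, so it runs last
        return await self.ranker.rank(text_pred, image_pred)


start = time.perf_counter()
result = asyncio.run(
    MixedPipelineSketch().predict("Hello", "https://example.com/cat.png")
)
elapsed = time.perf_counter() - start
print(result)  # {'best': 'text'}
print(f"pipeline took {elapsed:.2f}s")
```

The rule of thumb is to group calls into stages by data dependency and gather everything within a stage.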
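Error Handling with asyncio.gather()
By default, asyncio.gather() propagates the first exception raised by any of its awaitables to the caller, while the remaining awaitables keep running. Passing return_exceptions=True places exceptions in the result list alongside successful results instead, so the service can return a partial answer when only some models fail.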
```python
import asyncio

import bentoml

# ModelA and ModelB are placeholder @bentoml.service classes whose
# predict() is assumed to return a number.


@bentoml.service
class ResilientEnsemble:
    model_a = bentoml.depends(ModelA)
    model_b = bentoml.depends(ModelB)

    @bentoml.api
    async def predict(self, data: list[float]) -> dict:
        # return_exceptions=True returns exceptions as list items instead of raising
        results = await asyncio.gather(
            self.model_a.to_async.predict(data),
            self.model_b.to_async.predict(data),
            return_exceptions=True,
        )
        valid_results = []
        errors = []
        for name, result in zip(("model_a", "model_b"), results):
            # Check BaseException: a cancelled call comes back as CancelledError,
            # which does not subclass Exception
            if isinstance(result, BaseException):
                errors.append(f"{name} failed: {result!r}")
            else:
                valid_results.append(result)
        if not valid_results:
            raise RuntimeError(f"All models failed: {errors}")
        return {"result": sum(valid_results) / len(valid_results), "errors": errors}
```
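The difference between the two gather() modes can be checked without BentoML. In the sketch below, healthy_model and broken_model are hypothetical coroutines standing in for two dependency calls, with broken_model always raising a ValueError. collect() uses return_exceptions=True and degrades gracefully; fail_fast() uses the default and lets the exception escape.

```python
import asyncio


async def healthy_model(data: list[float]) -> float:
    await asyncio.sleep(0.01)  # simulated latency
    return sum(data) / len(data)


async def broken_model(data: list[float]) -> float:
    await asyncio.sleep(0.01)
    raise ValueError("model weights not loaded")  # simulated failure


async def collect(data: list[float]) -> dict:
    # Exceptions come back as items of the result list
    results = await asyncio.gather(
        healthy_model(data), broken_model(data), return_exceptions=True
    )
    valid = [r for r in results if not isinstance(r, BaseException)]
    errors = [
        f"{name} failed: {r}"
        for name, r in zip(("model_a", "model_b"), results)
        if isinstance(r, BaseException)
    ]
    if not valid:
        raise RuntimeError(f"All models failed: {errors}")
    return {"result": sum(valid) / len(valid), "errors": errors}


async def fail_fast(data: list[float]) -> list[float]:
    # Default return_exceptions=False: the first exception propagates
    return await asyncio.gather(healthy_model(data), broken_model(data))


summary = asyncio.run(collect([1.0, 2.0, 3.0]))
print(summary)
# {'result': 2.0, 'errors': ['model_b failed: model weights not loaded']}

propagated = None
try:
    asyncio.run(fail_fast([1.0, 2.0, 3.0]))
except ValueError as exc:
    propagated = str(exc)
print(f"propagated: {propagated}")  # propagated: model weights not loaded
```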
Comparison of Patterns
| Pattern | Code | Latency | Use Case |
|---|---|---|---|
| Sync sequential | `self.svc.method(data)` | Sum of all calls | Simple linear pipelines |
| Async sequential | `await self.svc.to_async.method(data)` | Sum of all calls (non-blocking) | Linear pipelines in async context |
| Async parallel | `await asyncio.gather(self.a.to_async.m(d), self.b.to_async.m(d))` | Roughly the slowest parallel call | Independent model calls |
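The Latency column can be reproduced for a single request with a small plain-asyncio experiment. In this sketch, blocking_call and awaitable_call are hypothetical stand-ins for a dependency call with a simulated 0.1 s latency. Both sequential rows should take about 0.2 s and the parallel row about 0.1 s. The "non-blocking" difference between the first two rows does not appear in single-request timing; it matters only when other requests share the same event loop.

```python
import asyncio
import time

DELAY = 0.1  # simulated latency of one dependency call (assumption)


def blocking_call(x: int) -> int:
    time.sleep(DELAY)  # blocks the thread, and with it the event loop
    return x


async def awaitable_call(x: int) -> int:
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting
    return x


async def sync_sequential() -> list[int]:
    return [blocking_call(1), blocking_call(2)]


async def async_sequential() -> list[int]:
    return [await awaitable_call(1), await awaitable_call(2)]


async def async_parallel() -> list[int]:
    return list(await asyncio.gather(awaitable_call(1), awaitable_call(2)))


async def measure() -> dict[str, float]:
    timings = {}
    for pattern in (sync_sequential, async_sequential, async_parallel):
        start = time.perf_counter()
        await pattern()
        timings[pattern.__name__] = time.perf_counter() - start
    return timings


timings = asyncio.run(measure())
for name, seconds in timings.items():
    print(f"{name:<16} {seconds:.2f}s")
```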
Source Files
src/_bentoml_sdk/service/dependency.py:L48-102 -- Dependency class with .to_async support
Relationship to Principle
This pattern implements the Async Execution Patterns principle by providing concrete code patterns for parallel and sequential async dependency execution in BentoML.
Principle:Bentoml_BentoML_Async_Execution_Patterns