Implementation:Bentoml BentoML Service Composition Pattern
Overview
The Service Composition Pattern in BentoML enables building multi-model inference pipelines by composing independently defined services into a dependency graph. Each model gets its own @bentoml.service class with dedicated resources, and an entry service uses bentoml.depends() to wire them together.
This is a Pattern Doc: rather than describing a single API call, it documents the overall pattern of composing multiple BentoML services.
Interface Specification
Pattern Structure
The composition pattern follows three steps:
- Define individual model services -- Each model is wrapped in its own @bentoml.service class with appropriate resource declarations.
- Create an entry service -- A top-level service declares dependencies on the individual services using bentoml.depends().
- Wire the pipeline logic -- The entry service's API methods orchestrate calls across dependent services.
Service Definition Template
import bentoml
import numpy as np


# Step 1: Define individual model services
@bentoml.service(resources={"cpu": "2"})
class Preprocessing:
    def __init__(self):
        # Load preprocessing artifacts
        # (load_tokenizer is a placeholder for your own loading code)
        self.tokenizer = load_tokenizer()

    @bentoml.api
    def preprocess(self, raw_input: str) -> np.ndarray:
        return self.tokenizer.encode(raw_input)


@bentoml.service(resources={"gpu": 1, "memory": "8Gi"})
class InferenceModel:
    def __init__(self):
        # Load the ML model
        # (load_model is a placeholder for your own loading code)
        self.model = load_model()

    @bentoml.api
    def predict(self, features: np.ndarray) -> np.ndarray:
        return self.model.predict(features)


@bentoml.service(resources={"cpu": "1"})
class Postprocessing:
    @bentoml.api
    def format_output(self, raw_prediction: np.ndarray) -> dict:
        return {
            "label": int(raw_prediction.argmax()),
            "confidence": float(raw_prediction.max()),
        }


# Step 2 & 3: Create entry service and wire dependencies
@bentoml.service(resources={"cpu": "1"})
class Pipeline:
    preprocessing = bentoml.depends(Preprocessing)
    model = bentoml.depends(InferenceModel)
    postprocessing = bentoml.depends(Postprocessing)

    @bentoml.api
    async def predict(self, raw_input: str) -> dict:
        # Sequential pipeline: Preprocessing -> Model -> Postprocessing
        features = await self.preprocessing.to_async.preprocess(raw_input)
        raw_prediction = await self.model.to_async.predict(features)
        result = await self.postprocessing.to_async.format_output(raw_prediction)
        return result
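The wiring logic in the entry service can be exercised without starting a BentoML server by swapping in plain-Python stand-ins for the dependencies. The following is a minimal sketch under that assumption. `StubDependency`, `WordLengthTokenizer`, `NormalizingModel`, `ArgmaxFormatter`, `PipelineLogic`, and `build_pipeline` are hypothetical names, not part of BentoML, and the `to_async` attribute here only mimics the calling shape used in the template.

```python
import asyncio


class _AsyncView:
    """Exposes a target's sync methods as awaitables (illustrative only)."""

    def __init__(self, target):
        self._target = target

    def __getattr__(self, name):
        method = getattr(self._target, name)

        async def call(*args, **kwargs):
            return method(*args, **kwargs)

        return call


class StubDependency:
    """Hypothetical stand-in for the object a dependency resolves to."""

    def __init__(self, target):
        self.to_async = _AsyncView(target)


class WordLengthTokenizer:
    def preprocess(self, raw_input):
        # Toy "features": the length of each whitespace-separated word.
        return [float(len(word)) for word in raw_input.split()]


class NormalizingModel:
    def predict(self, features):
        # Toy "model": scale the features so they sum to 1.
        total = sum(features)
        return [value / total for value in features]


class ArgmaxFormatter:
    def format_output(self, raw_prediction):
        label = max(range(len(raw_prediction)), key=raw_prediction.__getitem__)
        return {"label": label, "confidence": raw_prediction[label]}


class PipelineLogic:
    """Same three-step orchestration as the entry service, minus the decorators."""

    def __init__(self, preprocessing, model, postprocessing):
        self.preprocessing = preprocessing
        self.model = model
        self.postprocessing = postprocessing

    async def predict(self, raw_input):
        features = await self.preprocessing.to_async.preprocess(raw_input)
        raw_prediction = await self.model.to_async.predict(features)
        return await self.postprocessing.to_async.format_output(raw_prediction)


def build_pipeline():
    return PipelineLogic(
        StubDependency(WordLengthTokenizer()),
        StubDependency(NormalizingModel()),
        StubDependency(ArgmaxFormatter()),
    )
```

Calling `asyncio.run(build_pipeline().predict("a bbb cc"))` walks the same Preprocessing -> Model -> Postprocessing path as the template, which makes the orchestration easy to unit test separately from model loading and serving.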
Example Implementations
Sequential Pipeline (A -> B -> C)
@bentoml.service
class SequentialPipeline:
    step_a = bentoml.depends(ServiceA)
    step_b = bentoml.depends(ServiceB)
    step_c = bentoml.depends(ServiceC)

    @bentoml.api
    async def run(self, data: str) -> dict:
        result_a = await self.step_a.to_async.process(data)
        result_b = await self.step_b.to_async.process(result_a)
        result_c = await self.step_c.to_async.process(result_b)
        return result_c
Parallel Ensemble (A -> [B, C] -> D)
import asyncio


@bentoml.service
class EnsemblePipeline:
    preprocessor = bentoml.depends(Preprocessor)
    model_a = bentoml.depends(ModelA)
    model_b = bentoml.depends(ModelB)
    aggregator = bentoml.depends(Aggregator)

    @bentoml.api
    async def run(self, data: str) -> dict:
        preprocessed = await self.preprocessor.to_async.process(data)
        # Parallel inference
        result_a, result_b = await asyncio.gather(
            self.model_a.to_async.predict(preprocessed),
            self.model_b.to_async.predict(preprocessed),
        )
        # Aggregate results
        final = await self.aggregator.to_async.combine(result_a, result_b)
        return final
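To see why the ensemble uses asyncio.gather rather than awaiting each model in turn, the sketch below compares the two approaches using stand-in coroutines. `fake_predict`, `run_sequential`, `run_parallel`, and `timed` are hypothetical helpers, and `asyncio.sleep` stands in for the latency of a real model call, so the numbers only illustrate the shape of the difference.

```python
import asyncio
import time


async def fake_predict(name, delay):
    # Stand-in for a remote model call: waits without blocking the event loop.
    await asyncio.sleep(delay)
    return f"{name}-result"


async def run_sequential(delay):
    # Each call starts only after the previous one has finished.
    result_a = await fake_predict("model_a", delay)
    result_b = await fake_predict("model_b", delay)
    return [result_a, result_b]


async def run_parallel(delay):
    # Both calls are in flight at the same time; results keep argument order.
    results = await asyncio.gather(
        fake_predict("model_a", delay),
        fake_predict("model_b", delay),
    )
    return list(results)


def timed(coro_factory, delay):
    """Run one of the coroutines above and return (results, elapsed_seconds)."""
    start = time.perf_counter()
    results = asyncio.run(coro_factory(delay))
    return results, time.perf_counter() - start
```

With equal delays, the sequential version takes roughly the sum of the two waits while the gathered version takes roughly the longer of the two. Both return the results in the same order.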
Mixed DAG
import asyncio


@bentoml.service
class MixedDAGPipeline:
    preprocessor = bentoml.depends(Preprocessor)
    text_model = bentoml.depends(TextModel)
    image_model = bentoml.depends(ImageModel)
    fusion = bentoml.depends(FusionModel)
    ranker = bentoml.depends(Ranker)

    @bentoml.api
    async def run(self, text: str, image_url: str) -> dict:
        # Preprocessing (sequential)
        text_features = await self.preprocessor.to_async.process_text(text)
        image_features = await self.preprocessor.to_async.process_image(image_url)
        # Parallel inference on different modalities
        text_pred, image_pred = await asyncio.gather(
            self.text_model.to_async.predict(text_features),
            self.image_model.to_async.predict(image_features),
        )
        # Fusion and ranking (sequential)
        fused = await self.fusion.to_async.fuse(text_pred, image_pred)
        ranked = await self.ranker.to_async.rank(fused)
        return ranked
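In the example above, the two preprocessing calls run one after the other even though neither depends on the other's output. One possible variation, sketched here with plain asyncio stand-ins rather than real services, chains each modality's preprocess and predict steps into its own coroutine and gathers the two branches. `stage`, `text_branch`, `image_branch`, and `run_mixed_dag` are hypothetical names. Whether this helps in practice depends on how the preprocessor service is scaled.

```python
import asyncio


async def stage(name, value, log):
    # Stand-in for a service call: records start/end and wraps the input.
    log.append(f"{name}:start")
    await asyncio.sleep(0.01)
    log.append(f"{name}:end")
    return f"{name}({value})"


async def text_branch(text, log):
    features = await stage("process_text", text, log)
    return await stage("text_predict", features, log)


async def image_branch(image_url, log):
    features = await stage("process_image", image_url, log)
    return await stage("image_predict", features, log)


async def run_mixed_dag(text, image_url):
    log = []
    # The two branches are independent, so they run concurrently end to end.
    text_pred, image_pred = await asyncio.gather(
        text_branch(text, log),
        image_branch(image_url, log),
    )
    # Fusion needs both predictions, so it starts only after the gather.
    fused = await stage("fuse", f"{text_pred}, {image_pred}", log)
    ranked = await stage("rank", fused, log)
    return ranked, log
```

The returned log shows both preprocessing stages starting before either finishes, while fusion and ranking still wait for both branches.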
Design Decisions
| Decision | Recommendation | Rationale |
|---|---|---|
| When to compose vs. single service | Compose when models need different resources or independent scaling | Avoids over-provisioning resources for lightweight steps |
| Resource allocation per model | Declare resources in @bentoml.service decorator | Each service gets dedicated resources matching its workload |
| Error handling in pipelines | Use try/except in the entry service API method | Centralizes error handling and allows graceful degradation |
| Sync vs. async calls | Use .to_async for I/O-bound calls; sync for CPU-bound | Async enables parallelism; sync is simpler for linear flows |
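The error-handling recommendation can be illustrated with a small ensemble that keeps serving when one model fails. This is a sketch under assumptions: `healthy_model`, `failing_model`, `call_or_none`, and `ensemble_with_fallback` are hypothetical, the models are plain coroutines rather than BentoML dependencies, and averaging the surviving scores is just one possible degradation policy.

```python
import asyncio


async def healthy_model(features):
    return {"score": 0.9}


async def failing_model(features):
    raise RuntimeError("model_b unavailable")


async def call_or_none(model, features, errors):
    # try/except lives in the orchestrating layer, not in each model.
    try:
        return await model(features)
    except Exception as exc:
        errors.append(str(exc))
        return None


async def ensemble_with_fallback(features, models):
    errors = []
    outcomes = await asyncio.gather(
        *(call_or_none(model, features, errors) for model in models)
    )
    scores = [outcome["score"] for outcome in outcomes if outcome is not None]
    if not scores:
        # Nothing to degrade to: surface the failure to the caller.
        raise RuntimeError("all models failed: " + "; ".join(errors))
    return {
        "score": sum(scores) / len(scores),
        "degraded": bool(errors),
        "errors": errors,
    }
```

Keeping the handling in one place lets the entry point decide between a partial answer and a hard failure, and the `degraded` flag makes that decision visible to callers.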
Source Files
- docs/source/get-started/model-composition.rst -- Primary documentation for multi-model composition
- docs/source/build-with-bentoml/distributed-services.rst -- Distributed services deployment guide
Relationship to Principle
This pattern implements the Service Architecture Design principle by providing concrete code patterns for decomposing multi-model inference into composable BentoML services.
Principle:Bentoml_BentoML_Service_Architecture_Design