Implementation:Bentoml BentoML SSE Descriptor
| Knowledge Sources | |
|---|---|
| Domains | IO Descriptors, Server Sent Events, Streaming, HTTP |
| Last Updated | 2026-02-13 15:00 GMT |
Overview
Implements the SSE (Server-Sent Events) data class for marshalling and unmarshalling SSE protocol messages within BentoML IO descriptor utilities.
Description
The SSE class is an attrs-based data class that represents a single Server-Sent Event message. It supports the four standard SSE fields: data (required), event (optional event type name), id (optional event identifier), and retry (optional reconnection time in milliseconds).
The marshal() method serializes the SSE object into the standard SSE wire format ("event: ...\nid: ...\nretry: ...\ndata: ...\n\n"). Multi-line data is split and each line is prefixed with "data: ". Event names containing newlines raise a BentoMLException.
The _read_sse() class method parses a single SSE message from a StringIO buffer, reading lines until a blank line delimiter is encountered. It uses an optimized approach where a secondary data_buffer is only allocated when there are multiple data lines.
The from_iterator() async class method is a streaming parser that takes an AsyncIterator[str] of raw SSE text chunks and yields parsed SSE objects. It detects message boundaries by looking for "\\n\\n" sequences (including across chunk boundaries) and yields each parsed message as it becomes complete.
Usage
Use the SSE class when working with streaming SSE responses in BentoML IO descriptors. It is used internally by the Text IO descriptor when content_type="text/event-stream" is set, and can be used directly for custom SSE parsing and generation.
Code Reference
Source Location
- Repository: Bentoml_BentoML
- File: src/bentoml/_internal/io_descriptors/utils.py
- Lines: 1-97
Signature
@attr.s(auto_attribs=True)
class SSE:
data: str
event: str | None = None
id: str | None = None
retry: int | None = None
def marshal(self) -> str: ...
@classmethod
def _read_sse(cls, buffer: io.StringIO) -> SSE: ...
@classmethod
async def from_iterator(
cls,
async_iterator: t.AsyncIterator[str],
) -> t.AsyncGenerator[SSE, None]: ...
Import
from bentoml._internal.io_descriptors.utils import SSE
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data | str | Yes | The event data payload; may be multi-line |
| event | str or None | No | The event type name; must not contain newlines |
| id | str or None | No | The event identifier |
| retry | int or None | No | Reconnection time in milliseconds |
| async_iterator | AsyncIterator[str] | Yes (from_iterator) | Async iterator of raw SSE text chunks to parse |
| buffer | io.StringIO | Yes (_read_sse) | StringIO buffer positioned at the start of an SSE message |
Outputs
| Name | Type | Description |
|---|---|---|
| str (marshal) | str | SSE wire-format string with proper field prefixes and newline delimiters |
| SSE (_read_sse) | SSE | Parsed SSE object from the buffer |
| AsyncGenerator[SSE] (from_iterator) | AsyncGenerator[SSE, None] | Stream of parsed SSE objects from raw text chunks |
Usage Examples
from bentoml._internal.io_descriptors.utils import SSE
# Create and marshal an SSE message
event = SSE(
data="Hello, world!",
event="message",
id="1",
retry=3000,
)
wire_format = event.marshal()
# Output:
# event: message
# id: 1
# retry: 3000
# data: Hello, world!
#
# Marshal multi-line data
event = SSE(data="line1\nline2\nline3")
wire_format = event.marshal()
# Output:
# data: line1
# data: line2
# data: line3
#
# Parse SSE from an async iterator (e.g., streaming HTTP response)
import asyncio
async def sse_stream():
yield "event: update\ndata: first\n\n"
yield "data: second\n\n"
async def process():
async for sse in SSE.from_iterator(sse_stream()):
print(f"Event: {sse.event}, Data: {sse.data}")
asyncio.run(process())