Implementation:Anthropics Anthropic sdk python Messages Stream
| Knowledge Sources | |
|---|---|
| Domains | Streaming, LLM |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
This implementation covers the Messages.stream() method and the MessageStreamManager context manager that together set up a streaming connection to the Anthropic Messages API. The stream() method captures request parameters, forces stream=True, and returns a MessageStreamManager whose __enter__ fires the actual HTTP request.
API Signature
Sync: Messages.stream()
def stream(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    inference_geo: Optional[str] | Omit = omit,
    metadata: MetadataParam | Omit = omit,
    output_config: OutputConfigParam | Omit = omit,
    output_format: None | JSONOutputFormatParam | type[ResponseFormatT] | Omit = omit,
    container: Optional[str] | Omit = omit,
    service_tier: Literal["auto", "standard_only"] | Omit = omit,
    stop_sequences: SequenceNotStr[str] | Omit = omit,
    system: Union[str, Iterable[TextBlockParam]] | Omit = omit,
    temperature: float | Omit = omit,
    top_k: int | Omit = omit,
    top_p: float | Omit = omit,
    thinking: ThinkingConfigParam | Omit = omit,
    tool_choice: ToolChoiceParam | Omit = omit,
    tools: Iterable[ToolUnionParam] | Omit = omit,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> MessageStreamManager[ResponseFormatT]
Source: src/anthropic/resources/messages/messages.py lines 1005-1116
Async: AsyncMessages.stream()
def stream(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    # ... same parameters as sync variant ...
) -> AsyncMessageStreamManager[ResponseFormatT]
Source: src/anthropic/resources/messages/messages.py lines 2423-2533
Import
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()
Internal Behavior
Step 1: Parameter Capture and stream=True Injection
When Messages.stream() is called, it does not immediately issue an HTTP request. Instead, it:
- Adds the X-Stainless-Helper-Method: stream and X-Stainless-Stream-Helper: messages headers (lines 1047-1051).
- Transforms output_format if it is a Pydantic model type, generating a JSON schema (lines 1053-1069).
- Uses functools.partial to capture a deferred call to self._post("/v1/messages", ...) with "stream": True hardcoded in the body (line 1102) and stream=True passed to the transport layer (line 1110).
make_request = partial(
    self._post,
    "/v1/messages",
    body=maybe_transform(
        {
            "max_tokens": max_tokens,
            "messages": messages,
            "model": model,
            # ... other parameters ...
            "stream": True,  # Forced at line 1102
        },
        message_create_params.MessageCreateParams,
    ),
    options=make_request_options(
        extra_headers=extra_headers, extra_query=extra_query,
        extra_body=extra_body, timeout=timeout
    ),
    cast_to=Message,
    stream=True,
    stream_cls=Stream[RawMessageStreamEvent],
)
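The deferral described above can be illustrated with a standard-library sketch. It is not the SDK's code: fake_post and build_request are hypothetical stand-ins for self._post and the body-building logic, and the model name is a placeholder. The point is only that functools.partial records the call without running it, and that a "stream": True entry written last in the body wins over anything the caller supplied.

```python
from functools import partial

calls = []  # records every simulated request so we can see when it fires


def fake_post(path, *, body, stream):
    """Hypothetical stand-in for self._post; it only records the call."""
    calls.append((path, body, stream))
    return iter(["event-1", "event-2"])  # pretend event stream


def build_request(**params):
    """Hypothetical helper: capture the call now, run it later."""
    # Copy the caller's params, then force "stream": True so that a
    # caller-supplied value cannot override it.
    body = {**params, "stream": True}
    return partial(fake_post, "/v1/messages", body=body, stream=True)


make_request = build_request(model="example-model", max_tokens=16, stream=False)
assert calls == []  # nothing has been sent yet

events = list(make_request())  # the simulated request fires only here
```

Because the partial object is just a zero-argument callable at this point, whoever receives it decides when the request is actually made.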
Step 2: MessageStreamManager Construction
The captured callable is passed to MessageStreamManager (lines 1113-1116):
return MessageStreamManager(
    make_request,
    output_format=NOT_GIVEN if is_dict(output_format) else cast(ResponseFormatT, output_format),
)
Step 3: Deferred Execution in __enter__
MessageStreamManager.__enter__ (defined in src/anthropic/lib/streaming/_messages.py lines 166-169) fires the HTTP request and wraps the result:
class MessageStreamManager(Generic[ResponseFormatT]):
    def __init__(
        self,
        api_request: Callable[[], Stream[RawMessageStreamEvent]],
        *,
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self.__stream: MessageStream[ResponseFormatT] | None = None
        self.__api_request = api_request
        self.__output_format = output_format

    def __enter__(self) -> MessageStream[ResponseFormatT]:
        raw_stream = self.__api_request()  # HTTP request fires here
        self.__stream = MessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    def __exit__(self, exc_type, exc, exc_tb) -> None:
        if self.__stream is not None:
            self.__stream.close()  # Connection cleanup here
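To show why this shape is useful, here is a minimal standard-library sketch of the same pattern. FakeStream, DeferredStreamManager, and open_stream are hypothetical names invented for illustration and are not part of the SDK. The sketch checks two properties: the request callable does not run until the with block is entered, and close() still runs when the consumer raises partway through the stream.

```python
from typing import Callable, Iterator, Optional


class FakeStream:
    """Hypothetical stand-in for a streaming response."""

    def __init__(self, chunks) -> None:
        self._chunks = iter(chunks)
        self.closed = False

    def __iter__(self) -> Iterator[str]:
        return self._chunks

    def close(self) -> None:
        self.closed = True


class DeferredStreamManager:
    """Hypothetical manager mirroring the enter/exit shape shown above."""

    def __init__(self, api_request: Callable[[], FakeStream]) -> None:
        self._api_request = api_request
        self._stream: Optional[FakeStream] = None

    def __enter__(self) -> FakeStream:
        self._stream = self._api_request()  # the request callable runs here
        return self._stream

    def __exit__(self, exc_type, exc, exc_tb) -> None:
        if self._stream is not None:
            self._stream.close()  # runs on normal exit and on exceptions


opened = []  # one entry per simulated request


def open_stream() -> FakeStream:
    opened.append(True)
    return FakeStream(["Hello", ", ", "world"])


manager = DeferredStreamManager(open_stream)
opened_before_enter = len(opened)  # still 0: constructing the manager sends nothing

try:
    with manager as stream:
        first = next(iter(stream))
        raise RuntimeError("consumer failed mid-stream")
except RuntimeError:
    pass

closed_after_error = stream.closed
```

Since __exit__ returns None, the exception still propagates to the caller after cleanup; the manager only guarantees that the stream is closed.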
Step 4: Async Variant
The async variant at lines 2423-2533 differs in that it stores an Awaitable[AsyncStream[RawMessageStreamEvent]] instead of a Callable. The AsyncMessageStreamManager.__aenter__ method awaits this coroutine:
class AsyncMessageStreamManager(Generic[ResponseFormatT]):
    async def __aenter__(self) -> AsyncMessageStream[ResponseFormatT]:
        raw_stream = await self.__api_request  # Await the coroutine
        self.__stream = AsyncMessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    async def __aexit__(self, exc_type, exc, exc_tb) -> None:
        if self.__stream is not None:
            await self.__stream.close()
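The awaitable-based deferral can be sketched with asyncio alone. As before, FakeAsyncStream, AsyncDeferredStreamManager, and open_stream are hypothetical illustration names, not SDK classes. The sketch relies on a standard Python property: calling a coroutine function creates a coroutine object but does not run its body until it is awaited, so storing that object plays the same role that the partial callable plays in the sync variant.

```python
import asyncio


class FakeAsyncStream:
    """Hypothetical stand-in for an async streaming response."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self.closed = False

    def __aiter__(self):
        return self._gen()

    async def _gen(self):
        for chunk in self._chunks:
            yield chunk

    async def close(self):
        self.closed = True


class AsyncDeferredStreamManager:
    """Hypothetical manager that stores an awaitable rather than a callable."""

    def __init__(self, api_request):
        self._api_request = api_request  # an un-awaited coroutine object
        self._stream = None

    async def __aenter__(self):
        self._stream = await self._api_request  # the coroutine body runs here
        return self._stream

    async def __aexit__(self, exc_type, exc, exc_tb):
        if self._stream is not None:
            await self._stream.close()


started = []  # one entry per simulated request


async def open_stream():
    started.append(True)
    return FakeAsyncStream(["a", "b", "c"])


async def main():
    # open_stream() only creates the coroutine object; nothing runs yet.
    manager = AsyncDeferredStreamManager(open_stream())
    started_before_enter = len(started)
    async with manager as stream:
        chunks = [chunk async for chunk in stream]
    return started_before_enter, chunks, stream.closed


result = asyncio.run(main())
```

One consequence of this design, at least in the sketch, is that the manager is single-use: a coroutine object can be awaited only once, so entering the same manager a second time would raise a RuntimeError.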
Output
The stream() method returns a MessageStreamManager[ResponseFormatT] context manager. The API call is deferred until __enter__ is called, at which point a MessageStream[ResponseFormatT] is returned. This MessageStream is an iterable that yields ParsedMessageStreamEvent objects.
Dependencies
- httpx: Underlying HTTP transport providing the streaming connection.
- pydantic: Used for TypeAdapter to generate JSON schemas when output_format is a model class.
- functools.partial: Used to capture the deferred HTTP call (sync variant).
Key Source Files
- src/anthropic/resources/messages/messages.py -- Messages.stream() (L1005-1116) and AsyncMessages.stream() (L2423-2533)
- src/anthropic/lib/streaming/_messages.py -- MessageStreamManager (L146-178) and AsyncMessageStreamManager (L294-328)