Implementation:Anthropics Anthropic sdk python Messages Stream

From Leeroopedia
Knowledge Sources
Domains: Streaming, LLM
Last Updated: 2026-02-15 00:00 GMT

Overview

This implementation covers the Messages.stream() method and the MessageStreamManager context manager that together set up a streaming connection to the Anthropic Messages API. The stream() method captures request parameters, forces stream=True, and returns a MessageStreamManager whose __enter__ fires the actual HTTP request.

API Signature

Sync: Messages.stream()

def stream(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    inference_geo: Optional[str] | Omit = omit,
    metadata: MetadataParam | Omit = omit,
    output_config: OutputConfigParam | Omit = omit,
    output_format: None | JSONOutputFormatParam | type[ResponseFormatT] | Omit = omit,
    container: Optional[str] | Omit = omit,
    service_tier: Literal["auto", "standard_only"] | Omit = omit,
    stop_sequences: SequenceNotStr[str] | Omit = omit,
    system: Union[str, Iterable[TextBlockParam]] | Omit = omit,
    temperature: float | Omit = omit,
    top_k: int | Omit = omit,
    top_p: float | Omit = omit,
    thinking: ThinkingConfigParam | Omit = omit,
    tool_choice: ToolChoiceParam | Omit = omit,
    tools: Iterable[ToolUnionParam] | Omit = omit,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> MessageStreamManager[ResponseFormatT]

Source: src/anthropic/resources/messages/messages.py lines 1005-1116

Async: AsyncMessages.stream()

def stream(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    # ... same parameters as sync variant ...
) -> AsyncMessageStreamManager[ResponseFormatT]

Source: src/anthropic/resources/messages/messages.py lines 2423-2533

Import and Usage

from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()

Internal Behavior

Step 1: Parameter Capture and stream=True Injection

When Messages.stream() is called, it does not immediately issue an HTTP request. Instead, it:

  1. Adds the X-Stainless-Helper-Method: stream and X-Stainless-Stream-Helper: messages headers (lines 1047-1051).
  2. Transforms output_format if it is a Pydantic model type, generating a JSON schema (lines 1053-1069).
  3. Uses functools.partial to capture a deferred call to self._post("/v1/messages", ...) with "stream": True hardcoded in the body (line 1102) and stream=True passed to the transport layer (line 1110).

make_request = partial(
    self._post,
    "/v1/messages",
    body=maybe_transform(
        {
            "max_tokens": max_tokens,
            "messages": messages,
            "model": model,
            # ... other parameters ...
            "stream": True,  # Forced at line 1102
        },
        message_create_params.MessageCreateParams,
    ),
    options=make_request_options(
        extra_headers=extra_headers, extra_query=extra_query,
        extra_body=extra_body, timeout=timeout
    ),
    cast_to=Message,
    stream=True,
    stream_cls=Stream[RawMessageStreamEvent],
)
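
The deferral in Step 1 can be sketched with the standard library alone. This is a minimal illustration, not the SDK's code: fake_post and build_request are hypothetical stand-ins for self._post and Messages.stream(), and the header merge order (caller-supplied headers overriding the helper headers) is an assumption.

```python
from functools import partial

calls = []  # records every "request" that actually runs


def fake_post(path, *, body, headers, stream):
    """Hypothetical stand-in for self._post: records the call instead of doing HTTP."""
    calls.append(path)
    return {"path": path, "body": body, "headers": headers, "stream": stream}


def build_request(*, model, max_tokens, messages, extra_headers=None):
    """Hypothetical stand-in for Messages.stream(): prepares a call but does not make it."""
    # Assumption: helper headers go first so caller-supplied headers can override them.
    headers = {
        "X-Stainless-Helper-Method": "stream",
        "X-Stainless-Stream-Helper": "messages",
        **(extra_headers or {}),
    }
    body = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
        "stream": True,  # always forced, regardless of caller input
    }
    # partial() binds the arguments now; fake_post runs only when the result is called.
    return partial(fake_post, "/v1/messages", body=body, headers=headers, stream=True)


make_request = build_request(
    model="example-model",
    max_tokens=16,
    messages=[{"role": "user", "content": "hi"}],
)
assert calls == []          # nothing has been sent yet
response = make_request()   # the deferred call fires here
assert calls == ["/v1/messages"]
```

Binding the arguments up front keeps parameter validation and transformation at call time, while the network round trip waits until the context manager is entered.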

Step 2: MessageStreamManager Construction

The captured callable is passed to MessageStreamManager (lines 1113-1116):

return MessageStreamManager(
    make_request,
    output_format=NOT_GIVEN if is_dict(output_format) else cast(ResponseFormatT, output_format),
)

Step 3: Deferred Execution in __enter__

MessageStreamManager.__enter__ (defined in src/anthropic/lib/streaming/_messages.py lines 166-169) fires the HTTP request and wraps the result:

class MessageStreamManager(Generic[ResponseFormatT]):
    def __init__(
        self,
        api_request: Callable[[], Stream[RawMessageStreamEvent]],
        *,
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self.__stream: MessageStream[ResponseFormatT] | None = None
        self.__api_request = api_request
        self.__output_format = output_format

    def __enter__(self) -> MessageStream[ResponseFormatT]:
        raw_stream = self.__api_request()        # HTTP request fires here
        self.__stream = MessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    def __exit__(self, exc_type, exc, exc_tb) -> None:
        if self.__stream is not None:
            self.__stream.close()                # Connection cleanup here
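
The ordering guarantee in Step 3 (nothing is sent at construction, the request fires on entry, the stream is closed on exit) can be checked with a small stand-alone sketch. FakeStream, DeferredStreamManager, and open_stream are hypothetical names used only for illustration; they mirror the shape of the excerpt above without touching the SDK.

```python
class FakeStream:
    """Hypothetical stand-in for MessageStream: yields chunks and tracks closure."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self.closed = False

    def __iter__(self):
        return self._chunks

    def close(self):
        self.closed = True


class DeferredStreamManager:
    """Same shape as the manager above: holds a callable and invokes it on entry."""

    def __init__(self, api_request):
        self._api_request = api_request
        self._stream = None

    def __enter__(self):
        self._stream = self._api_request()  # the "request" fires here
        return self._stream

    def __exit__(self, exc_type, exc, exc_tb):
        if self._stream is not None:
            self._stream.close()  # cleanup runs even if the body raised


events = []


def open_stream():
    events.append("request sent")
    return FakeStream(["Hello", ", ", "world"])


manager = DeferredStreamManager(open_stream)
events.append("manager created")  # logged before any request is made

with manager as stream:
    text = "".join(stream)

events.append("closed" if stream.closed else "open")
```

After this runs, events records the manager being created before the request is sent, and the stream is closed once the with block exits.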

Step 4: Async Variant

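The async variant at lines 2423-2533 stores an Awaitable[AsyncStream[RawMessageStreamEvent]] rather than a Callable. A coroutine object does not run until it is awaited, so it already provides the deferral that functools.partial provides in the sync variant. AsyncMessageStreamManager.__aenter__ awaits this coroutine: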

class AsyncMessageStreamManager(Generic[ResponseFormatT]):
    async def __aenter__(self) -> AsyncMessageStream[ResponseFormatT]:
        raw_stream = await self.__api_request    # Await the coroutine
        self.__stream = AsyncMessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    async def __aexit__(self, exc_type, exc, exc_tb) -> None:
        if self.__stream is not None:
            await self.__stream.close()
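
The same deferral can be demonstrated for the async case with asyncio only. This is a sketch under stated assumptions: FakeAsyncStream, fake_post, and AsyncDeferredManager are hypothetical stand-ins, not SDK classes. It shows that creating the coroutine object runs none of its body until __aenter__ awaits it.

```python
import asyncio

log = []


class FakeAsyncStream:
    """Hypothetical stand-in for AsyncMessageStream that only tracks closure."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def fake_post():
    """Hypothetical stand-in for the async self._post call."""
    log.append("request sent")
    return FakeAsyncStream()


class AsyncDeferredManager:
    """Same shape as the async manager above: stores an awaitable, not a callable."""

    def __init__(self, api_request):
        self._api_request = api_request
        self._stream = None

    async def __aenter__(self):
        self._stream = await self._api_request  # the coroutine body runs here
        return self._stream

    async def __aexit__(self, exc_type, exc, exc_tb):
        if self._stream is not None:
            await self._stream.close()


async def main():
    # Calling fake_post() only creates a coroutine object; its body has not run yet.
    manager = AsyncDeferredManager(fake_post())
    log.append("manager created")
    async with manager as stream:
        log.append("inside block")
    return stream.closed


closed = asyncio.run(main())
```

One consequence of storing a coroutine rather than a callable is that a manager built this way is single-use, because a coroutine object can be awaited only once. If the manager is never entered, Python emits a "coroutine was never awaited" RuntimeWarning when the coroutine is garbage-collected.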

Output

The stream() method returns a MessageStreamManager[ResponseFormatT] context manager. The API call is deferred until __enter__ is called, at which point a MessageStream[ResponseFormatT] is returned. This MessageStream is an iterable that yields ParsedMessageStreamEvent objects.

Dependencies

  • httpx: Underlying HTTP transport providing the streaming connection.
  • pydantic: Used for TypeAdapter to generate JSON schemas when output_format is a model class.
  • functools.partial: Used to capture the deferred HTTP call (sync variant).

Key Source Files

  • src/anthropic/resources/messages/messages.py -- Messages.stream() (L1005-1116) and AsyncMessages.stream() (L2423-2533)
  • src/anthropic/lib/streaming/_messages.py -- MessageStreamManager (L146-178) and AsyncMessageStreamManager (L294-328)
