Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Anthropics Anthropic sdk python MessageStream Iterator

From Leeroopedia
Knowledge Sources
Domains Streaming, LLM
Last Updated 2026-02-15 00:00 GMT

Overview

This implementation covers the MessageStream class and its core __stream__ generator method, which transforms raw SSE events from the httpx transport into typed ParsedMessageStreamEvent objects. The class implements Python's iterator protocol, allowing callers to consume streaming events with a standard for loop. It also provides the text_stream convenience property for extracting only text deltas.

API Signature

MessageStream class

class MessageStream(Generic[ResponseFormatT]):
    text_stream: Iterator[str]

    def __init__(
        self,
        raw_stream: Stream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None: ...

    def __iter__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]: ...
    def __next__(self) -> ParsedMessageStreamEvent[ResponseFormatT]: ...
    def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]: ...
    def __stream_text__(self) -> Iterator[str]: ...

Source: src/anthropic/lib/streaming/_messages.py lines 33-143

__stream__ generator

def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:

Source: src/anthropic/lib/streaming/_messages.py lines 128-138

Import

from anthropic.lib.streaming import MessageStream

Typically accessed indirectly through the context manager:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    for event in stream:
        if event.type == "text":
            print(f"Text delta: {event.text}")
        elif event.type == "message_stop":
            print("Stream complete")

Internal Behavior

Initialization (lines 44-53)

When MessageStream.__init__ is called, it stores the raw httpx stream, initializes the message snapshot to None, and eagerly creates two generator objects:

def __init__(
    self,
    raw_stream: Stream[RawMessageStreamEvent],
    output_format: ResponseFormatT | NotGiven,
) -> None:
    self._raw_stream = raw_stream
    self.text_stream = self.__stream_text__()      # Filtered text-only iterator
    self._iterator = self.__stream__()             # Full event iterator
    self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
    self.__output_format = output_format

Both self._iterator (the full event generator) and self.text_stream (the text-only generator) are created at init time but are lazy -- they do not start consuming events until iterated.

Iterator Protocol (lines 63-68)

MessageStream delegates __next__ and __iter__ to the internal _iterator generator:

def __next__(self) -> ParsedMessageStreamEvent[ResponseFormatT]:
    return self._iterator.__next__()

def __iter__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    for item in self._iterator:
        yield item

Core __stream__ Generator (lines 128-138)

The __stream__ method is the heart of the iteration logic. For each raw SSE event from the httpx stream, it performs two operations:

def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    for sse_event in self._raw_stream:
        self.__final_message_snapshot = accumulate_event(
            event=sse_event,
            current_snapshot=self.__final_message_snapshot,
            output_format=self.__output_format,
        )

        events_to_fire = build_events(
            event=sse_event,
            message_snapshot=self.current_message_snapshot,
        )
        for event in events_to_fire:
            yield event
  1. accumulate_event(): Updates the running __final_message_snapshot by merging the SSE delta into the accumulated message.
  2. build_events(): Produces a list of typed events to yield. Some raw events produce multiple typed events (e.g., a content_block_delta with text_delta yields both the raw delta event and a TextEvent).

Yielded Event Types

The __stream__ generator yields ParsedMessageStreamEvent[ResponseFormatT], which is a union of:

  • TextEvent: Contains .text (the delta string) and .snapshot (accumulated text so far).
  • ThinkingEvent: Contains .thinking (delta) and .snapshot (accumulated thinking).
  • SignatureEvent: Contains .signature (the signature string).
  • InputJsonEvent: Contains .partial_json (delta) and .snapshot (partially parsed object via jiter).
  • CitationEvent: Contains .citation (new citation) and .snapshot (all citations).
  • ParsedMessageStopEvent: Contains .message (the final accumulated ParsedMessage).
  • ParsedContentBlockStopEvent: Contains .content_block (the completed content block).
  • Raw SSE events: RawMessageStartEvent, RawMessageDeltaEvent, RawContentBlockStartEvent, RawContentBlockDeltaEvent.

text_stream Convenience (lines 140-143)

The text_stream property provides a filtered view over the full event stream, yielding only the raw text strings from text deltas:

def __stream_text__(self) -> Iterator[str]:
    for chunk in self:
        if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
            yield chunk.delta.text

Usage:

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()

Async Variant: AsyncMessageStream (lines 181-291)

AsyncMessageStream mirrors MessageStream with async equivalents:

  • __aiter__ / __anext__ instead of __iter__ / __next__
  • async def __stream__ using async for sse_event in self._raw_stream
  • text_stream is an AsyncIterator[str]
async def __stream__(self) -> AsyncIterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    async for sse_event in self._raw_stream:
        self.__final_message_snapshot = accumulate_event(
            event=sse_event,
            current_snapshot=self.__final_message_snapshot,
            output_format=self.__output_format,
        )
        events_to_fire = build_events(event=sse_event, message_snapshot=self.current_message_snapshot)
        for event in events_to_fire:
            yield event

Note that accumulate_event() and build_events() are the same synchronous functions in both variants -- only the iteration over the raw stream is async.

Dependencies

  • jiter: Used for partial JSON parsing in InputJsonEvent snapshots (via accumulate_event).
  • pydantic: All event types extend BaseModel or GenericModel.
  • httpx: Provides the underlying Stream/AsyncStream transport.

Key Source Files

  • src/anthropic/lib/streaming/_messages.py -- MessageStream (L33-143), AsyncMessageStream (L181-291)
  • src/anthropic/lib/streaming/_types.py -- Event type definitions (TextEvent, ThinkingEvent, etc.)

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment