Implementation:Anthropics Anthropic sdk python MessageStream Iterator
| Knowledge Sources | |
|---|---|
| Domains | Streaming, LLM |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
This implementation covers the MessageStream class and its core __stream__ generator method, which transforms raw SSE events from the httpx transport into typed ParsedMessageStreamEvent objects. The class implements Python's iterator protocol, so callers can consume streaming events with a standard for loop. It also exposes the text_stream convenience attribute, which yields only text deltas.
API Signature
MessageStream class
class MessageStream(Generic[ResponseFormatT]):
    text_stream: Iterator[str]
    def __init__(
        self,
        raw_stream: Stream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None: ...
    def __iter__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]: ...
    def __next__(self) -> ParsedMessageStreamEvent[ResponseFormatT]: ...
    def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]: ...
    def __stream_text__(self) -> Iterator[str]: ...
Source: src/anthropic/lib/streaming/_messages.py lines 33-143
__stream__ generator
def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]: ...
Source: src/anthropic/lib/streaming/_messages.py lines 128-138
Import
from anthropic.lib.streaming import MessageStream
Typically accessed indirectly through the context manager:
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}],
) as stream:
    for event in stream:
        if event.type == "text":
            print(f"Text delta: {event.text}")
        elif event.type == "message_stop":
            print("Stream complete")
Internal Behavior
Initialization (lines 44-53)
When MessageStream.__init__ is called, it stores the raw httpx stream, initializes the message snapshot to None, and eagerly creates two generator objects:
def __init__(
    self,
    raw_stream: Stream[RawMessageStreamEvent],
    output_format: ResponseFormatT | NotGiven,
) -> None:
    self._raw_stream = raw_stream
    self.text_stream = self.__stream_text__()  # Filtered text-only iterator
    self._iterator = self.__stream__()  # Full event iterator
    self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
    self.__output_format = output_format
Both self._iterator (the full event generator) and self.text_stream (the text-only generator) are created at init time but are lazy -- they do not start consuming events until iterated.
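The lazy behavior described above follows from how Python generator functions work in general. The sketch below illustrates it with a hypothetical stand-in class (LazyStreamSketch and its _stream method are invented for this example and are not part of the SDK): building the generator in __init__ pulls nothing from the source, and each next() call pulls exactly one raw item.

```python
from typing import Iterator, List


class LazyStreamSketch:
    """Hypothetical stand-in for MessageStream; not the SDK class."""

    def __init__(self, raw_events: List[str]) -> None:
        self.consumed: List[str] = []  # records which raw events were pulled
        self._raw_events = iter(raw_events)
        # Calling a generator function only builds the generator object;
        # its body does not run until the first next() call.
        self._iterator = self._stream()

    def _stream(self) -> Iterator[str]:
        for raw in self._raw_events:
            self.consumed.append(raw)
            yield raw.upper()


stream = LazyStreamSketch(["a", "b"])
consumed_before = list(stream.consumed)  # nothing has been pulled yet
first = next(stream._iterator)           # pulls exactly one raw event
consumed_after = list(stream.consumed)
```

In this sketch, consumed_before is empty and consumed_after holds only the first raw item, which is the same create-now, consume-later pattern the constructor relies on.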
Iterator Protocol (lines 63-68)
MessageStream delegates __next__ and __iter__ to the internal _iterator generator:
def __next__(self) -> ParsedMessageStreamEvent[ResponseFormatT]:
    return self._iterator.__next__()

def __iter__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    for item in self._iterator:
        yield item
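One consequence of this delegation is that every consumer shares a single underlying generator, so the stream is single-pass. The following is a minimal sketch of that behavior using a hypothetical class (DelegatingStreamSketch is invented here, and a plain list iterator plays the role of the generator returned by __stream__):

```python
from typing import Iterator, List


class DelegatingStreamSketch:
    """Hypothetical stand-in; mirrors only the delegation shape shown above."""

    def __init__(self, items: List[int]) -> None:
        self._iterator = iter(items)  # plays the role of self.__stream__()

    def __next__(self) -> int:
        return next(self._iterator)

    def __iter__(self) -> Iterator[int]:
        for item in self._iterator:
            yield item


stream = DelegatingStreamSketch([1, 2, 3])
head = next(stream)   # consumes the first item
rest = list(stream)   # the for-loop path sees only what is left
again = list(stream)  # a second pass finds the iterator exhausted
```

Here head is the first item, rest holds the remaining two, and again is empty. Assuming the real class behaves the same way, as the delegation code suggests, a stream that has been fully iterated once will not replay its events.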
Core __stream__ Generator (lines 128-138)
The __stream__ method is the heart of the iteration logic. For each raw SSE event from the httpx stream, it performs two operations:
def __stream__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    for sse_event in self._raw_stream:
        self.__final_message_snapshot = accumulate_event(
            event=sse_event,
            current_snapshot=self.__final_message_snapshot,
            output_format=self.__output_format,
        )
        events_to_fire = build_events(
            event=sse_event,
            message_snapshot=self.current_message_snapshot,
        )
        for event in events_to_fire:
            yield event
- accumulate_event(): Updates the running __final_message_snapshot by merging the SSE delta into the accumulated message.
- build_events(): Produces a list of typed events to yield. Some raw events produce multiple typed events (e.g., a content_block_delta with text_delta yields both the raw delta event and a TextEvent).
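The two-step pattern (update the snapshot, then fan out into typed events) can be sketched without the SDK. Everything in the block below is a simplified assumption: events are plain dicts, the snapshot is a single string, and accumulate_sketch and build_events_sketch are hypothetical stand-ins that do far less than the real accumulate_event and build_events.

```python
from typing import Dict, Iterator, List, Optional

Event = Dict[str, str]


def accumulate_sketch(event: Event, snapshot: Optional[str]) -> str:
    """Merge one delta into the running text (stand-in for accumulate_event)."""
    current = snapshot or ""
    if event["type"] == "content_block_delta":
        return current + event["text"]
    return current


def build_events_sketch(event: Event, snapshot: str) -> List[Event]:
    """Fan one raw event out into typed events (stand-in for build_events)."""
    events = [event]  # in this sketch the raw event is always passed through
    if event["type"] == "content_block_delta":
        # A text delta also produces a derived "text" event with the snapshot.
        events.append({"type": "text", "text": event["text"], "snapshot": snapshot})
    return events


def stream_sketch(raw_events: List[Event]) -> Iterator[Event]:
    snapshot: Optional[str] = None
    for raw in raw_events:
        snapshot = accumulate_sketch(raw, snapshot)       # step 1: update state
        for event in build_events_sketch(raw, snapshot):  # step 2: fan out
            yield event


raw = [
    {"type": "message_start"},
    {"type": "content_block_delta", "text": "Hel"},
    {"type": "content_block_delta", "text": "lo"},
]
fired = list(stream_sketch(raw))
```

Three raw events become five yielded events in this sketch, and each derived "text" event carries the text accumulated up to that point. Accumulating before building is what lets the derived event see a snapshot that already includes its own delta.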
Yielded Event Types
The __stream__ generator yields ParsedMessageStreamEvent[ResponseFormatT], which is a union of:
- TextEvent: Contains .text (the delta string) and .snapshot (accumulated text so far).
- ThinkingEvent: Contains .thinking (delta) and .snapshot (accumulated thinking).
- SignatureEvent: Contains .signature (the signature string).
- InputJsonEvent: Contains .partial_json (delta) and .snapshot (partially parsed object via jiter).
- CitationEvent: Contains .citation (new citation) and .snapshot (all citations).
- ParsedMessageStopEvent: Contains .message (the final accumulated ParsedMessage).
- ParsedContentBlockStopEvent: Contains .content_block (the completed content block).
- Raw SSE events: RawMessageStartEvent, RawMessageDeltaEvent, RawContentBlockStartEvent, RawContentBlockDeltaEvent.
text_stream Convenience (lines 140-143)
The text_stream attribute provides a filtered view over the full event stream, yielding only the raw text strings from text deltas:
def __stream_text__(self) -> Iterator[str]:
    for chunk in self:
        if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
            yield chunk.delta.text
Usage:
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
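Because __stream_text__ iterates over self, text_stream draws from the same underlying iterator as the full event loop. The sketch below reproduces the filter with hypothetical dataclasses (DeltaSketch, EventSketch, and TextFilterSketch are invented for illustration and carry only the fields the filter reads); it is a model of the pattern, not the SDK's own types.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class DeltaSketch:
    type: str
    text: str = ""


@dataclass
class EventSketch:
    type: str
    delta: Optional[DeltaSketch] = None


class TextFilterSketch:
    """Hypothetical stand-in showing the text-only filter over a shared iterator."""

    def __init__(self, events: List[EventSketch]) -> None:
        self._iterator = iter(events)
        self.text_stream = self._stream_text()

    def __iter__(self) -> Iterator[EventSketch]:
        for item in self._iterator:
            yield item

    def _stream_text(self) -> Iterator[str]:
        for chunk in self:  # iterates the shared underlying iterator
            if (
                chunk.type == "content_block_delta"
                and chunk.delta is not None
                and chunk.delta.type == "text_delta"
            ):
                yield chunk.delta.text


events = [
    EventSketch("message_start"),
    EventSketch("content_block_delta", DeltaSketch("text_delta", "Once ")),
    EventSketch("content_block_delta", DeltaSketch("input_json_delta")),
    EventSketch("content_block_delta", DeltaSketch("text_delta", "upon")),
    EventSketch("message_stop"),
]
stream = TextFilterSketch(events)
story = "".join(stream.text_stream)  # only text deltas survive the filter
leftover = list(stream)              # the shared iterator is already drained
```

In this sketch, draining text_stream leaves nothing for a later loop over the stream itself. If the real class shares its iterator the same way, as the code above indicates, it is safest to choose one of the two iteration styles per stream.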
Async Variant: AsyncMessageStream (lines 181-291)
AsyncMessageStream mirrors MessageStream with async equivalents:
- __aiter__/__anext__ instead of __iter__/__next__
- async def __stream__ using async for sse_event in self._raw_stream
- text_stream is an AsyncIterator[str]
async def __stream__(self) -> AsyncIterator[ParsedMessageStreamEvent[ResponseFormatT]]:
    async for sse_event in self._raw_stream:
        self.__final_message_snapshot = accumulate_event(
            event=sse_event,
            current_snapshot=self.__final_message_snapshot,
            output_format=self.__output_format,
        )
        events_to_fire = build_events(event=sse_event, message_snapshot=self.current_message_snapshot)
        for event in events_to_fire:
            yield event
Note that accumulate_event() and build_events() are the same synchronous functions in both variants -- only the iteration over the raw stream is async.
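The split between an async source and synchronous processing can be illustrated with a small async generator. All names in this sketch are hypothetical (raw_stream_sketch stands in for the raw async stream, accumulate_sketch for the synchronous accumulation step), and the snapshot is reduced to a string.

```python
import asyncio
from typing import AsyncIterator, List


async def raw_stream_sketch(chunks: List[str]) -> AsyncIterator[str]:
    """Stand-in for the raw async stream; awaits between items."""
    for chunk in chunks:
        await asyncio.sleep(0)  # yield control, as a network read would
        yield chunk


def accumulate_sketch(snapshot: str, chunk: str) -> str:
    """Plain synchronous function, called from inside the async generator."""
    return snapshot + chunk


async def stream_sketch(chunks: List[str]) -> AsyncIterator[str]:
    snapshot = ""
    async for chunk in raw_stream_sketch(chunks):      # only this step awaits
        snapshot = accumulate_sketch(snapshot, chunk)  # no await needed here
        yield snapshot


async def collect(chunks: List[str]) -> List[str]:
    return [snap async for snap in stream_sketch(chunks)]


snapshots = asyncio.run(collect(["a", "b", "c"]))
```

Keeping the per-event logic synchronous means one set of accumulation and event-building functions can serve both the sync and async classes, with only the outer loop differing.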
Dependencies
- jiter: Used for partial JSON parsing in InputJsonEvent snapshots (via accumulate_event).
- pydantic: All event types extend BaseModel or GenericModel.
- httpx: Provides the underlying Stream/AsyncStream transport.
Key Source Files
- src/anthropic/lib/streaming/_messages.py -- MessageStream (L33-143), AsyncMessageStream (L181-291)
- src/anthropic/lib/streaming/_types.py -- Event type definitions (TextEvent, ThinkingEvent, etc.)