
Principle:Anthropics Anthropic sdk python Stream Event Iteration

From Leeroopedia
Knowledge Sources
Domains Streaming, LLM
Last Updated 2026-02-15 00:00 GMT

Overview

The Stream Event Iteration principle describes how the Anthropic Python SDK transforms a low-level Server-Sent Events (SSE) byte stream from the Anthropic API into a high-level, typed Python iterator. Each raw SSE event is processed through two transformations -- accumulation (updating a running message snapshot) and event building (producing typed domain events) -- before being yielded to the caller. This allows application code to iterate over semantically meaningful events using a standard Python for loop.

Core Concepts

Server-Sent Events (SSE)

The Anthropic Messages API uses the Server-Sent Events protocol for streaming. The server sends a sequence of events over a long-lived HTTP connection:

event: message_start
data: {"type":"message_start","message":{"id":"msg_...","type":"message",...}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":12}}

event: message_stop
data: {"type":"message_stop"}

The SDK's transport layer (httpx + the Stream wrapper) deserializes each SSE event into a typed RawMessageStreamEvent Pydantic model. The MessageStream class then processes these raw events into higher-level typed events.
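To make the wire format concrete, here is a minimal sketch of decoding SSE text into event payloads. It is an illustration only: `parse_sse` is a hypothetical helper, it yields plain dicts rather than the SDK's Pydantic models, and it handles only the `data:` field and the blank-line terminator.

```python
import json
from typing import Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per SSE event (hypothetical helper)."""
    data_lines: List[str] = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Drop the single optional space that follows the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # "event:" lines repeat the payload's "type" field, so this sketch skips them.
    # An event that is never terminated by a blank line is discarded.


WIRE = """\
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}

"""

events = list(parse_sse(WIRE.splitlines()))
print([e["type"] for e in events])
```

In the SDK this decoding happens below MessageStream, so application code normally never touches the wire text.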

The Iterator Pattern

MessageStream implements Python's iterator protocol (__iter__ and __next__), allowing it to be consumed with a standard for loop. Internally, the __stream__ generator method drives iteration:

raw SSE events        -->  accumulate_event()  -->  build_events()  -->  typed events yielded
(from httpx Stream)        (update snapshot)        (produce events)     (to caller's for loop)

For each raw SSE event:

  1. accumulate_event() updates the running ParsedMessage snapshot. For example, a text_delta event appends its text to the current text block.
  2. build_events() inspects the raw event and produces one or more typed domain events (e.g., TextEvent, ThinkingEvent, InputJsonEvent).
  3. The typed events are yielded to the caller.
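The three steps above can be sketched as a small iterator class. This is a toy under stated assumptions, not the SDK's source: `ToyMessageStream`, `concat`, and `describe` are hypothetical names, and the accumulate and build steps are passed in as plain callables so the control flow stays visible.

```python
from typing import Any, Callable, Iterable, Iterator, List


class ToyMessageStream:
    """Hypothetical stand-in for MessageStream; not the SDK's implementation."""

    def __init__(
        self,
        raw_events: Iterable[Any],
        accumulate: Callable[[Any, Any], Any],
        build: Callable[[Any, Any], List[Any]],
    ) -> None:
        self._raw_events = raw_events
        self._accumulate = accumulate
        self._build = build
        self.snapshot: Any = None
        self._iterator = self.__stream__()

    def __iter__(self) -> "ToyMessageStream":
        return self

    def __next__(self) -> Any:
        return next(self._iterator)

    def __stream__(self) -> Iterator[Any]:
        for raw in self._raw_events:
            # Step 1: fold the raw event into the running snapshot.
            self.snapshot = self._accumulate(raw, self.snapshot)
            # Steps 2 and 3: build the derived events and hand them to the caller.
            yield from self._build(raw, self.snapshot)


def concat(raw: str, snapshot: Any) -> str:
    return (snapshot or "") + raw


def describe(raw: str, snapshot: str) -> List[tuple]:
    return [("delta", raw), ("snapshot", snapshot)]


stream = ToyMessageStream(["Hel", "lo"], concat, describe)
seen = list(stream)
print(seen)
```

Because the generator is lazy, the snapshot advances only as the caller pulls events, which is why the snapshot and the yielded events stay in step.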

Event Accumulation

Accumulation is the process of maintaining a running snapshot of the complete message as it is being streamed. Each SSE delta is merged into the snapshot:

  • message_start: Initializes the snapshot from the initial message object.
  • content_block_start: Appends a new content block to the snapshot.
  • content_block_delta: Updates the content block at the specified index (appends text, partial JSON, thinking text, etc.).
  • content_block_stop: Triggers parsing of structured output if output_format is set.
  • message_delta: Updates stop_reason, stop_sequence, and usage on the snapshot.

This means that at any point during iteration, the current_message_snapshot property reflects everything received so far: a partial message while the stream is still open, and the complete message once the stream ends.
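A minimal sketch of the merge rules listed above, using plain dicts in place of the SDK's typed models. The function below is a simplified illustration that handles only text deltas and skips structured-output parsing; the message id and token counts are made-up sample values.

```python
import copy
from typing import List, Optional


def accumulate_event(event: dict, snapshot: Optional[dict]) -> dict:
    """Toy accumulator: merge one wire event into the snapshot (text deltas only)."""
    kind = event["type"]
    if kind == "message_start":
        return copy.deepcopy(event["message"])
    if snapshot is None:
        raise RuntimeError(f"received {kind!r} before message_start")
    if kind == "content_block_start":
        snapshot["content"].append(copy.deepcopy(event["content_block"]))
    elif kind == "content_block_delta":
        block = snapshot["content"][event["index"]]
        if event["delta"]["type"] == "text_delta":
            block["text"] += event["delta"]["text"]
    elif kind == "message_delta":
        snapshot["stop_reason"] = event["delta"].get("stop_reason")
        snapshot["usage"]["output_tokens"] = event["usage"]["output_tokens"]
    # content_block_stop and message_stop leave this toy snapshot unchanged.
    return snapshot


events = [
    {"type": "message_start", "message": {
        "id": "msg_example", "content": [], "stop_reason": None,
        "usage": {"input_tokens": 5, "output_tokens": 0}}},
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"},
     "usage": {"output_tokens": 12}},
    {"type": "message_stop"},
]

snapshot: Optional[dict] = None
partial_texts: List[str] = []
for event in events:
    snapshot = accumulate_event(event, snapshot)
    if snapshot["content"]:
        # Record what the first block looks like after each event.
        partial_texts.append(snapshot["content"][0]["text"])

print(partial_texts[:3])
print(snapshot["stop_reason"], snapshot["usage"])
```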

From Wire Events to Domain Events

The raw SSE events are protocol-level constructs (starts, deltas, stops). The SDK transforms certain deltas into higher-level domain events that are more convenient for application code:

Wire Event                                 Domain Event                 Payload
-----------------------------------------  ---------------------------  -----------------------------------------------
content_block_delta with text_delta        TextEvent                    .text (delta), .snapshot (accumulated)
content_block_delta with input_json_delta  InputJsonEvent               .partial_json (delta), .snapshot (parsed object)
content_block_delta with thinking_delta    ThinkingEvent                .thinking (delta), .snapshot (accumulated)
content_block_delta with signature_delta   SignatureEvent               .signature (full signature)
content_block_delta with citations_delta   CitationEvent                .citation (new), .snapshot (all citations)
message_stop                               ParsedMessageStopEvent       .message (final snapshot)
content_block_stop                         ParsedContentBlockStopEvent  .content_block (final block)

Note that content_block_delta events produce two yields: the original raw delta event and the higher-level domain event. This gives callers the flexibility to work at either abstraction level.
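The double yield for a text delta might look like the sketch below. `ToyTextEvent` and this `build_events` are hypothetical simplifications: events are plain dicts, only text deltas get a domain event, and every other event is passed through unchanged, which is an assumption of the sketch rather than a description of the SDK.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class ToyTextEvent:
    """Hypothetical domain event carrying the delta and the accumulated text."""
    text: str
    snapshot: str
    type: str = "text"


def build_events(event: dict, snapshot: dict) -> List[Union[dict, ToyTextEvent]]:
    # The raw event always goes out first (a simplification in this sketch).
    out: List[Union[dict, ToyTextEvent]] = [event]
    if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
        # Read the accumulated text from the snapshot; nothing is mutated here.
        block = snapshot["content"][event["index"]]
        out.append(ToyTextEvent(text=event["delta"]["text"], snapshot=block["text"]))
    return out


snapshot = {"content": [{"type": "text", "text": "Hello world"}]}
delta = {"type": "content_block_delta", "index": 0,
         "delta": {"type": "text_delta", "text": " world"}}

built = build_events(delta, snapshot)
print(built)
```

A caller that only wants accumulated text can match on the domain event and ignore the raw delta that precedes it.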

The text_stream Convenience

For the common case of simply extracting text content, MessageStream provides a text_stream property. This is a filtered iterator that yields only the .text string from text_delta events, discarding everything else:

for text in stream.text_stream:
    print(text, end="", flush=True)

This is a convenience wrapper around the full event iterator, filtering for content_block_delta events where delta.type == "text_delta".
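The filtering described here can be sketched as a small generator. The function below is a stand-in written over plain dict events, not the SDK's implementation, and the sample events are illustrative.

```python
from typing import Iterable, Iterator


def text_stream(events: Iterable[dict]) -> Iterator[str]:
    """Yield only the text carried by text_delta events (toy version)."""
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
            yield event["delta"]["text"]


sample_events = [
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": "{\"a\":"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": " world"}},
    {"type": "message_stop"},
]

chunks = list(text_stream(sample_events))
print("".join(chunks))
```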

Design Rationale

The two-step approach (accumulate, then build), applied to each event in turn, cleanly separates concerns:

  • Accumulation is stateful and maintains the single source of truth for the message snapshot.
  • Event building is stateless -- it reads the raw event and current snapshot to produce typed events.

This separation makes the code easier to test and extend. Adding a new domain event for data the snapshot already tracks only requires updating build_events(), and changing how deltas are merged only touches the accumulation logic.

The iterator pattern was chosen because it maps naturally to Python's for loop syntax and composes well with generator expressions and itertools. The same design carries over to async iteration (async for) through an asynchronous counterpart of the stream class.
