Implementation:Anthropics Anthropic sdk python Stream Event Processing

From Leeroopedia
Knowledge Sources
Domains Streaming, LLM
Last Updated 2026-02-15 00:00 GMT

Overview

This implementation covers the two core functions that process streaming events in real time: accumulate_event() merges each raw SSE event into a running message snapshot, and build_events() turns the same raw event into the typed events yielded to the caller. Together, these functions implement the accumulate-then-dispatch pattern that powers the SDK's streaming infrastructure.

API Signatures

build_events()

def build_events(
    *,
    event: RawMessageStreamEvent,
    message_snapshot: ParsedMessage[ResponseFormatT],
) -> list[ParsedMessageStreamEvent[ResponseFormatT]]

Source: src/anthropic/lib/streaming/_messages.py lines 331-422

accumulate_event()

def accumulate_event(
    *,
    event: RawMessageStreamEvent,
    current_snapshot: ParsedMessage[ResponseFormatT] | None,
    output_format: ResponseFormatT | NotGiven = NOT_GIVEN,
) -> ParsedMessage[ResponseFormatT]

Source: src/anthropic/lib/streaming/_messages.py lines 433-518

Import

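These are module-level functions in the SDK's internal streaming module, so user code does not normally import them directly. MessageStream.__stream__() calls both of them for every raw SSE event it receives, always accumulating before building events:

# Internal usage within MessageStream.__stream__(), once per raw SSE event:
self.__final_message_snapshot = accumulate_event(
    event=sse_event,
    current_snapshot=self.__final_message_snapshot,
    output_format=self.__output_format,
)
events_to_fire = build_events(
    event=sse_event,
    message_snapshot=self.current_message_snapshot,
)
# each event in events_to_fire is then yielded to the caller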

build_events() -- Detailed Behavior

build_events() is stateless: it reads, but never modifies, the current message snapshot and returns the list of typed events to yield to the caller for one raw SSE event. Most raw events map to exactly one output event; a content_block_delta can produce two.

Event Dispatch Table

Raw event type        | Output events
message_start         | The raw event, passed through unchanged
message_delta         | The raw event, passed through unchanged
message_stop          | A ParsedMessageStopEvent carrying the final message_snapshot (replaces the raw event)
content_block_start   | The raw event, passed through unchanged
content_block_delta   | The raw delta event, followed by a domain event when the block type matches (see below)
content_block_stop    | A ParsedContentBlockStopEvent carrying the completed content_block (replaces the raw event)
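
To make the table concrete, here is a minimal toy dispatcher over plain dicts shaped like the raw SSE JSON payloads. It is a sketch under stated assumptions, not the SDK's code: the name toy_build_events, the dict-based snapshot, and the restriction to text_delta and thinking_delta domain events are hypothetical simplifications.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]

# Raw event types forwarded unchanged.
PASSTHROUGH = {"message_start", "message_delta", "content_block_start"}

# delta type -> (required block type, domain event type, field shared by delta and block)
DOMAIN_EVENTS = {
    "text_delta": ("text", "text", "text"),
    "thinking_delta": ("thinking", "thinking", "thinking"),
}


def toy_build_events(event: Event, snapshot: Event) -> List[Event]:
    """Hypothetical dict-based version of build_events(); never mutates snapshot."""
    kind = event["type"]
    if kind in PASSTHROUGH:
        return [event]
    if kind == "message_stop":
        # The raw stop event is replaced by one carrying the final snapshot.
        return [{"type": "message_stop", "message": snapshot}]
    if kind == "content_block_stop":
        block = snapshot["content"][event["index"]]
        return [{"type": "content_block_stop", "index": event["index"], "content_block": block}]
    if kind == "content_block_delta":
        fired = [event]  # the raw delta is always forwarded
        block = snapshot["content"][event["index"]]
        delta = event["delta"]
        rule = DOMAIN_EVENTS.get(delta["type"])
        if rule is not None:
            block_type, event_type, field = rule
            if block["type"] == block_type:
                # Domain event = this delta plus the value already accumulated in the snapshot.
                fired.append({"type": event_type, field: delta[field], "snapshot": block[field]})
        return fired
    return []
```

Note that the domain event's snapshot comes straight from the snapshot argument, which is why the snapshot must already contain the current delta when this function runs.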

content_block_delta Sub-dispatch

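When a content_block_delta event arrives, build_events() first appends the raw delta event, then looks up the content block at event.index in the snapshot and, if the block's type matches, appends a higher-level domain event chosen by event.delta.type: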

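# Inside build_events(), content_block_delta branch:
events_to_fire.append(event)  # the raw delta is always forwarded first
content_block = message_snapshot.content[event.index]

if event.delta.type == "text_delta":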
    if content_block.type == "text":
        events_to_fire.append(
            build(TextEvent, type="text", text=event.delta.text, snapshot=content_block.text)
        )
elif event.delta.type == "input_json_delta":
    if content_block.type == "tool_use":
        events_to_fire.append(
            build(InputJsonEvent, type="input_json",
                  partial_json=event.delta.partial_json, snapshot=content_block.input)
        )
elif event.delta.type == "citations_delta":
    if content_block.type == "text":
        events_to_fire.append(
            build(CitationEvent, type="citation",
                  citation=event.delta.citation, snapshot=content_block.citations or [])
        )
elif event.delta.type == "thinking_delta":
    if content_block.type == "thinking":
        events_to_fire.append(
            build(ThinkingEvent, type="thinking",
                  thinking=event.delta.thinking, snapshot=content_block.thinking)
        )
elif event.delta.type == "signature_delta":
    if content_block.type == "thinking":
        events_to_fire.append(
            build(SignatureEvent, type="signature", signature=content_block.signature)
        )

The domain events read their snapshot field from the already-accumulated message_snapshot, which is why accumulate_event() must be called before build_events().
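
A minimal sketch of why the order matters, assuming a text-only toy accumulator and builder over dict events (toy_accumulate, toy_build and toy_stream are hypothetical names, not SDK functions). Because the snapshot is updated before events are built, each text event's snapshot already includes its own delta.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional

Event = Dict[str, Any]


def toy_accumulate(event: Event, snapshot: Optional[Event]) -> Event:
    """Hypothetical text-only accumulator; mutates and returns the snapshot."""
    if snapshot is None:
        return {"content": []}  # assumes the first event is message_start
    if event["type"] == "content_block_start":
        snapshot["content"].append({"type": "text", "text": ""})
    elif event["type"] == "content_block_delta":
        snapshot["content"][event["index"]]["text"] += event["delta"]["text"]
    return snapshot


def toy_build(event: Event, snapshot: Event) -> List[Event]:
    """Hypothetical builder: forwards every event and adds a text event for deltas."""
    if event["type"] != "content_block_delta":
        return [event]
    block = snapshot["content"][event["index"]]
    return [event, {"type": "text", "text": event["delta"]["text"], "snapshot": block["text"]}]


def toy_stream(raw: Iterable[Event]) -> Iterator[Event]:
    snapshot: Optional[Event] = None
    for sse_event in raw:
        snapshot = toy_accumulate(sse_event, snapshot)  # 1. merge this event first ...
        yield from toy_build(sse_event, snapshot)  # 2. ... then build from the updated snapshot
```

With the two calls swapped, every snapshot would lag one delta behind: for deltas "Hel" and "lo", the text events would report "" and "Hel" instead of "Hel" and "Hello".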

accumulate_event() -- Detailed Behavior

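accumulate_event() is the stateful half of the pair: it merges each raw event into the running ParsedMessage snapshot, mutating that snapshot in place and returning it (on message_start it creates the snapshot instead). It handles the following event types: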

message_start (lines 450-454)

Creates the snapshot from the message_start event's .message field with Pydantic's construct(), which skips validation for speed. If any other event arrives while there is no snapshot yet, a RuntimeError is raised:

if current_snapshot is None:
    if event.type == "message_start":
        return cast(
            ParsedMessage[ResponseFormatT],
            ParsedMessage.construct(**cast(Any, event.message.to_dict()))
        )
    raise RuntimeError(f'Unexpected event order, got {event.type} before "message_start"')
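
The guard can be sketched with plain dicts. This is a hypothetical stand-in (toy_init_or_check is not an SDK function): it copies the message dict where the SDK calls ParsedMessage.construct(), and like construct() it performs no validation.

```python
from typing import Any, Dict, Optional


def toy_init_or_check(event: Dict[str, Any], snapshot: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical mirror of the first branch of accumulate_event()."""
    if snapshot is None:
        if event["type"] == "message_start":
            message = event["message"]
            # Fresh dict (and content list) so later in-place edits don't touch the event.
            return {**message, "content": list(message["content"])}
        raise RuntimeError(f'Unexpected event order, got {event["type"]} before "message_start"')
    return snapshot  # later events are merged by the other branches
```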

content_block_start (lines 456-463)

Appends a new content block to the snapshot's content list:

if event.type == "content_block_start":
    current_snapshot.content.append(
        cast(Any, construct_type(type_=ParsedContentBlock, value=event.content_block.model_dump()))
    )

content_block_delta (lines 464-498)

Updates the content block at event.index based on the delta type:

  • text_delta: Appends event.delta.text to content.text (string concatenation).
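  • input_json_delta: Appends the partial JSON string to an internal buffer kept on the content block (the __json_buf attribute), then re-parses the entire buffer with jiter.from_json(json_buf, partial_mode=True) and stores the result in content.input. Because the whole buffer is parsed from scratch on every delta, content.input always holds the best-effort Python object for all of the JSON received so far.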
  • citations_delta: Appends the new citation to content.citations list (initializes the list if empty).
  • thinking_delta: Appends event.delta.thinking to content.thinking (string concatenation).
  • signature_delta: Sets content.signature to event.delta.signature (replacement, not accumulation).
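
The merge rules for the string- and list-valued deltas can be sketched on dict content blocks as follows. This is an illustrative toy, not SDK code: toy_apply_delta is a hypothetical name, blocks are plain dicts, and input_json_delta is left out because it needs a partial JSON parser.

```python
from typing import Any, Dict

Block = Dict[str, Any]


def toy_apply_delta(block: Block, delta: Dict[str, Any]) -> None:
    """Merge one content_block_delta into a dict content block, in place."""
    kind = delta["type"]
    if kind == "text_delta" and block["type"] == "text":
        block["text"] += delta["text"]  # concatenate
    elif kind == "citations_delta" and block["type"] == "text":
        if not block.get("citations"):
            block["citations"] = [delta["citation"]]  # first citation creates the list
        else:
            block["citations"].append(delta["citation"])
    elif kind == "thinking_delta" and block["type"] == "thinking":
        block["thinking"] += delta["thinking"]  # concatenate
    elif kind == "signature_delta" and block["type"] == "thinking":
        block["signature"] = delta["signature"]  # replace, never append
    # input_json_delta and mismatched block types are ignored in this sketch
```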

content_block_stop (lines 499-502)

For text blocks only, and only when output_format was given, parses the finished block's text into content_block.parsed_output:

if event.type == "content_block_stop":
    content_block = current_snapshot.content[event.index]
    if content_block.type == "text" and is_given(output_format):
        content_block.parsed_output = parse_text(content_block.text, output_format)
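
A minimal sketch of this step, assuming that the requested output format is a dataclass and that parsing means json.loads followed by construction; toy_parse_text, toy_on_block_stop and Weather are hypothetical, and None stands in for the SDK's NOT_GIVEN sentinel.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


def toy_parse_text(text: str, output_format: type) -> Any:
    """Hypothetical stand-in for parse_text(): JSON text -> instance of output_format."""
    return output_format(**json.loads(text))


def toy_on_block_stop(block: Dict[str, Any], output_format: Optional[type] = None) -> None:
    # Only text blocks are parsed, and only if a format was requested (None ~ NOT_GIVEN).
    if block["type"] == "text" and output_format is not None:
        block["parsed_output"] = toy_parse_text(block["text"], output_format)


@dataclass
class Weather:  # hypothetical output format
    city: str
    temp_c: float
```

Deferring the parse to content_block_stop means it runs once, on the complete text, rather than on every partial delta.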

message_delta (lines 503-516)

Updates the message-level metadata:

if event.type == "message_delta":
    current_snapshot.stop_reason = event.delta.stop_reason
    current_snapshot.stop_sequence = event.delta.stop_sequence
    current_snapshot.usage.output_tokens = event.usage.output_tokens
    # Also updates input_tokens, cache tokens, and server_tool_use if present
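
The metadata update can be sketched as below, assuming dict-shaped snapshots. toy_apply_message_delta is a hypothetical name, and the optional usage field names (in particular the two cache fields) are our assumption based on the API's usage object. Token counts are assigned rather than added because, as we understand the Messages streaming documentation, the counts reported in message_delta are cumulative.

```python
from typing import Any, Dict

# Assumed names of the optional usage fields copied only when present.
USAGE_FIELDS = ("input_tokens", "cache_creation_input_tokens", "cache_read_input_tokens", "server_tool_use")


def toy_apply_message_delta(snapshot: Dict[str, Any], event: Dict[str, Any]) -> None:
    """Hypothetical dict version of the message_delta branch; mutates snapshot."""
    snapshot["stop_reason"] = event["delta"]["stop_reason"]
    snapshot["stop_sequence"] = event["delta"]["stop_sequence"]
    usage = event["usage"]
    snapshot["usage"]["output_tokens"] = usage["output_tokens"]  # always overwritten
    for field in USAGE_FIELDS:
        value = usage.get(field)
        if value is not None:  # only overwrite fields the delta actually reports
            snapshot["usage"][field] = value
```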

Event Type Definitions

The typed domain events are defined in src/anthropic/lib/streaming/_types.py:

TextEvent (lines 23-33)

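from typing import Any, Dict, Literal, cast

import jiter
from anthropic._models import BaseModel  # the SDK's Pydantic-based base class

class TextEvent(BaseModel):
    type: Literal["text"]
    text: str          # The text delta
    snapshot: str      # The entire accumulated text

    def parsed_snapshot(self) -> Dict[str, Any]:
        # Partial JSON parse that keeps an unfinished trailing string
        return cast(Dict[str, Any],
            jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings"))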

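The parsed_snapshot() method parses the accumulated text as partial JSON. Because it uses partial_mode="trailing-strings", an unfinished string at the end of the text is kept rather than dropped, which is useful when the model is streaming JSON output.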
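
To illustrate the "re-parse the whole buffer" strategy used for tool inputs, and the difference between jiter's two partial modes, here is a naive pure-Python stand-in. It is neither jiter nor its algorithm: naive_partial_json and _complete are hypothetical helpers that back off one character at a time until json.loads accepts the prefix after any open strings and brackets are closed. As we read jiter's documentation, partial_mode=True discards an unfinished trailing string while "trailing-strings" keeps it; the keep_trailing_strings flag imitates that difference, but outputs may not match jiter's exactly. The quadratic back-off is fine for a demo, not for production.

```python
import json
from typing import Any, List, Optional


def _complete(prefix: str, keep_trailing_strings: bool) -> Optional[str]:
    """Close any open string/array/object in prefix, or return None to reject it."""
    closers: List[str] = []
    in_string = escaped = False
    for ch in prefix:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            closers.append("}" if ch == "{" else "]")
        elif ch in "}]" and closers:
            closers.pop()
    if in_string:
        if not keep_trailing_strings:
            return None  # drop an unfinished string entirely
        prefix += '"'
    return prefix + "".join(reversed(closers))


def naive_partial_json(buf: str, keep_trailing_strings: bool = False) -> Any:
    """Parse the longest prefix of buf that can be completed into valid JSON."""
    for cut in range(len(buf), 0, -1):
        candidate = _complete(buf[:cut], keep_trailing_strings)
        if candidate is None:
            continue
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue  # e.g. a dangling ',' or ':'; back off one character
    return None


# Simulate input_json_delta chunks arriving one by one.
buf = ""
snapshots: List[Any] = []
for chunk in ['{"city": "Par', 'is", "un', 'its": "c"}']:
    buf += chunk  # the raw text keeps growing ...
    snapshots.append(naive_partial_json(buf))  # ... and is re-parsed from scratch
```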

Other Event Types

  • ThinkingEvent (line 46): .thinking (delta) + .snapshot (accumulated)
  • SignatureEvent (line 56): .signature (complete signature string)
  • InputJsonEvent (line 63): .partial_json (delta) + .snapshot (parsed object)
  • CitationEvent (line 36): .citation (new citation) + .snapshot (all citations)
  • ParsedMessageStopEvent (line 110): .message (final ParsedMessage)
  • ParsedContentBlockStopEvent (line 116): .content_block (final ParsedContentBlock)

Usage Example

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "text":
            print(f"Text delta: {event.text}")
            print(f"Accumulated so far: {event.snapshot}")
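        elif event.type == "thinking":
            # only emitted when extended thinking is enabled in the request
            print(f"Thinking: {event.thinking}")
        elif event.type == "input_json":
            # only emitted when the request defines tools and the model calls one
            print(f"Tool input so far: {event.snapshot}")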
        elif event.type == "message_stop":
            print(f"Done! Final message: {event.message}")

The text_stream convenience property provides a simpler interface when only text content is needed:

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
print()

Internally, text_stream iterates over the full event stream and filters for content_block_delta events with text_delta type (lines 140-143):

def __stream_text__(self) -> Iterator[str]:
    for chunk in self:
        if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
            yield chunk.delta.text

Dependencies

  • jiter: Fast Rust-based JSON parser used for partial JSON parsing of tool inputs (from_json with partial_mode=True). Also used in TextEvent.parsed_snapshot().
  • pydantic: All event types extend BaseModel. construct() is used for validation-free snapshot initialization.

Key Source Files

  • src/anthropic/lib/streaming/_messages.py -- build_events() (L331-422), accumulate_event() (L433-518)
  • src/anthropic/lib/streaming/_types.py -- Event type definitions (L23-140)
