Implementation:Anthropics Anthropic sdk python Stream Event Processing
| Knowledge Sources | |
|---|---|
| Domains | Streaming, LLM |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
This implementation covers the two core functions that process streaming events in real time: build_events() transforms raw SSE events into typed domain events, and accumulate_event() merges each SSE delta into a running message snapshot. Together, these functions implement the accumulation and dispatch pattern that powers the SDK's streaming infrastructure.
API Signatures
build_events()
def build_events(
    *,
    event: RawMessageStreamEvent,
    message_snapshot: ParsedMessage[ResponseFormatT],
) -> list[ParsedMessageStreamEvent[ResponseFormatT]]
Source: src/anthropic/lib/streaming/_messages.py lines 331-422
accumulate_event()
def accumulate_event(
    *,
    event: RawMessageStreamEvent,
    current_snapshot: ParsedMessage[ResponseFormatT] | None,
    output_format: ResponseFormatT | NotGiven = NOT_GIVEN,
) -> ParsedMessage[ResponseFormatT]
Source: src/anthropic/lib/streaming/_messages.py lines 433-518
Import
These are module-level functions, not typically imported directly by user code. They are called internally by MessageStream.__stream__():
# Internal usage within __stream__:
self.__final_message_snapshot = accumulate_event(
    event=sse_event,
    current_snapshot=self.__final_message_snapshot,
    output_format=self.__output_format,
)
events_to_fire = build_events(
    event=sse_event,
    message_snapshot=self.current_message_snapshot,
)
build_events() -- Detailed Behavior
build_events() is a stateless function that takes a raw SSE event and the current message snapshot, and returns a list of typed events to yield to the caller. Some raw events produce multiple output events.
Event Dispatch Table
| Raw Event Type | Output Events |
|---|---|
| message_start | Passes through the raw event as-is |
| message_delta | Passes through the raw event as-is |
| message_stop | Produces a ParsedMessageStopEvent with the final message_snapshot |
| content_block_start | Passes through the raw event as-is |
| content_block_delta | Passes through the raw delta event plus a domain event (see below) |
| content_block_stop | Produces a ParsedContentBlockStopEvent with the completed content_block |
content_block_delta Sub-dispatch
When a content_block_delta event arrives, build_events() appends the raw delta and a higher-level domain event based on event.delta.type:
if event.delta.type == "text_delta":
    if content_block.type == "text":
        events_to_fire.append(
            build(TextEvent, type="text", text=event.delta.text, snapshot=content_block.text)
        )
elif event.delta.type == "input_json_delta":
    if content_block.type == "tool_use":
        events_to_fire.append(
            build(InputJsonEvent, type="input_json",
                  partial_json=event.delta.partial_json, snapshot=content_block.input)
        )
elif event.delta.type == "citations_delta":
    if content_block.type == "text":
        events_to_fire.append(
            build(CitationEvent, type="citation",
                  citation=event.delta.citation, snapshot=content_block.citations or [])
        )
elif event.delta.type == "thinking_delta":
    if content_block.type == "thinking":
        events_to_fire.append(
            build(ThinkingEvent, type="thinking",
                  thinking=event.delta.thinking, snapshot=content_block.thinking)
        )
elif event.delta.type == "signature_delta":
    if content_block.type == "thinking":
        events_to_fire.append(
            build(SignatureEvent, type="signature", signature=content_block.signature)
        )
The domain events read their snapshot field from the already-accumulated message_snapshot, which is why accumulate_event() must be called before build_events().
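The ordering constraint can be illustrated with a minimal, stdlib-only sketch. It does not use the SDK: `Snapshot`, `TextBlock`, `accumulate`, `build`, and `run` are hypothetical stand-ins, and events are plain dicts rather than the SDK's typed models. The point is only that each derived event's snapshot already includes the delta that triggered it, because accumulation runs first.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TextBlock:
    text: str = ""


@dataclass
class Snapshot:
    content: list = field(default_factory=list)


def accumulate(event: dict, snapshot: Optional[Snapshot]) -> Snapshot:
    """Merge one raw event into the running snapshot (simplified stand-in)."""
    if snapshot is None:
        if event["type"] == "message_start":
            return Snapshot()
        raise RuntimeError(
            f'Unexpected event order, got {event["type"]} before "message_start"'
        )
    if event["type"] == "content_block_start":
        snapshot.content.append(TextBlock())
    elif event["type"] == "content_block_delta":
        snapshot.content[event["index"]].text += event["text"]
    return snapshot


def build(event: dict, snapshot: Snapshot) -> list:
    """Return the raw event, plus a derived text event for deltas."""
    out = [event]
    if event["type"] == "content_block_delta":
        block = snapshot.content[event["index"]]
        # The derived event reads the already-updated block text.
        out.append({"type": "text", "text": event["text"], "snapshot": block.text})
    return out


def run(raw_events: list) -> list:
    snapshot = None
    fired = []
    for event in raw_events:
        snapshot = accumulate(event, snapshot)  # accumulate first
        fired.extend(build(event, snapshot))    # then build from the snapshot
    return fired


raw = [
    {"type": "message_start"},
    {"type": "content_block_start", "index": 0},
    {"type": "content_block_delta", "index": 0, "text": "Hel"},
    {"type": "content_block_delta", "index": 0, "text": "lo"},
]
text_events = [e for e in run(raw) if e["type"] == "text"]
```

If the two calls in `run` were swapped, `build` would index a content block that has not been appended yet, or would report a snapshot that lags one delta behind.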
accumulate_event() -- Detailed Behavior
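accumulate_event() merges each SSE delta into the running ParsedMessage snapshot, updating the snapshot it is given and returning it. The function holds no state of its own; the caller stores the returned snapshot and passes it back in with the next event. It handles the following event types: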
message_start (lines 450-454)
Initializes the snapshot from the message_start event's .message field using Pydantic's construct() for performance (skips validation):
if current_snapshot is None:
    if event.type == "message_start":
        return cast(
            ParsedMessage[ResponseFormatT],
            ParsedMessage.construct(**cast(Any, event.message.to_dict()))
        )
    raise RuntimeError(f'Unexpected event order, got {event.type} before "message_start"')
content_block_start (lines 456-463)
Appends a new content block to the snapshot's content list:
if event.type == "content_block_start":
    current_snapshot.content.append(
        cast(Any, construct_type(type_=ParsedContentBlock, value=event.content_block.model_dump()))
    )
content_block_delta (lines 464-498)
Updates the content block at event.index based on the delta type:
- text_delta: Appends event.delta.text to content.text (string concatenation).
- input_json_delta: Appends the partial JSON string to an internal buffer (the __json_buf property), then re-parses the entire buffer with jiter.from_json(json_buf, partial_mode=True). This incrementally parses the growing JSON string into a Python dict.
- citations_delta: Appends the new citation to the content.citations list (initializes the list if empty).
- thinking_delta: Appends event.delta.thinking to content.thinking (string concatenation).
- signature_delta: Sets content.signature to event.delta.signature (replacement, not accumulation).
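The per-delta rules above can be sketched without the SDK. Everything here is an assumption made for illustration: `Block`, its `json_buf` field, and `apply_delta` are hypothetical names, deltas are plain dicts, and the standard-library `json.loads` stands in for jiter's partial-mode parser. Unlike jiter, `json.loads` cannot parse an incomplete document, so this sketch keeps the previous `input` value until the buffer becomes valid JSON.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Block:
    type: str
    text: str = ""
    thinking: str = ""
    signature: str = ""
    citations: Optional[list] = None
    input: Any = field(default_factory=dict)
    json_buf: bytes = b""  # hypothetical stand-in for the internal buffer


def apply_delta(block: Block, delta: dict) -> None:
    """Merge one delta into a content block, following the rules listed above."""
    kind = delta["type"]
    if kind == "text_delta" and block.type == "text":
        block.text += delta["text"]                # concatenate
    elif kind == "thinking_delta" and block.type == "thinking":
        block.thinking += delta["thinking"]        # concatenate
    elif kind == "signature_delta" and block.type == "thinking":
        block.signature = delta["signature"]       # replace, do not append
    elif kind == "citations_delta" and block.type == "text":
        if not block.citations:
            block.citations = [delta["citation"]]  # initialize on first citation
        else:
            block.citations.append(delta["citation"])
    elif kind == "input_json_delta" and block.type == "tool_use":
        block.json_buf += delta["partial_json"].encode("utf-8")
        try:
            # Re-parse the whole buffer each time, as the text describes.
            block.input = json.loads(block.json_buf)
        except json.JSONDecodeError:
            pass  # buffer is still incomplete; keep the previous value


tool = Block(type="tool_use")
for piece in ['{"city": "Par', 'is", "days": 3}']:
    apply_delta(tool, {"type": "input_json_delta", "partial_json": piece})

thinking = Block(type="thinking")
apply_delta(thinking, {"type": "thinking_delta", "thinking": "step 1; "})
apply_delta(thinking, {"type": "thinking_delta", "thinking": "step 2"})
apply_delta(thinking, {"type": "signature_delta", "signature": "sig-a"})
apply_delta(thinking, {"type": "signature_delta", "signature": "sig-b"})
```

Re-parsing the full buffer on every delta is simple and keeps the parsed value consistent with the raw bytes, at the cost of repeated work on long tool inputs.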
content_block_stop (lines 499-502)
Triggers structured output parsing if output_format is specified:
if event.type == "content_block_stop":
    content_block = current_snapshot.content[event.index]
    if content_block.type == "text" and is_given(output_format):
        content_block.parsed_output = parse_text(content_block.text, output_format)
message_delta (lines 503-516)
Updates the message-level metadata:
if event.type == "message_delta":
    current_snapshot.stop_reason = event.delta.stop_reason
    current_snapshot.stop_sequence = event.delta.stop_sequence
    current_snapshot.usage.output_tokens = event.usage.output_tokens
    # Also updates input_tokens, cache tokens, and server_tool_use if present
Event Type Definitions
The typed domain events are defined in src/anthropic/lib/streaming/_types.py:
TextEvent (lines 23-33)
class TextEvent(BaseModel):
    type: Literal["text"]
    text: str      # The text delta
    snapshot: str  # The entire accumulated text

    def parsed_snapshot(self) -> Dict[str, Any]:
        return cast(Dict[str, Any],
                    jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings"))
The parsed_snapshot() method provides partial JSON parsing of the accumulated text, useful when the model is streaming JSON output.
Other Event Types
- ThinkingEvent (line 46): .thinking (delta) + .snapshot (accumulated)
- SignatureEvent (line 56): .signature (complete signature string)
- InputJsonEvent (line 63): .partial_json (delta) + .snapshot (parsed object)
- CitationEvent (line 36): .citation (new citation) + .snapshot (all citations)
- ParsedMessageStopEvent (line 110): .message (final ParsedMessage)
- ParsedContentBlockStopEvent (line 116): .content_block (final ParsedContentBlock)
Usage Example
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for event in stream:
        if event.type == "text":
            print(f"Text delta: {event.text}")
            print(f"Accumulated so far: {event.snapshot}")
        elif event.type == "thinking":
            print(f"Thinking: {event.thinking}")
        elif event.type == "input_json":
            print(f"Tool input so far: {event.snapshot}")
        elif event.type == "message_stop":
            print(f"Done! Final message: {event.message}")
The text_stream convenience property provides a simpler interface when only text content is needed:
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
Internally, text_stream iterates over the full event stream and filters for content_block_delta events with text_delta type (lines 140-143):
def __stream_text__(self) -> Iterator[str]:
    for chunk in self:
        if chunk.type == "content_block_delta" and chunk.delta.type == "text_delta":
            yield chunk.delta.text
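The filter can be tried in isolation with a small stdlib-only sketch. `Event`, `Delta`, and `iter_text` are hypothetical stand-ins for the SDK's typed events and the method above. Because the filter matches only raw content_block_delta events, the derived text events that share the stream do not cause each fragment to be yielded twice.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class Delta:
    type: str
    text: str = ""


@dataclass
class Event:
    type: str
    delta: Optional[Delta] = None


def iter_text(events: Iterable[Event]) -> Iterator[str]:
    """Yield only the text fragments carried by raw text deltas."""
    for chunk in events:
        if (
            chunk.type == "content_block_delta"
            and chunk.delta is not None
            and chunk.delta.type == "text_delta"
        ):
            yield chunk.delta.text


events = [
    Event("message_start"),
    Event("content_block_delta", Delta("text_delta", "Hel")),
    Event("text"),  # derived event: skipped by the filter
    Event("content_block_delta", Delta("input_json_delta")),
    Event("content_block_delta", Delta("text_delta", "lo")),
    Event("message_stop"),
]
joined = "".join(iter_text(events))
```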
Dependencies
- jiter: Fast Rust-based JSON parser used for partial JSON parsing of tool inputs (from_json with partial_mode=True). Also used in TextEvent.parsed_snapshot().
- pydantic: All event types extend BaseModel. construct() is used for validation-free snapshot initialization.
Key Source Files
- src/anthropic/lib/streaming/_messages.py -- build_events() (L331-422), accumulate_event() (L433-518)
- src/anthropic/lib/streaming/_types.py -- Event type definitions (L23-140)