Implementation:Anthropics Anthropic sdk python Stream Parsed Snapshot
| Knowledge Sources | |
|---|---|
| Domains | Structured_Output, LLM, Data_Extraction |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
The Stream Parsed Snapshot implementation provides the machinery for incremental structured output parsing during streaming. It combines the TextEvent.parsed_snapshot() method for partial JSON decoding with the accumulate_event() function's content_block_stop handler for full validation on completion. Together, these enable a two-phase parsing strategy: lenient partial dictionaries during streaming and strict Pydantic validation when the content block finishes.
API Surface
TextEvent.parsed_snapshot
from anthropic.lib.streaming._types import TextEvent
Source: src/anthropic/lib/streaming/_types.py:L23-33
class TextEvent(BaseModel):
    type: Literal["text"]
    text: str
    """The text delta"""
    snapshot: str
    """The entire accumulated text"""
    def parsed_snapshot(self) -> Dict[str, Any]:
        return cast(
            Dict[str, Any],
            jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings"),
        )
Returns a Dict[str, Any] representing the best-effort parse of the accumulated JSON text so far. Uses jiter.from_json() with partial_mode="trailing-strings" to handle incomplete JSON gracefully.
Messages.stream (Entry Point)
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[...],
    output_format=MovieReview,
) as stream:
    ...
Source: src/anthropic/resources/messages/messages.py:L990-1116
The stream() method accepts output_format (a Pydantic BaseModel subclass), derives and transforms its JSON schema (the same way parse() does), and returns a MessageStreamManager. Entering the manager in a with statement yields a MessageStream, which iterates over ParsedMessageStreamEvent instances. The output_format type is forwarded to the MessageStream for use during event accumulation.
content_block_stop Handling
Source: src/anthropic/lib/streaming/_messages.py:L499-502
elif event.type == "content_block_stop":
    content_block = current_snapshot.content[event.index]
    if content_block.type == "text" and is_given(output_format):
        content_block.parsed_output = parse_text(content_block.text, output_format)
When a content_block_stop event arrives in the accumulate_event() function, the SDK performs full Pydantic validation on the complete text of the content block, storing the validated instance as parsed_output.
stream.get_final_message
After the stream completes, stream.get_final_message() returns the accumulated ParsedMessage[ResponseFormatT] snapshot, which contains fully validated parsed_output on its text content blocks.
Detailed Behavior
Phase 1: Partial Parsing During Streaming
As each text_delta event arrives, the accumulate_event() function appends the delta text to the snapshot's text content block:
if event.delta.type == "text_delta":
    if content.type == "text":
        content.text += event.delta.text
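The accumulation step above can be imitated without the SDK. The following is a minimal sketch, not the SDK's code: TextChunk and accumulate_text are hypothetical names, and the only assumption is that each delta is appended to a running snapshot and both are handed to the consumer together.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class TextChunk:
    """Hypothetical stand-in for an event carrying a delta and the running snapshot."""
    text: str
    snapshot: str


def accumulate_text(deltas: Iterable[str]) -> Iterator[TextChunk]:
    """Append each delta to a running snapshot and emit one pair per delta."""
    snapshot = ""
    for delta in deltas:
        snapshot += delta
        yield TextChunk(text=delta, snapshot=snapshot)


# Three deltas that together form one JSON document.
chunks = list(accumulate_text(['{"title": "Ince', 'ption", ', '"rating": 9}']))
for chunk in chunks:
    print(repr(chunk.text), "->", repr(chunk.snapshot))
```

Only the last snapshot is a complete JSON document; every earlier one is a prefix of it, which is why a lenient parser is needed while the stream is still running.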
The SDK then fires a TextEvent that contains both the delta text and the accumulated snapshot. When the developer calls event.parsed_snapshot(), jiter parses the accumulated text:
jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings")
jiter partial_mode="trailing-strings" handles:
- Incomplete objects: {"title": "Inception", "rati parses to {"title": "Inception"}. The unfinished key is dropped and everything already complete is kept.
- Unterminated strings: {"title": "Ince parses with the partial string value "Ince".
- Unclosed arrays: {"pros": ["Great visuals", "Com parses to {"pros": ["Great visuals", "Com"]}.
The return type is Dict[str, Any], not the target Pydantic model, since partial data typically cannot satisfy all required fields.
Phase 2: Full Validation on content_block_stop
When the model finishes generating a content block, the API sends a content_block_stop event. At this point, the accumulated text represents the complete JSON output. The accumulate_event() function calls parse_text():
content_block.parsed_output = parse_text(content_block.text, output_format)
Internally, parse_text() does the following:
adapted_type: TypeAdapter[ResponseFormatT] = TypeAdapter(output_format)
return adapted_type.validate_json(text)
This parses the complete JSON document (no partial mode) and validates it against the model, producing a fully validated ResponseFormatT instance. If the text is not valid JSON or does not satisfy the schema, a pydantic ValidationError is raised.
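The second phase can be reproduced with Pydantic alone. The sketch below assumes Pydantic v2 is installed; validate_complete_text is a hypothetical stand-in that mirrors the two lines shown above, and the trimmed-down MovieReview model is for illustration only.

```python
from typing import List, Type, TypeVar

from pydantic import BaseModel, TypeAdapter, ValidationError

T = TypeVar("T")


class MovieReview(BaseModel):
    title: str
    rating: float
    pros: List[str]


def validate_complete_text(text: str, output_format: Type[T]) -> T:
    """Hypothetical stand-in for parse_text(): full JSON parse plus validation."""
    return TypeAdapter(output_format).validate_json(text)


complete = '{"title": "Inception", "rating": 9.0, "pros": ["Great visuals"]}'
review = validate_complete_text(complete, MovieReview)
print(review.title, review.rating)

# Truncated text is not valid JSON, so the same call raises instead of guessing.
try:
    validate_complete_text('{"title": "Ince', MovieReview)
    truncated_accepted = True
except ValidationError as exc:
    truncated_accepted = False
    print(f"rejected with {exc.error_count()} error(s)")
```

This is the reason validation waits for content_block_stop: any earlier snapshot would fail in the same way as the truncated input here.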
Event Types in Parsed Streams
The streaming system uses specialized event types for parsed streams:
Source: src/anthropic/lib/streaming/_types.py:L110-140
class ParsedMessageStopEvent(RawMessageStopEvent, GenericModel, Generic[ResponseFormatT]):
    type: Literal["message_stop"]
    message: ParsedMessage[ResponseFormatT]

class ParsedContentBlockStopEvent(RawContentBlockStopEvent, GenericModel, Generic[ResponseFormatT]):
    type: Literal["content_block_stop"]
    content_block: ParsedContentBlock[ResponseFormatT]

ParsedMessageStreamEvent = Annotated[
    Union[
        TextEvent,
        CitationEvent,
        ThinkingEvent,
        SignatureEvent,
        InputJsonEvent,
        RawMessageStartEvent,
        RawMessageDeltaEvent,
        ParsedMessageStopEvent[ResponseFormatT],
        RawContentBlockStartEvent,
        RawContentBlockDeltaEvent,
        ParsedContentBlockStopEvent[ResponseFormatT],
    ],
    PropertyInfo(discriminator="type"),
]
The ParsedMessageStopEvent carries a ParsedMessage[ResponseFormatT] and the ParsedContentBlockStopEvent carries a ParsedContentBlock[ResponseFormatT], both with validated parsed_output.
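Because the union is discriminated on type, a consumer can branch on that field to decide which attributes are safe to read. The sketch below uses plain dataclasses as hypothetical stand-ins for the SDK event classes (FakeTextEvent, FakeBlock, and FakeContentBlockStopEvent are not SDK names), so it runs without a network call.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class FakeTextEvent:
    """Hypothetical stand-in for TextEvent."""
    snapshot: str
    type: str = "text"


@dataclass
class FakeBlock:
    """Hypothetical stand-in for a parsed text content block."""
    parsed_output: Any


@dataclass
class FakeContentBlockStopEvent:
    """Hypothetical stand-in for ParsedContentBlockStopEvent."""
    content_block: FakeBlock
    type: str = "content_block_stop"


def describe(event: Union[FakeTextEvent, FakeContentBlockStopEvent]) -> str:
    """Branch on the `type` discriminator, as a consumer of the union might."""
    if event.type == "text":
        return f"partial: {len(event.snapshot)} chars so far"
    if event.type == "content_block_stop":
        return f"validated: {event.content_block.parsed_output!r}"
    return f"ignored: {event.type}"


print(describe(FakeTextEvent(snapshot='{"title": "Ince')))
print(describe(FakeContentBlockStopEvent(FakeBlock({"title": "Inception"}))))
```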
MessageStream Class
Source: src/anthropic/lib/streaming/_messages.py:L33-53
The MessageStream class is generic over ResponseFormatT and stores the output_format for use during event accumulation:
class MessageStream(Generic[ResponseFormatT]):
    def __init__(
        self,
        raw_stream: Stream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
        self.__output_format = output_format
Usage Example
import anthropic
from pydantic import BaseModel

class MovieReview(BaseModel):
    title: str
    rating: float
    summary: str
    pros: list[str]
    cons: list[str]

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Review the movie Inception"}],
    output_format=MovieReview,
) as stream:
    for event in stream:
        if event.type == "text":
            # Partial Dict[str, Any] via jiter
            snapshot = event.parsed_snapshot()
            if snapshot:
                print(f"Partial: {snapshot}")
    # Fully validated MovieReview, read while the stream is still open
    final = stream.get_final_message()

review = final.parsed_output
# Guard in case no parsed text block is present on the final message
if review is not None:
    print(f"{review.title}: {review.rating}/10")
    print(f"Pros: {', '.join(review.pros)}")
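When partial snapshots feed a UI, it can help to wrap the call so that a snapshot the parser cannot interpret yet does not interrupt the loop. The helper below is a sketch under two assumptions: that a failed parse surfaces as ValueError, and that only dict results are of interest. fields_so_far and FakeEvent are hypothetical names; real code would pass a TextEvent.

```python
from typing import Any, Dict, Protocol


class HasParsedSnapshot(Protocol):
    def parsed_snapshot(self) -> Dict[str, Any]: ...


def fields_so_far(event: HasParsedSnapshot) -> Dict[str, Any]:
    """Return the partial dict, or {} when the snapshot cannot be parsed yet."""
    try:
        parsed = event.parsed_snapshot()
    except ValueError:  # assumption: parse failures are raised as ValueError
        return {}
    return parsed if isinstance(parsed, dict) else {}


class FakeEvent:
    """Hypothetical test double that returns or raises a preset result."""

    def __init__(self, result: Any) -> None:
        self._result = result

    def parsed_snapshot(self) -> Dict[str, Any]:
        if isinstance(self._result, Exception):
            raise self._result
        return self._result


print(fields_so_far(FakeEvent({"title": "Ince"})))
print(fields_so_far(FakeEvent(ValueError("expected value"))))
```

Inside the streaming loop, fields_so_far(event) would replace the direct event.parsed_snapshot() call, and the existing truthiness check on the result still applies.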
Dependencies
- jiter (from_json with partial_mode): For partial JSON decoding during streaming. This is a Rust-backed JSON parser with Python bindings.
- pydantic (BaseModel, TypeAdapter, validate_json): For full validation on stream completion.
- anthropic.lib._parse._response (parse_text, ResponseFormatT): For the validation function and generic type variable.
- anthropic.types.parsed_message (ParsedMessage, ParsedContentBlock): For the typed response containers.
Key Source Files
- src/anthropic/lib/streaming/_types.py -- TextEvent (L23-33) with parsed_snapshot(), ParsedMessageStopEvent (L110-113), ParsedContentBlockStopEvent (L116-122), ParsedMessageStreamEvent (L125-140).
- src/anthropic/lib/streaming/_messages.py -- MessageStream class (L33-53), accumulate_event() (L433-518) with content_block_stop handling at L499-502.
- src/anthropic/lib/_parse/_response.py -- parse_text() (L16-20) used for full validation.