Implementation:Anthropics Anthropic sdk python Stream Parsed Snapshot

From Leeroopedia
Knowledge Sources
Domains: Structured_Output, LLM, Data_Extraction
Last Updated: 2026-02-15 00:00 GMT

Overview

The Stream Parsed Snapshot implementation provides the machinery for incremental structured output parsing during streaming. It combines the TextEvent.parsed_snapshot() method for partial JSON decoding with the accumulate_event() function's content_block_stop handler for full validation on completion. Together, these enable a two-phase parsing strategy: lenient partial dictionaries during streaming and strict Pydantic validation when the content block finishes.

API Surface

TextEvent.parsed_snapshot

from anthropic.lib.streaming._types import TextEvent

Source: src/anthropic/lib/streaming/_types.py:L23-33

class TextEvent(BaseModel):
    type: Literal["text"]

    text: str
    """The text delta"""

    snapshot: str
    """The entire accumulated text"""

    def parsed_snapshot(self) -> Dict[str, Any]:
        return cast(
            Dict[str, Any],
            jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings"),
        )

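Returns a Dict[str, Any] holding a best-effort parse of the JSON text accumulated so far. It calls jiter.from_json() with partial_mode="trailing-strings", so truncated JSON is parsed as far as possible rather than rejected. Each call re-parses the whole snapshot, not just the latest delta, and the cast() only informs the type checker; it performs no runtime validation of the result.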

Messages.stream (Entry Point)

from anthropic import Anthropic
client = Anthropic()
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[...],
    output_format=MovieReview,
) as stream:
    ...

Source: src/anthropic/resources/messages/messages.py:L990-1116

The stream() method accepts output_format (a Pydantic BaseModel subclass), performs schema derivation and transformation (same as parse()), and returns a MessageStreamManager that yields ParsedMessageStreamEvent instances. The output_format type is forwarded to the MessageStream for use during event accumulation.

content_block_stop Handling

Source: src/anthropic/lib/streaming/_messages.py:L499-502

elif event.type == "content_block_stop":
    content_block = current_snapshot.content[event.index]
    if content_block.type == "text" and is_given(output_format):
        content_block.parsed_output = parse_text(content_block.text, output_format)

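When a content_block_stop event arrives in the accumulate_event() function, the SDK looks up the finished content block by index. If it is a text block and output_format was supplied, the SDK runs full Pydantic validation on the block's complete text and stores the validated instance as parsed_output. Other block types, and streams without output_format, are left untouched.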

stream.get_final_message

After the stream completes, stream.get_final_message() returns the accumulated ParsedMessage[ResponseFormatT] snapshot, which contains fully validated parsed_output on its text content blocks.

Detailed Behavior

Phase 1: Partial Parsing During Streaming

As each text_delta event arrives, the accumulate_event() function appends the delta text to the snapshot's text content block:

if event.delta.type == "text_delta":
    if content.type == "text":
        content.text += event.delta.text
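The accumulation step can be sketched in isolation as a small generator. This is a minimal illustration, not the SDK's code: TextEventSketch and accumulate_text are hypothetical names, and only the standard library is used.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class TextEventSketch:
    """Hypothetical stand-in for TextEvent: one delta plus the text so far."""

    text: str      # the newly received delta
    snapshot: str  # everything accumulated up to and including this delta


def accumulate_text(deltas: Iterable[str]) -> Iterator[TextEventSketch]:
    """Yield one event per delta, each carrying the running snapshot."""
    snapshot = ""
    for delta in deltas:
        snapshot += delta  # same idea as `content.text += event.delta.text`
        yield TextEventSketch(text=delta, snapshot=snapshot)


events = list(accumulate_text(['{"title": "Ince', 'ption", ', '"rating": 9}']))
print(events[-1].snapshot)
```

Every event carries the full text so far, so a consumer can parse any single event without tracking earlier ones.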

The SDK then fires a TextEvent that contains both the delta text and the accumulated snapshot. When the developer calls event.parsed_snapshot(), jiter parses the accumulated text:

jiter.from_json(self.snapshot.encode("utf-8"), partial_mode="trailing-strings")

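With partial_mode="trailing-strings", jiter treats truncated input as follows:

  • Unterminated strings: {"title": "Ince parses to {"title": "Ince"}, keeping the partial string value. (With partial_mode=True the incomplete string would be discarded instead.)
  • Unclosed arrays: {"pros": ["Great visuals", "Com parses to {"pros": ["Great visuals", "Com"]}.
  • Incomplete keys: a key that is cut off before its value begins has nothing to map to, so {"title": "Inception", "rati is expected to parse to {"title": "Inception"}, with the dangling key dropped and everything before it kept.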

The return type is Dict[str, Any], not the target Pydantic model, since partial data typically cannot satisfy all required fields.
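To show what "lenient" means here without depending on jiter, the sketch below approximates the idea with the standard library: it closes any open string and brackets, then hands the repaired text to json.loads. This is a deliberately simplified stand-in, not jiter's algorithm; close_partial_json and lenient_parse are hypothetical names. Unlike jiter, it gives up and returns None when the prefix ends in a dangling key or a half-written number or literal.

```python
import json
from typing import Any, Dict, List, Optional


def close_partial_json(text: str) -> str:
    """Append the closers needed to balance a truncated JSON prefix."""
    stack: List[str] = []  # closers owed for each open '{' or '['
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            stack.append("}")
        elif ch == "[":
            stack.append("]")
        elif ch in "}]" and stack:
            stack.pop()
    # Terminate an open string first, then close containers innermost-first.
    suffix = '"' if in_string else ""
    return text + suffix + "".join(reversed(stack))


def lenient_parse(text: str) -> Optional[Dict[str, Any]]:
    """Return a best-effort dict, or None when the prefix cannot be repaired."""
    try:
        value = json.loads(close_partial_json(text))
    except json.JSONDecodeError:
        return None
    return value if isinstance(value, dict) else None


for prefix in ['{"title": "Ince', '{"pros": ["Great visuals", "Com']:
    print(lenient_parse(prefix))
```

In both cases the partial string survives in the result, which is the behaviour that "trailing-strings" is named for. A real consumer should call event.parsed_snapshot() rather than reimplement this.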

Phase 2: Full Validation on content_block_stop

When the model finishes generating a content block, the API sends a content_block_stop event. At this point, the accumulated text represents the complete JSON output. The accumulate_event() function calls parse_text():

content_block.parsed_output = parse_text(content_block.text, output_format)

Internally, parse_text() builds a Pydantic TypeAdapter for the target type and validates the raw JSON text with it:

adapted_type: TypeAdapter[ResponseFormatT] = TypeAdapter(output_format)
return adapted_type.validate_json(text)

This performs strict JSON parsing and Pydantic validation in a single step, producing a fully validated ResponseFormatT instance. If the text is not valid JSON or does not satisfy the schema, Pydantic raises a ValidationError.
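The contrast with Phase 1 can be made concrete with a standard-library stand-in for the strict step. This is not parse_text() and does not use Pydantic: strict_parse is a hypothetical helper that only checks that the text is complete JSON and that every field is present, whereas TypeAdapter.validate_json() also validates and coerces field types.

```python
import json
from dataclasses import dataclass, fields
from typing import List


@dataclass
class MovieReview:
    title: str
    rating: float
    summary: str
    pros: List[str]
    cons: List[str]


def strict_parse(text: str) -> MovieReview:
    """Accept only complete JSON objects that contain every field."""
    data = json.loads(text)  # raises json.JSONDecodeError on truncated input
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    names = [f.name for f in fields(MovieReview)]
    missing = [name for name in names if name not in data]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return MovieReview(**{name: data[name] for name in names})


complete = '{"title": "T", "rating": 8.5, "summary": "S", "pros": ["p"], "cons": []}'
print(strict_parse(complete).title)

try:
    strict_parse('{"title": "T", "rat')  # a mid-stream prefix
except ValueError as exc:  # JSONDecodeError is a subclass of ValueError
    print("rejected:", type(exc).__name__)
```

A prefix that a lenient parser would accept is rejected here, which is why strict validation is deferred until the content block is complete.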

Event Types in Parsed Streams

The streaming system uses specialized event types for parsed streams:

Source: src/anthropic/lib/streaming/_types.py:L110-140

class ParsedMessageStopEvent(RawMessageStopEvent, GenericModel, Generic[ResponseFormatT]):
    type: Literal["message_stop"]
    message: ParsedMessage[ResponseFormatT]

class ParsedContentBlockStopEvent(RawContentBlockStopEvent, GenericModel, Generic[ResponseFormatT]):
    type: Literal["content_block_stop"]
    content_block: ParsedContentBlock[ResponseFormatT]

ParsedMessageStreamEvent = Annotated[
    Union[
        TextEvent,
        CitationEvent,
        ThinkingEvent,
        SignatureEvent,
        InputJsonEvent,
        RawMessageStartEvent,
        RawMessageDeltaEvent,
        ParsedMessageStopEvent[ResponseFormatT],
        RawContentBlockStartEvent,
        RawContentBlockDeltaEvent,
        ParsedContentBlockStopEvent[ResponseFormatT],
    ],
    PropertyInfo(discriminator="type"),
]

The ParsedMessageStopEvent carries a ParsedMessage[ResponseFormatT] and the ParsedContentBlockStopEvent carries a ParsedContentBlock[ResponseFormatT], both with validated parsed_output.

MessageStream Class

Source: src/anthropic/lib/streaming/_messages.py:L33-53

The MessageStream class is generic over ResponseFormatT and stores the output_format for use during event accumulation:

class MessageStream(Generic[ResponseFormatT]):
    def __init__(
        self,
        raw_stream: Stream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
        self.__output_format = output_format
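One way to read this constructor: the format is stored up front because it is only needed later, when the lazily consumed iterator reaches the end of a block. The sketch below illustrates that design with a much smaller, hypothetical class. StreamSketch, its parse callable (standing in for output_format) and get_final are illustrative names, and the assumption that the final result is produced only after the source is drained is mine, not taken from the SDK source shown here.

```python
import json
from typing import Callable, Generic, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


class StreamSketch(Generic[T]):
    """Hypothetical stand-in: accumulates text and parses it once at the end."""

    def __init__(self, deltas: Iterable[str], parse: Callable[[str], T]) -> None:
        self._parse = parse              # plays the role of output_format
        self._text = ""
        self._final: Optional[T] = None  # plays the role of the final snapshot
        self._iterator = self._stream(deltas)

    def _stream(self, deltas: Iterable[str]) -> Iterator[str]:
        for delta in deltas:
            self._text += delta
            yield delta
        # Reached only once the source is exhausted.
        self._final = self._parse(self._text)

    def __iter__(self) -> Iterator[str]:
        return self._iterator

    def until_done(self) -> None:
        """Consume whatever is left of the stream."""
        for _ in self._iterator:
            pass

    def get_final(self) -> T:
        self.until_done()
        assert self._final is not None
        return self._final


stream = StreamSketch(['{"rating"', ': 9}'], json.loads)
first = next(iter(stream))  # consume one delta by hand
final = stream.get_final()  # drains the rest, then parses
print(first, final)
```

Because the iterator is shared, a caller can stop iterating early and still obtain the final result; get_final simply finishes the job.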

Usage Example

import anthropic
from pydantic import BaseModel

class MovieReview(BaseModel):
    title: str
    rating: float
    summary: str
    pros: list[str]
    cons: list[str]

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Review the movie Inception"}],
    output_format=MovieReview,
) as stream:
    for event in stream:
        if event.type == "text":
            # Partial Dict[str, Any] via jiter
            snapshot = event.parsed_snapshot()
            if snapshot:
                print(f"Partial: {snapshot}")

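    # Fully validated MovieReview, available once the stream has finished
    final = stream.get_final_message()
    review = final.parsed_output
    # Guard against a missing result (e.g. no text block was parsed)
    if review is not None:
        print(f"{review.title}: {review.rating}/10")
        print(f"Pros: {', '.join(review.pros)}")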

Dependencies

  • jiter (from_json with partial_mode): For partial JSON decoding during streaming. This is a Rust-backed JSON parser with Python bindings.
  • pydantic (BaseModel, TypeAdapter, validate_json): For full validation on stream completion.
  • anthropic.lib._parse._response (parse_text, ResponseFormatT): For the validation function and generic type variable.
  • anthropic.types.parsed_message (ParsedMessage, ParsedContentBlock): For the typed response containers.

Key Source Files

  • src/anthropic/lib/streaming/_types.py -- TextEvent (L23-33) with parsed_snapshot(), ParsedMessageStopEvent (L110-113), ParsedContentBlockStopEvent (L116-122), ParsedMessageStreamEvent (L125-140).
  • src/anthropic/lib/streaming/_messages.py -- MessageStream class (L33-53), accumulate_event() (L433-518) with content_block_stop handling at L499-502.
  • src/anthropic/lib/_parse/_response.py -- parse_text() (L16-20) used for full validation.

Related Pages

Implements Principle
