Implementation:Anthropics Anthropic sdk python MessageStream Get Final

From Leeroopedia
Knowledge Sources
Domains Streaming, LLM
Last Updated 2026-02-15 00:00 GMT

Overview

This page covers the get_final_message() and get_final_text() convenience methods on MessageStream and AsyncMessageStream, along with the until_done() helper they rely on. These methods drain the stream to completion and return the accumulated result, so callers can keep streaming transport while consuming the response as a single value.

API Signatures

get_final_message()

# Sync
def get_final_message(self) -> ParsedMessage[ResponseFormatT]

# Async
async def get_final_message(self) -> ParsedMessage[ResponseFormatT]

get_final_text()

# Sync
def get_final_text(self) -> str

# Async
async def get_final_text(self) -> str

until_done()

# Sync
def until_done(self) -> None

# Async
async def until_done(self) -> None

Source: src/anthropic/lib/streaming/_messages.py lines 89-120 (sync), lines 237-268 (async)

Import

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    message = stream.get_final_message()
    print(message.content[0].text)
    print(f"Usage: {message.usage}")

Internal Behavior

get_final_message() (lines 89-95, sync)

def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
    """Waits until the stream has been read to completion and returns
    the accumulated `Message` object.
    """
    self.until_done()
    assert self.__final_message_snapshot is not None
    return self.__final_message_snapshot

This method:

  1. Calls until_done() to drain any remaining events from the stream.
  2. Asserts that the snapshot is not None (it would only be None if no events were ever received, which would indicate a protocol error).
  3. Returns the __final_message_snapshot, which is a complete ParsedMessage[ResponseFormatT] containing all accumulated content blocks, usage statistics, stop reason, and optionally parsed structured output.
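
The drain-then-return pattern described in these steps can be sketched with a toy class. This is a minimal illustration, not the SDK code: ToyStream, its _events generator, and the string snapshot are hypothetical stand-ins for MessageStream, __stream__, and the ParsedMessage snapshot.

```python
from typing import Iterator, List, Optional


class ToyStream:
    """Hypothetical stand-in for MessageStream; not the SDK class."""

    def __init__(self, chunks: List[str]) -> None:
        self._snapshot: Optional[str] = None
        self._iterator = self._events(chunks)

    def _events(self, chunks: List[str]) -> Iterator[str]:
        for chunk in chunks:
            # Accumulate into the snapshot as a side effect of iteration.
            self._snapshot = (self._snapshot or "") + chunk
            yield chunk

    def __iter__(self) -> Iterator[str]:
        return self._iterator

    def __next__(self) -> str:
        return next(self._iterator)

    def until_done(self) -> None:
        # Drain whatever events remain; the yielded values are discarded.
        for _ in self:
            pass

    def get_final_message(self) -> str:
        self.until_done()
        assert self._snapshot is not None, "no events were received"
        return self._snapshot


stream = ToyStream(["Hel", "lo", "!"])
first = next(stream)                # consume one event manually
final = stream.get_final_message()  # drains the rest, returns the snapshot
```

Because the snapshot is built during iteration, it does not matter whether the caller or until_done() pulled a given event; the final value is the same either way.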

until_done() (lines 118-120, sync)

def until_done(self) -> None:
    """Blocks until the stream has been consumed"""
    consume_sync_iterator(self)

consume_sync_iterator(self) iterates over all remaining events in the MessageStream, which triggers the __stream__ generator. As each event is consumed, accumulate_event() updates the snapshot as a side effect. After this call, the stream is exhausted and the snapshot is complete.

If the stream has already been fully iterated (e.g., via a prior for loop), this is effectively a no-op.
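
The no-op behaviour on an exhausted stream follows from ordinary Python iterator semantics. The sketch below uses a hypothetical consume() helper that counts what it drains; the SDK's consume_sync_iterator is assumed to do the equivalent loop without returning a count.

```python
from typing import Any, Iterator


def consume(iterator: Iterator[Any]) -> int:
    """Hypothetical helper: exhaust an iterator and report how many items were left."""
    count = 0
    for _ in iterator:
        count += 1
    return count


events = iter(["message_start", "content_block_delta", "message_stop"])
seen = [event for event in events]  # a prior loop exhausts the iterator
remaining = consume(events)         # nothing left, so this does no work
```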

get_final_text() (lines 97-116, sync)

def get_final_text(self) -> str:
    """Returns all `text` content blocks concatenated together.

    Will raise an error if no `text` content blocks were returned.
    """
    message = self.get_final_message()
    text_blocks: list[str] = []
    for block in message.content:
        if block.type == "text":
            text_blocks.append(block.text)

    if not text_blocks:
        raise RuntimeError(
            f".get_final_text() can only be called when the API returns a `text` content block.\n"
            f"The API returned {','.join([b.type for b in message.content])} content block type(s) "
            f"that you can access by calling get_final_message().content"
        )

    return "".join(text_blocks)

This method:

  1. Calls get_final_message() to drain the stream and get the complete message.
  2. Iterates over all content blocks, collecting the .text field from blocks of type "text".
  3. If no text blocks were found (e.g., the response contained only tool use blocks), raises a RuntimeError with a descriptive message directing the caller to use get_final_message().content instead.
  4. Returns the text of all text blocks concatenated in order, with no separator between them.
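
The filtering and error path in these steps can be exercised without a network call. The following is a sketch under stated assumptions: Block is a hypothetical dataclass standing in for the SDK's content block types, and final_text() mirrors the logic above with a shortened error message.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Block:
    """Hypothetical stand-in for a content block."""
    type: str
    text: str = ""


def final_text(content: List[Block]) -> str:
    # Keep only text blocks; other block types are skipped silently.
    texts = [block.text for block in content if block.type == "text"]
    if not texts:
        kinds = ",".join(block.type for block in content)
        raise RuntimeError(f"no text blocks; got {kinds} block type(s)")
    # Text blocks are joined with no separator.
    return "".join(texts)


mixed = [Block("text", "Checking the weather. "), Block("tool_use"), Block("text", "One moment.")]
joined = final_text(mixed)

error: Optional[str] = None
try:
    final_text([Block("tool_use")])
except RuntimeError as exc:
    error = str(exc)
```

Note that a response mixing text and tool use blocks does not raise; only a response with no text blocks at all does.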

Async Variants (lines 237-268)

The async versions are structurally identical but use await:

async def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
    """Waits until the stream has been read to completion and returns
    the accumulated `Message` object.
    """
    await self.until_done()
    assert self.__final_message_snapshot is not None
    return self.__final_message_snapshot

async def get_final_text(self) -> str:
    """Returns all `text` content blocks concatenated together.

    Will raise an error if no `text` content blocks were returned.
    """
    message = await self.get_final_message()
    text_blocks: list[str] = []
    for block in message.content:
        if block.type == "text":
            text_blocks.append(block.text)

    if not text_blocks:
        raise RuntimeError(
            f".get_final_text() can only be called when the API returns a `text` content block.\n"
            f"The API returned {','.join([b.type for b in message.content])} content block type(s) "
            f"that you can access by calling get_final_message().content"
        )

    return "".join(text_blocks)

async def until_done(self) -> None:
    """Waits until the stream has been consumed"""
    await consume_async_iterator(self)
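
The same pattern in async form can be sketched as follows. ToyAsyncStream and its _events async generator are hypothetical stand-ins for AsyncMessageStream; asyncio.sleep(0) merely simulates yielding control while waiting on the network.

```python
import asyncio
from typing import AsyncIterator, List, Optional


class ToyAsyncStream:
    """Hypothetical stand-in for AsyncMessageStream; not the SDK class."""

    def __init__(self, chunks: List[str]) -> None:
        self._snapshot: Optional[str] = None
        self._iterator = self._events(chunks)

    async def _events(self, chunks: List[str]) -> AsyncIterator[str]:
        for chunk in chunks:
            await asyncio.sleep(0)  # yield control, as a network read would
            self._snapshot = (self._snapshot or "") + chunk
            yield chunk

    def __aiter__(self) -> AsyncIterator[str]:
        return self._iterator

    async def until_done(self) -> None:
        # Drain the remaining events without blocking the event loop.
        async for _ in self:
            pass

    async def get_final_message(self) -> str:
        await self.until_done()
        assert self._snapshot is not None, "no events were received"
        return self._snapshot


async def main() -> str:
    stream = ToyAsyncStream(["Hel", "lo"])
    return await stream.get_final_message()


result = asyncio.run(main())
```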

Usage Examples

Getting the final message

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    message = stream.get_final_message()
    print(message.content[0].text)
    print(f"Stop reason: {message.stop_reason}")
    print(f"Input tokens: {message.usage.input_tokens}")
    print(f"Output tokens: {message.usage.output_tokens}")

Getting final text directly

# Reuses `client` from the previous example
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku"}]
) as stream:
    text = stream.get_final_text()
    print(text)

Combining streaming display with final retrieval

# Reuses `client` from the first example
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    # Display text in real time
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    # Then get the complete message for further processing
    message = stream.get_final_message()
    print(f"\nTotal tokens used: {message.usage.output_tokens}")

In this example, iterating text_stream drains the stream completely: only text deltas are yielded to the caller, but the underlying iterator consumes every event. After the loop, get_final_message() calls until_done(), which finds the iterator already exhausted and returns immediately, so the complete snapshot is returned without any further reads.
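
Why a filtered view still drains everything can be shown with a toy model. This is a sketch only: ToyEventStream, the dict-shaped events, and the seen list are hypothetical and far simpler than the SDK's event types and snapshot accumulation.

```python
from typing import Dict, Iterator, List

Event = Dict[str, str]


class ToyEventStream:
    """Hypothetical stand-in; event shapes are simplified for illustration."""

    def __init__(self, events: List[Event]) -> None:
        self.seen: List[str] = []
        self._iterator = self._events(events)
        self.text_stream = self._text_only()

    def _events(self, events: List[Event]) -> Iterator[Event]:
        for event in events:
            self.seen.append(event["type"])  # every event is recorded
            yield event

    def _text_only(self) -> Iterator[str]:
        # Pulls every event from the underlying iterator, yields only text.
        for event in self._iterator:
            if event["type"] == "text":
                yield event["text"]


events = [
    {"type": "message_start"},
    {"type": "text", "text": "Once "},
    {"type": "text", "text": "upon a time"},
    {"type": "message_stop"},
]
stream = ToyEventStream(events)
shown = "".join(stream.text_stream)
```

After the join, stream.seen lists all four event types even though only two text fragments reached the caller.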

Async usage

import asyncio

import anthropic


async def main() -> None:
    # `async with` is only valid inside a coroutine
    async_client = anthropic.AsyncAnthropic()

    async with async_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    ) as stream:
        message = await stream.get_final_message()
        print(message.content[0].text)


asyncio.run(main())

Return Types

ParsedMessage[ResponseFormatT]

The object returned by get_final_message() is a ParsedMessage with the following key fields:

  • .id: The message ID (e.g., "msg_...")
  • .content: List of ParsedContentBlock objects (text blocks, tool use blocks, thinking blocks)
  • .stop_reason: Why the model stopped (e.g., "end_turn", "max_tokens", "tool_use")
  • .stop_sequence: The stop sequence that triggered the stop, if any
  • .usage: Token usage statistics (.input_tokens, .output_tokens, .cache_creation_input_tokens, .cache_read_input_tokens)
  • .model: The model that generated the response

When output_format is specified, text content blocks also have a .parsed_output field containing the structured output parsed into the specified type.
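
As an illustration of reading these fields, here is a minimal sketch of a hypothetical summarize() helper. It relies only on the attribute names listed above and is run against a SimpleNamespace stand-in, not a real ParsedMessage.

```python
from types import SimpleNamespace
from typing import Any, Dict


def summarize(message: Any) -> Dict[str, Any]:
    """Hypothetical helper: condense a final message into a plain dict."""
    usage = message.usage
    return {
        "id": message.id,
        "stop_reason": message.stop_reason,
        "block_types": [block.type for block in message.content],
        "total_tokens": usage.input_tokens + usage.output_tokens,
    }


# Stand-in object with the same attribute names; not a real ParsedMessage.
fake = SimpleNamespace(
    id="msg_example",
    stop_reason="end_turn",
    content=[SimpleNamespace(type="text", text="Hi")],
    usage=SimpleNamespace(input_tokens=10, output_tokens=5),
)
summary = summarize(fake)
```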

Key Source Files

  • src/anthropic/lib/streaming/_messages.py -- MessageStream.get_final_message() (L89-95), get_final_text() (L97-116), until_done() (L118-120)
  • src/anthropic/lib/streaming/_messages.py -- AsyncMessageStream.get_final_message() (L237-243), get_final_text() (L245-264), until_done() (L266-268)
