Implementation:Anthropics Anthropic sdk python MessageStream Get Final
| Knowledge Sources | |
|---|---|
| Domains | Streaming, LLM |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
This implementation covers the get_final_message() and get_final_text() convenience methods on MessageStream and AsyncMessageStream. These methods drain the stream iterator to completion and return the accumulated result, bridging streaming transport with batch-style consumption.
API Signatures
get_final_message()
# Sync
def get_final_message(self) -> ParsedMessage[ResponseFormatT]
# Async
async def get_final_message(self) -> ParsedMessage[ResponseFormatT]
get_final_text()
# Sync
def get_final_text(self) -> str
# Async
async def get_final_text(self) -> str
until_done()
# Sync
def until_done(self) -> None
# Async
async def until_done(self) -> None
Source: src/anthropic/lib/streaming/_messages.py lines 89-120 (sync), lines 237-268 (async)
Import
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    message = stream.get_final_message()
    print(message.content[0].text)
    print(f"Usage: {message.usage}")
Internal Behavior
get_final_message() (lines 89-95, sync)
def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
    """Waits until the stream has been read to completion and returns
    the accumulated `Message` object.
    """
    self.until_done()
    assert self.__final_message_snapshot is not None
    return self.__final_message_snapshot
This method:
- Calls until_done() to drain any remaining events from the stream.
- Asserts that the snapshot is not None (it would only be None if no events were ever received, which would indicate a protocol error).
- Returns the __final_message_snapshot, which is a complete ParsedMessage[ResponseFormatT] containing all accumulated content blocks, usage statistics, stop reason, and optionally parsed structured output.
until_done() (lines 118-120, sync)
def until_done(self) -> None:
    """Blocks until the stream has been consumed"""
    consume_sync_iterator(self)
consume_sync_iterator(self) iterates over all remaining events in the MessageStream, which triggers the __stream__ generator. As each event is consumed, accumulate_event() updates the snapshot as a side effect. After this call, the stream is exhausted and the snapshot is complete.
If the stream has already been fully iterated (e.g., via a prior for loop), this is effectively a no-op.
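The drain-then-return pattern described above can be sketched without the SDK. The following is a minimal illustration only: ToyStream, its get_final() method, and the string chunks are hypothetical stand-ins, not the library's classes or events. It shows a snapshot being built as a side effect of iteration, and a second drain finding nothing left to consume.

```python
from typing import Iterator, List, Optional


class ToyStream:
    """Hypothetical stand-in for a message stream; not the SDK class."""

    def __init__(self, chunks: List[str]) -> None:
        self._snapshot: Optional[str] = None
        # Create the generator once so every drain shares the same iterator.
        self._iterator = self._events(chunks)

    def _events(self, chunks: List[str]) -> Iterator[str]:
        for chunk in chunks:
            # Accumulate into the snapshot as a side effect of iteration.
            self._snapshot = (self._snapshot or "") + chunk
            yield chunk

    def __iter__(self) -> Iterator[str]:
        return self._iterator

    def until_done(self) -> None:
        # Consume whatever is left; an exhausted iterator yields nothing.
        for _ in self:
            pass

    def get_final(self) -> str:
        self.until_done()
        assert self._snapshot is not None
        return self._snapshot


stream = ToyStream(["Hel", "lo"])
first = next(iter(stream))        # consume one event manually
final = stream.get_final()        # drains the one remaining event
final_again = stream.get_final()  # nothing left to drain; same snapshot
```

Because the snapshot lives on the object rather than in the iterator, partially iterating first and then asking for the final value gives the same result as draining in one call.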
get_final_text() (lines 97-116, sync)
def get_final_text(self) -> str:
    """Returns all `text` content blocks concatenated together.
    Will raise an error if no `text` content blocks were returned.
    """
    message = self.get_final_message()
    text_blocks: list[str] = []
    for block in message.content:
        if block.type == "text":
            text_blocks.append(block.text)
    if not text_blocks:
        raise RuntimeError(
            f".get_final_text() can only be called when the API returns a `text` content block.\n"
            f"The API returned {','.join([b.type for b in message.content])} content block type(s) "
            f"that you can access by calling get_final_message().content"
        )
    return "".join(text_blocks)
This method:
- Calls get_final_message() to drain the stream and get the complete message.
- Iterates over all content blocks, collecting the .text field from blocks of type "text".
- If no text blocks were found (e.g., the response contained only tool use blocks), raises a RuntimeError with a descriptive message directing the caller to use get_final_message().content instead.
- Returns the concatenated text from all text blocks joined together.
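The filtering and error behavior in these steps can be tried in isolation. The sketch below is an assumption-laden illustration: TextBlock, ToolUseBlock, and join_text_blocks are hypothetical names invented here, and the error message is shortened. It uses isinstance where the source compares block.type, which selects the same blocks in this toy model.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class TextBlock:
    """Hypothetical text block; only the fields needed for the sketch."""
    text: str
    type: str = "text"


@dataclass
class ToolUseBlock:
    """Hypothetical tool use block; carries no text."""
    name: str
    type: str = "tool_use"


Block = Union[TextBlock, ToolUseBlock]


def join_text_blocks(content: List[Block]) -> str:
    # Keep only text blocks, preserving their order.
    parts = [block.text for block in content if isinstance(block, TextBlock)]
    if not parts:
        # Report which block types were present instead.
        kinds = ",".join(block.type for block in content)
        raise RuntimeError(f"no text blocks; got: {kinds}")
    # Blocks are concatenated with no separator.
    return "".join(parts)


mixed = [TextBlock("Checking the weather. "), ToolUseBlock("get_weather"), TextBlock("Done.")]
joined = join_text_blocks(mixed)

try:
    join_text_blocks([ToolUseBlock("get_weather")])
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

Since the join uses an empty separator, any spacing between text blocks has to come from the blocks themselves.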
Async Variants (lines 237-268)
The async versions are structurally identical but use await:
async def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
    """Waits until the stream has been read to completion and returns
    the accumulated `Message` object.
    """
    await self.until_done()
    assert self.__final_message_snapshot is not None
    return self.__final_message_snapshot

async def get_final_text(self) -> str:
    """Returns all `text` content blocks concatenated together.
    Will raise an error if no `text` content blocks were returned.
    """
    message = await self.get_final_message()
    text_blocks: list[str] = []
    for block in message.content:
        if block.type == "text":
            text_blocks.append(block.text)
    if not text_blocks:
        raise RuntimeError(
            f".get_final_text() can only be called when the API returns a `text` content block.\n"
            f"The API returned {','.join([b.type for b in message.content])} content block type(s) "
            f"that you can access by calling get_final_message().content"
        )
    return "".join(text_blocks)

async def until_done(self) -> None:
    """Waits until the stream has been consumed"""
    await consume_async_iterator(self)
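The async shape of the same pattern can be sketched with an async generator. As before, this is only an illustration: ToyAsyncStream and get_final() are hypothetical names, and asyncio.sleep(0) stands in for a network read. It is not how the SDK implements consume_async_iterator.

```python
import asyncio
from typing import AsyncIterator, List, Optional


class ToyAsyncStream:
    """Hypothetical stand-in; not the SDK's AsyncMessageStream."""

    def __init__(self, chunks: List[str]) -> None:
        self._snapshot: Optional[str] = None
        self._iterator = self._events(chunks)

    async def _events(self, chunks: List[str]) -> AsyncIterator[str]:
        for chunk in chunks:
            await asyncio.sleep(0)  # yield control, as a network read would
            # Accumulate into the snapshot as a side effect of iteration.
            self._snapshot = (self._snapshot or "") + chunk
            yield chunk

    def __aiter__(self) -> AsyncIterator[str]:
        return self._iterator

    async def until_done(self) -> None:
        # Drain the remaining events without using them.
        async for _ in self:
            pass

    async def get_final(self) -> str:
        await self.until_done()
        assert self._snapshot is not None
        return self._snapshot


async def main() -> str:
    stream = ToyAsyncStream(["Hel", "lo"])
    return await stream.get_final()


result = asyncio.run(main())
```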
Usage Examples
Getting the final message
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    message = stream.get_final_message()
    print(message.content[0].text)
    print(f"Stop reason: {message.stop_reason}")
    print(f"Input tokens: {message.usage.input_tokens}")
    print(f"Output tokens: {message.usage.output_tokens}")
Getting final text directly
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku"}]
) as stream:
    text = stream.get_final_text()
    print(text)
Combining streaming display with final retrieval
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell me a story"}]
) as stream:
    # Display text in real time
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    # Then get the complete message for further processing
    message = stream.get_final_message()
    print(f"\nTotal tokens used: {message.usage.output_tokens}")
In this example, iterating text_stream fully drains the stream: only text deltas are yielded to the caller, but every event is consumed by the underlying iterator. After the loop, get_final_message() calls until_done(), which finds the iterator already exhausted, and returns the complete snapshot.
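A filtered view that still consumes every underlying event can be sketched with a plain generator. This is an illustration under stated assumptions: text_only, the dict-shaped events, and their type labels are simplified and hypothetical, and do not reproduce the SDK's event objects.

```python
from typing import Dict, Iterator, List

Event = Dict[str, str]


def text_only(events: Iterator[Event], seen: List[str]) -> Iterator[str]:
    """Yield only text payloads while consuming every event."""
    for event in events:
        seen.append(event["type"])  # every event passes through here
        if event["type"] == "text":
            yield event["text"]


events = iter([
    {"type": "message_start"},
    {"type": "text", "text": "Once "},
    {"type": "text", "text": "upon"},
    {"type": "message_stop"},
])
seen: List[str] = []
pieces = list(text_only(events, seen))  # caller sees only the text
remaining = list(events)                # underlying iterator is exhausted
```

The caller receives two strings, yet all four events were pulled from the source, which is why a later drain has nothing left to read.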
Async usage
import asyncio
import anthropic

async_client = anthropic.AsyncAnthropic()

async def main() -> None:
    # `async with` and `await` are only valid inside a coroutine
    async with async_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    ) as stream:
        message = await stream.get_final_message()
    print(message.content[0].text)

asyncio.run(main())
Return Types
ParsedMessage[ResponseFormatT]
The object returned by get_final_message() is a ParsedMessage with the following key fields:
- .id: The message ID (e.g., "msg_...")
- .content: List of ParsedContentBlock objects (text blocks, tool use blocks, thinking blocks)
- .stop_reason: Why the model stopped (e.g., "end_turn", "max_tokens", "tool_use")
- .stop_sequence: The stop sequence that triggered the stop, if any
- .usage: Token usage statistics (.input_tokens, .output_tokens, .cache_creation_input_tokens, .cache_read_input_tokens)
- .model: The model that generated the response
When output_format is specified, text content blocks also have a .parsed_output field containing the structured output parsed into the specified type.
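As a rough illustration of reading these fields after a stream completes, the sketch below summarizes a message-like object. The summarize helper and the SimpleNamespace stand-in are hypothetical; only the attribute names and stop reason values come from the field list above, and a real ParsedMessage carries more than is modeled here.

```python
from types import SimpleNamespace


def summarize(message) -> dict:
    """Collect a few fields from a message-like object into a plain dict."""
    usage = message.usage
    return {
        "truncated": message.stop_reason == "max_tokens",
        "wants_tool": message.stop_reason == "tool_use",
        "total_tokens": usage.input_tokens + usage.output_tokens,
        "block_types": [block.type for block in message.content],
    }


# Hypothetical stand-in using the attribute names listed above.
fake_message = SimpleNamespace(
    stop_reason="max_tokens",
    usage=SimpleNamespace(input_tokens=12, output_tokens=1024),
    content=[SimpleNamespace(type="text", text="Once upon a time")],
)
summary = summarize(fake_message)
```

Checking stop_reason before trusting the text is a reasonable habit, since a "max_tokens" stop means the content was cut off rather than finished.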
Key Source Files
- src/anthropic/lib/streaming/_messages.py -- MessageStream.get_final_message() (L89-95), get_final_text() (L97-116), until_done() (L118-120)
- src/anthropic/lib/streaming/_messages.py -- AsyncMessageStream.get_final_message() (L237-243), get_final_text() (L245-264), until_done() (L266-268)