Implementation:Openai Openai agents python RunResultStreaming
Overview
RunResultStreaming is a dataclass that represents the result of a streamed agent run. It extends RunResultBase with streaming-specific features, providing both real-time event access via stream_events() and final result inspection after the stream completes.
Class Definition
@dataclass
class RunResultStreaming(RunResultBase):
    current_agent: Agent[Any]
    _current_agent_output_schema: AgentOutputSchemaBase | None = None
    max_turns: int = DEFAULT_MAX_TURNS
    is_complete: bool = False

    # Inherited from RunResultBase:
    # input, new_items, raw_responses, final_output,
    # input_guardrail_results, output_guardrail_results, context_wrapper

    async def stream_events(self) -> AsyncIterator[StreamEvent]: ...
    def to_state(self) -> RunState[Any]: ...
Source
- File: src/agents/result.py, lines 320-654
- Import: from agents import RunResultStreaming
Properties
Streaming-Specific Properties
| Property | Type | Description |
|---|---|---|
| current_agent | Agent[Any] | The agent that is currently active during the stream. Updates when handoffs occur. |
| _current_agent_output_schema | AgentOutputSchemaBase \| None | Internal field tracking the output schema of the current agent. Used for response parsing. |
| max_turns | int | Maximum number of turns allowed for this run. Defaults to DEFAULT_MAX_TURNS. |
| is_complete | bool | Indicates whether the stream has finished. False during streaming, True after all events have been consumed. |
Inherited Properties (from RunResultBase)
| Property | Type | Description |
|---|---|---|
| input | list[TResponseInputItem] | The original input provided to the run. |
| new_items | list[RunItem] | All run items produced during execution (messages, tool calls, handoffs, etc.). |
| raw_responses | list[ModelResponse] | The complete raw LLM responses collected during the run. |
| final_output | Any | The final text or structured output from the agent. Available after is_complete is True. |
| input_guardrail_results | list[InputGuardrailResult] | Results from input guardrail evaluations. |
| output_guardrail_results | list[OutputGuardrailResult] | Results from output guardrail evaluations. |
| context_wrapper | RunContextWrapper[Any] | The context wrapper used during the run. |
Methods
stream_events()
async def stream_events(self) -> AsyncIterator[StreamEvent]:
Returns an async iterator that yields StreamEvent objects as the agent run progresses. The three event types are:
- RawResponsesStreamEvent -- Raw LLM response chunks (text deltas, tool call arguments).
- RunItemStreamEvent -- Semantic run items (messages, tool calls, handoffs).
- AgentUpdatedStreamEvent -- Agent handoff notifications.
Important: All events must be consumed for the run to complete. The is_complete flag is set to True after the iterator is exhausted.
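The following is a minimal sketch of how a consumer might branch on the three event types. It assumes each event exposes a `type` string (`raw_response_event`, `run_item_stream_event`, `agent_updated_stream_event`), as the SDK's stream event dataclasses do. So that it runs without a model call, `fake_stream()` stands in for `result.stream_events()`. `classify_event` and `fake_stream` are illustrative names, not part of the library.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, List


def classify_event(event: Any) -> str:
    """Map a stream event to a short label based on its `type` field."""
    kind = getattr(event, "type", None)
    if kind == "raw_response_event":
        return "raw"      # low-level LLM chunk
    if kind == "run_item_stream_event":
        return "item"     # message, tool call, handoff, ...
    if kind == "agent_updated_stream_event":
        return "agent"    # the active agent changed
    return "unknown"


async def fake_stream() -> AsyncIterator[Any]:
    # Stand-in for result.stream_events(); yields objects shaped like SDK events.
    for kind in (
        "agent_updated_stream_event",
        "raw_response_event",
        "run_item_stream_event",
    ):
        yield SimpleNamespace(type=kind)


async def collect_labels() -> List[str]:
    # Exhaust the iterator, as a real consumer must for the run to complete.
    return [classify_event(event) async for event in fake_stream()]


labels = asyncio.run(collect_labels())
print(labels)
```

In real code the same branching would sit inside `async for event in result.stream_events():`, with each branch reading the event's payload rather than returning a label.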
to_state()
def to_state(self) -> RunState[Any]:
Serializes the current run state into a RunState object. This enables resuming an interrupted or paused run by passing the RunState as the input parameter to a subsequent Runner.run() or Runner.run_streamed() call.
Example
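import asyncio
from agents import Agent, Runner

async def main() -> None:
    # Placeholder agent; the name and instructions are illustrative only.
    agent = Agent(name="Assistant", instructions="Reply concisely.")
    result = Runner.run_streamed(agent, "Hello")
    # Stream events first
    async for event in result.stream_events():
        pass  # Process events
    # Then access final result
    assert result.is_complete
    print(result.final_output)
    print(result.current_agent.name)
    print(len(result.new_items))

asyncio.run(main())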
Explanation
- Runner.run_streamed() returns a RunResultStreaming object immediately.
- The async for loop consumes all stream events. In this example, events are not processed, but in practice each event would be handled according to its type.
- After the loop completes, is_complete is True.
- final_output contains the agent's complete output.
- current_agent.name identifies the last active agent.
- new_items contains all run items produced during the run.
State Resumption Example
# Assumes an enclosing async function and an existing `agent`;
# should_pause() and handle() are application-defined placeholders.
# First run - might be interrupted
result1 = Runner.run_streamed(agent, "Start a complex task")
async for event in result1.stream_events():
    if should_pause(event):
        break
# Save state for later resumption
saved_state = result1.to_state()
# Resume from saved state
result2 = Runner.run_streamed(agent, saved_state)
async for event in result2.stream_events():
    handle(event)
print(result2.final_output)
Lifecycle
The lifecycle of a RunResultStreaming object follows this sequence:
- Creation -- Returned by Runner.run_streamed(). is_complete is False.
- Streaming -- Events are consumed via stream_events(). current_agent may change during handoffs. new_items and raw_responses are populated incrementally.
- Completion -- After all events are consumed, is_complete becomes True. final_output is now available.
- Inspection -- All inherited RunResultBase properties are accessible with their final values.
- Resumption (optional) -- to_state() can be called to serialize state for later resumption.
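The creation, streaming, and completion stages can be illustrated with a toy model. This is a hypothetical stand-in, not the SDK's implementation: `ToyStreamingResult` borrows only the field names from the class above and shows, under that assumption, why `is_complete` and `final_output` become meaningful only after the iterator is exhausted.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, List, Tuple


@dataclass
class ToyStreamingResult:
    """Hypothetical stand-in that mimics the lifecycle described above."""

    pending: List[str]
    new_items: List[str] = field(default_factory=list)
    final_output: Any = None
    is_complete: bool = False

    async def stream_events(self) -> AsyncIterator[str]:
        for item in self.pending:
            self.new_items.append(item)  # populated incrementally
            yield item
        # Reached only when the consumer exhausts the iterator.
        self.final_output = self.new_items[-1] if self.new_items else None
        self.is_complete = True


async def demo() -> Tuple[bool, bool, Any]:
    result = ToyStreamingResult(pending=["tool_call", "message"])
    before = result.is_complete          # Creation: not complete yet
    async for _ in result.stream_events():
        pass                             # Streaming: consume every event
    return before, result.is_complete, result.final_output


before, after, output = asyncio.run(demo())
print(before, after, output)
```

In this toy, breaking out of the loop early would leave `is_complete` as False and `final_output` as None, which mirrors the requirement that all events be consumed.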
See Also
- Streamed Result Access Principle -- The theory behind streamed result access.
- Runner.run_streamed Implementation -- The method that creates RunResultStreaming objects.
- Stream Events Implementation -- The event types yielded during streaming.