Implementation:Openai Openai agents python RunResultStreaming

From Leeroopedia
Revision as of 11:44, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Openai_Openai_agents_python_RunResultStreaming.md)

Overview

RunResultStreaming is a dataclass that represents the result of a streamed agent run. It extends RunResultBase with streaming-specific features, providing both real-time event access via stream_events() and final result inspection after the stream completes.

Class Definition

@dataclass
class RunResultStreaming(RunResultBase):
    current_agent: Agent[Any]
    _current_agent_output_schema: AgentOutputSchemaBase | None = None
    max_turns: int = DEFAULT_MAX_TURNS
    is_complete: bool = False

    # Inherited from RunResultBase:
    # input, new_items, raw_responses, final_output,
    # input_guardrail_results, output_guardrail_results, context_wrapper

    async def stream_events(self) -> AsyncIterator[StreamEvent]: ...
    def to_state(self) -> RunState[Any]: ...

Source

  • File: src/agents/result.py, lines 320-654
  • Import: from agents import RunResultStreaming

Properties

Streaming-Specific Properties

Property Type Description
current_agent Agent[Any] The agent that is currently active during the stream. Updates when handoffs occur.
_current_agent_output_schema AgentOutputSchemaBase | None Internal field tracking the output schema of the current agent. Used for response parsing. Defaults to None.
max_turns int Maximum number of turns allowed for this run. Defaults to DEFAULT_MAX_TURNS.
is_complete bool Indicates whether the stream has finished. False during streaming, True after all events have been consumed.

Inherited Properties (from RunResultBase)

Property Type Description
input list[TResponseInputItem] The original input provided to the run.
new_items list[RunItem] All run items produced during execution (messages, tool calls, handoffs, etc.).
raw_responses list[ModelResponse] The complete raw LLM responses collected during the run.
final_output Any The final text or structured output from the agent. Available after is_complete is True.
input_guardrail_results list[InputGuardrailResult] Results from input guardrail evaluations.
output_guardrail_results list[OutputGuardrailResult] Results from output guardrail evaluations.
context_wrapper RunContextWrapper[Any] The context wrapper used during the run.

Methods

stream_events()

async def stream_events(self) -> AsyncIterator[StreamEvent]:

Returns an async iterator that yields StreamEvent objects as the agent run progresses. The three event types are:

  • RawResponsesStreamEvent -- Raw LLM response chunks (text deltas, tool call arguments).
  • RunItemStreamEvent -- Semantic run items (messages, tool calls, handoffs).
  • AgentUpdatedStreamEvent -- Agent handoff notifications.

Important: All events must be consumed for the run to complete. The is_complete flag is set to True after the iterator is exhausted.
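A minimal sketch of routing the three event types. So that it runs without the SDK or a model call, it uses stand-in events built with SimpleNamespace and a hypothetical fake_stream() generator in place of result.stream_events(). The type strings and the data, name, item, and new_agent attributes are assumed to mirror the SDK's event classes; summarize() is an illustrative helper, not part of the library.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def fake_stream() -> AsyncIterator[Any]:
    """Hypothetical stand-in for result.stream_events(); yields fake events."""
    yield SimpleNamespace(type="agent_updated_stream_event",
                          new_agent=SimpleNamespace(name="Triage"))
    yield SimpleNamespace(type="raw_response_event",
                          data=SimpleNamespace(delta="Hel"))
    yield SimpleNamespace(type="raw_response_event",
                          data=SimpleNamespace(delta="lo"))
    yield SimpleNamespace(type="run_item_stream_event",
                          name="message_output_created", item=None)


async def summarize(events: AsyncIterator[Any]) -> dict:
    """Consume every event and route it by its `type` discriminator."""
    summary = {"text": "", "items": [], "agents": []}
    async for event in events:
        if event.type == "raw_response_event":
            # Raw LLM chunk; the stand-in only models text deltas.
            summary["text"] += getattr(event.data, "delta", "") or ""
        elif event.type == "run_item_stream_event":
            # Semantic item such as a message, tool call, or handoff.
            summary["items"].append(event.name)
        elif event.type == "agent_updated_stream_event":
            # The active agent changed (for example after a handoff).
            summary["agents"].append(event.new_agent.name)
    return summary


summary = asyncio.run(summarize(fake_stream()))
print(summary)
```

With the real SDK, the same loop would iterate over result.stream_events(). Raw events carry several chunk kinds, so checking the concrete type of event.data before reading a delta is likely safer than the getattr shortcut used here.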

to_state()

def to_state(self) -> RunState[Any]:

Serializes the current run state into a RunState object. This enables resuming an interrupted or paused run by passing the RunState as the input parameter to a subsequent Runner.run() or Runner.run_streamed() call.

Example

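import asyncio

from agents import Agent, Runner


async def main() -> None:
    # Illustrative agent; the name and instructions are placeholders.
    agent = Agent(name="Assistant", instructions="Reply concisely.")
    result = Runner.run_streamed(agent, "Hello")

    # Stream events first
    async for event in result.stream_events():
        pass  # Process events

    # Then access final result
    assert result.is_complete
    print(result.final_output)
    print(result.current_agent.name)
    print(len(result.new_items))


asyncio.run(main())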

Explanation

  1. Runner.run_streamed() returns a RunResultStreaming object immediately.
  2. The async for loop consumes all stream events. In this example, events are not processed, but in practice each event would be handled according to its type.
  3. After the loop completes, is_complete is True.
  4. final_output contains the agent's complete output.
  5. current_agent.name identifies the last active agent.
  6. new_items contains all run items produced during the run.

State Resumption Example

# Runs inside an async function, with agent already defined.
# should_pause() and handle() are application-defined placeholders,
# not part of the SDK.

# First run - might be interrupted
result1 = Runner.run_streamed(agent, "Start a complex task")
async for event in result1.stream_events():
    if should_pause(event):
        break

# Save state for later resumption
saved_state = result1.to_state()

# Resume from saved state
result2 = Runner.run_streamed(agent, saved_state)
async for event in result2.stream_events():
    handle(event)

print(result2.final_output)

Lifecycle

The lifecycle of a RunResultStreaming object follows this sequence:

  1. Creation -- Returned by Runner.run_streamed(). is_complete is False.
  2. Streaming -- Events are consumed via stream_events(). current_agent may change during handoffs. new_items and raw_responses are populated incrementally.
  3. Completion -- After all events are consumed, is_complete becomes True. final_output is now available.
  4. Inspection -- All inherited RunResultBase properties are accessible with their final values.
  5. Resumption (optional) -- to_state() can be called to serialize state for later resumption.
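The lifecycle above can be sketched with a small stand-in class. FakeStreamingResult and drain() are hypothetical names introduced only for illustration; they mimic the sequence described here (incremental population, completion only once the iterator is exhausted) and make no claim about how the real class is implemented.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, List, Optional


@dataclass
class FakeStreamingResult:
    """Hypothetical stand-in that mimics the lifecycle, not the real class."""
    pending: List[str]
    new_items: List[str] = field(default_factory=list)
    final_output: Optional[str] = None
    is_complete: bool = False  # 1. Creation: starts out False

    async def stream_events(self) -> AsyncIterator[str]:
        while self.pending:
            item = self.pending.pop(0)
            self.new_items.append(item)  # 2. Streaming: filled incrementally
            yield item
        # 3. Completion: reached only when the consumer drains every event.
        self.final_output = "".join(self.new_items)
        self.is_complete = True


async def drain(result: FakeStreamingResult,
                limit: Optional[int] = None) -> FakeStreamingResult:
    """Consume events, optionally stopping early after `limit` events."""
    seen = 0
    async for _ in result.stream_events():
        seen += 1
        if limit is not None and seen >= limit:
            break
    return result


full = asyncio.run(drain(FakeStreamingResult(pending=["Hel", "lo", "!"])))
partial = asyncio.run(
    drain(FakeStreamingResult(pending=["Hel", "lo", "!"]), limit=1)
)

# 4. Inspection: final values are only meaningful after completion.
print(full.is_complete, full.final_output)        # True Hello!
print(partial.is_complete, partial.final_output)  # False None
```

In this sketch, stopping early leaves is_complete False and final_output unset, which is why the examples on this page drain the stream before reading results.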
