Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Workflow:Openai Openai agents python Streaming Agent Execution

From Leeroopedia
Revision as of 11:03, 16 February 2026 by Admin (talk | contribs) (Auto-imported from workflows/Openai_Openai_agents_python_Streaming_Agent_Execution.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Knowledge Sources
Domains AI_Agents, Streaming, LLMs
Last Updated 2026-02-11 14:00 GMT

Overview

End-to-end process for executing an AI agent with real-time streaming of text deltas, tool calls, and run items as they are generated.

Description

This workflow demonstrates how to use Runner.run_streamed() instead of Runner.run() to receive incremental updates as the model generates its response. The streaming interface provides three levels of granularity: raw response events (individual text deltas from the model API), run item stream events (high-level items like messages, tool calls, and tool outputs), and agent updated events (when a handoff transfers control). This enables responsive UIs, real-time displays, and progressive rendering of agent output.

Usage

Execute this workflow when building interactive applications that need to display agent responses as they are generated, rather than waiting for the complete response. This is essential for chat interfaces, real-time dashboards, and any user-facing application where perceived latency matters.

Execution Steps

Step 1: Agent Configuration

Define an Agent with the same configuration as a non-streaming agent. The agent definition is identical whether using streaming or non-streaming execution. Optionally attach tools, handoffs, or guardrails as needed.

Key considerations:

  • Agent configuration is the same for streaming and non-streaming
  • All tool types (function, hosted, MCP) work with streaming
  • Handoffs are supported during streaming execution

Step 2: Initiate Streamed Run

Call Runner.run_streamed() with the agent and input. This returns a RunResultStreaming object immediately (it is not awaited). The streaming begins in the background, and events are buffered until consumed.

Key considerations:

  • run_streamed() is NOT async and returns synchronously
  • The returned RunResultStreaming object provides the stream_events() async iterator
  • The run executes in the background while you consume events

Step 3: Consume Stream Events

Iterate over result.stream_events() using an async for loop. Each event has a type field that determines the event kind and a corresponding data or item payload.

Event types:

  • raw_response_event: Low-level model API events (text deltas, tool call deltas)
  • run_item_stream_event: High-level run items (message outputs, tool calls, tool outputs)
  • agent_updated_stream_event: Agent changed due to handoff

Key considerations:

  • For simple text streaming, filter for raw_response_event with ResponseTextDeltaEvent data
  • For structured item tracking, use run_item_stream_event
  • agent_updated_stream_event fires when a handoff changes the active agent

Step 4: Access Final Result

After the stream completes (the async for loop ends), access result.final_output, result.new_items, and other RunResult properties. The streaming result object provides the same interface as a non-streaming result once the run completes.

Key considerations:

  • result.final_output is available after the stream completes
  • result.new_items contains all items generated during the run
  • Cancellation is supported by breaking out of the stream_events loop

Execution Diagram

GitHub URL

Workflow Repository