Workflow:Openai Openai agents python Streaming Agent Execution
| Knowledge Sources | |
|---|---|
| Domains | AI_Agents, Streaming, LLMs |
| Last Updated | 2026-02-11 14:00 GMT |
Overview
End-to-end process for executing an AI agent with real-time streaming of text deltas, tool calls, and run items as they are generated.
Description
This workflow demonstrates how to use Runner.run_streamed() instead of Runner.run() to receive incremental updates as the model generates its response. The streaming interface provides three levels of granularity: raw response events (individual text deltas from the model API), run item stream events (high-level items like messages, tool calls, and tool outputs), and agent updated events (when a handoff transfers control). This enables responsive UIs, real-time displays, and progressive rendering of agent output.
Usage
Execute this workflow when building interactive applications that need to display agent responses as they are generated, rather than waiting for the complete response. This is essential for chat interfaces, real-time dashboards, and any user-facing application where perceived latency matters.
Execution Steps
Step 1: Agent Configuration
Define an Agent with the same configuration as a non-streaming agent. The agent definition is identical whether using streaming or non-streaming execution. Optionally attach tools, handoffs, or guardrails as needed.
Key considerations:
- Agent configuration is the same for streaming and non-streaming
- All tool types (function, hosted, MCP) work with streaming
- Handoffs are supported during streaming execution
Step 2: Initiate Streamed Run
Call Runner.run_streamed() with the agent and input. This returns a RunResultStreaming object immediately (it is not awaited). The streaming begins in the background, and events are buffered until consumed.
Key considerations:
- run_streamed() is NOT async and returns synchronously
- The returned RunResultStreaming object provides the stream_events() async iterator
- The run executes in the background while you consume events
Step 3: Consume Stream Events
Iterate over result.stream_events() using an async for loop. Each event has a type field that determines the event kind and a corresponding data or item payload.
Event types:
- raw_response_event: Low-level model API events (text deltas, tool call deltas)
- run_item_stream_event: High-level run items (message outputs, tool calls, tool outputs)
- agent_updated_stream_event: Agent changed due to handoff
Key considerations:
- For simple text streaming, filter for raw_response_event with ResponseTextDeltaEvent data
- For structured item tracking, use run_item_stream_event
- agent_updated_stream_event fires when a handoff changes the active agent
Step 4: Access Final Result
After the stream completes (the async for loop ends), access result.final_output, result.new_items, and other RunResult properties. The streaming result object provides the same interface as a non-streaming result once the run completes.
Key considerations:
- result.final_output is available after the stream completes
- result.new_items contains all items generated during the run
- Cancellation is supported by breaking out of the stream_events loop