Principle:Openai Openai agents python Stream Event Consumption

From Leeroopedia
Revision as of 17:16, 16 February 2026 by Admin (talk | contribs) (Auto-imported from principles/Openai_Openai_agents_python_Stream_Event_Consumption.md)

Overview

Stream Event Consumption is the principle of receiving and processing real-time events from a streamed agent execution. The stream_events() async iterator on RunResultStreaming provides a unified interface for receiving updates about agent activity as it happens, from raw LLM token deltas to high-level semantic milestones like tool calls and agent handoffs.

Core Theory

StreamEvent as a Union Type

The StreamEvent type is defined as a union of three distinct event classes:

StreamEvent = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]

Each event class carries a type discriminator field, making it straightforward to dispatch on event type in consuming code. This union design provides a single, typed channel for all streaming information, rather than requiring consumers to subscribe to multiple separate event streams.
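The discriminator idea can be sketched without the SDK installed. The dataclasses below are hypothetical stand-ins that mirror only the fields named in this article (type, data, name, item, new_agent); they are not the SDK's actual class definitions, and describe is an illustrative helper.

```python
from dataclasses import dataclass
from typing import Any, Literal, Union


# Hypothetical stand-ins: same class names and fields as described in the
# article, but defined locally so the sketch runs on its own.
@dataclass
class RawResponsesStreamEvent:
    data: Any
    type: Literal["raw_response_event"] = "raw_response_event"


@dataclass
class RunItemStreamEvent:
    name: str
    item: Any
    type: Literal["run_item_stream_event"] = "run_item_stream_event"


@dataclass
class AgentUpdatedStreamEvent:
    new_agent: Any
    type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


StreamEvent = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]


def describe(event: StreamEvent) -> str:
    """Dispatch on the literal `type` field; a type checker can narrow each branch."""
    if event.type == "raw_response_event":
        return f"raw: {event.data!r}"
    if event.type == "run_item_stream_event":
        return f"item: {event.name}"
    return f"agent: {event.new_agent!r}"
```

Because each type field is a distinct literal, the string comparison in each branch tells both the runtime and a static checker which fields are safe to access.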

RawResponsesStreamEvent

Purpose: Delivers raw LLM response chunks as they stream from the model.

These events carry the low-level data produced by the language model, including:

  • Text deltas -- Partial text fragments (roughly token-sized) delivered as they are generated, enabling incremental display.
  • Tool call argument streaming -- Incremental delivery of function call arguments as the model generates them.
  • Other response metadata -- Any additional response-level data emitted by the model.

This is the most granular event type and is essential for building responsive user interfaces that show text appearing in real time. The data field contains a TResponseStreamEvent from the underlying model provider.
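A minimal sketch of assembling text from raw events follows. TextDelta and RawEvent are hypothetical stand-ins defined here so the example is self-contained. In real code the check would presumably be made against the provider's own text-delta event class carried in the data field. collect_text is an illustrative helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class TextDelta:
    """Hypothetical stand-in for a provider text-delta payload."""
    delta: str


@dataclass
class RawEvent:
    """Hypothetical stand-in for RawResponsesStreamEvent."""
    data: Any
    type: str = "raw_response_event"


def collect_text(events: Iterable[Any]) -> str:
    """Concatenate text deltas, skipping raw events that carry other payloads."""
    parts = []
    for event in events:
        if event.type == "raw_response_event" and isinstance(event.data, TextDelta):
            parts.append(event.data.delta)
    return "".join(parts)
```

A UI would typically write each delta to the screen as it arrives rather than joining at the end; the join here only makes the result easy to inspect.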

RunItemStreamEvent

Purpose: Wraps higher-level semantic items that represent meaningful agent actions.

While raw response events provide token-level granularity, RunItemStreamEvent events represent discrete semantic steps in the run, such as a finished message or the start of a tool call. The name field identifies the specific action:

  • message_output_created -- A complete message output has been produced.
  • tool_called -- A tool invocation has been initiated.
  • tool_output -- A tool has returned its result.
  • handoff_requested -- The current agent has requested a handoff to another agent.
  • handoff_occured -- A handoff between agents has completed. (The identifier is spelled this way in the SDK; match it exactly when comparing names.)
  • reasoning_item_created -- A reasoning step has been produced.
  • mcp_approval_requested -- An MCP tool requires user approval before execution.
  • mcp_approval_response -- A response to an MCP approval request has been provided.
  • mcp_list_tools -- MCP tools have been listed.

Each event carries an item field containing the corresponding RunItem object with full details about the action.
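One way to act on the name field is a lookup table of handlers, sketched below. RunItemEvent is a hypothetical stand-in for RunItemStreamEvent, the item payloads are plain strings for illustration, and HANDLERS and handle_run_item are names invented for this example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class RunItemEvent:
    """Hypothetical stand-in for RunItemStreamEvent."""
    name: str
    item: Any
    type: str = "run_item_stream_event"


# Map event names from the list above to small formatting handlers.
HANDLERS: Dict[str, Callable[[Any], str]] = {
    "tool_called": lambda item: f"calling tool: {item}",
    "tool_output": lambda item: f"tool returned: {item}",
    "message_output_created": lambda item: f"message: {item}",
}


def handle_run_item(event: RunItemEvent) -> str:
    """Route an event to its handler; names without a handler are ignored."""
    handler = HANDLERS.get(event.name)
    if handler is None:
        return f"ignored: {event.name}"
    return handler(event.item)
```

Falling back to an "ignored" result keeps the consumer working if a name appears that it has no handler for.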

AgentUpdatedStreamEvent

Purpose: Signals when a handoff changes the currently active agent.

In multi-agent workflows, agents can hand off control to other agents. When this occurs, an AgentUpdatedStreamEvent is emitted with a new_agent field referencing the agent that has taken over. This allows consumers to:

  • Update UI labels showing which agent is responding.
  • Adjust event processing logic based on the active agent.
  • Log agent transitions for debugging or analytics.
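The logging use case above can be sketched as a small transition tracker. Agent and AgentUpdated are hypothetical stand-ins; the sketch assumes the object in new_agent exposes a name attribute, and agent_transitions is an illustrative helper.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional


@dataclass
class Agent:
    """Hypothetical stand-in for an agent object with a name."""
    name: str


@dataclass
class AgentUpdated:
    """Hypothetical stand-in for AgentUpdatedStreamEvent."""
    new_agent: Agent
    type: str = "agent_updated_stream_event"


def agent_transitions(events: Iterable[Any]) -> List[str]:
    """Record each change of active agent as 'previous -> current'."""
    transitions: List[str] = []
    current: Optional[str] = None
    for event in events:
        if event.type == "agent_updated_stream_event":
            previous, current = current, event.new_agent.name
            transitions.append(f"{previous or 'start'} -> {current}")
    return transitions
```

The same current variable could be used to label streamed text with the responding agent's name in a UI.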

Consumption Pattern

The standard pattern for consuming stream events uses an async for loop with type-based dispatch:

async for event in result.stream_events():
    if event.type == "raw_response_event":
        # Handle raw LLM deltas
        pass
    elif event.type == "run_item_stream_event":
        # Handle semantic run items
        pass
    elif event.type == "agent_updated_stream_event":
        # Handle agent handoffs
        pass

Consume the stream to exhaustion: the run should be treated as complete only once the async for loop has finished. If events are left unconsumed, the run may not progress to completion.
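The pattern can be exercised end to end with a fake stream. In the sketch below, fake_stream_events is a hypothetical async generator standing in for result.stream_events(), and the events are SimpleNamespace objects carrying only the fields this article names; consume is an illustrative helper.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator, Dict


async def fake_stream_events() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for result.stream_events()."""
    yield SimpleNamespace(type="agent_updated_stream_event", new_agent="triage")
    yield SimpleNamespace(type="raw_response_event", data="Hel")
    yield SimpleNamespace(type="raw_response_event", data="lo")
    yield SimpleNamespace(
        type="run_item_stream_event", name="message_output_created", item="Hello"
    )


async def consume(stream: AsyncIterator[SimpleNamespace]) -> Dict[str, int]:
    """Drain the stream to exhaustion, counting events per type."""
    counts: Dict[str, int] = {}
    async for event in stream:
        counts[event.type] = counts.get(event.type, 0) + 1
    return counts
```

Calling asyncio.run(consume(fake_stream_events())) drains the fake stream and returns the per-type counts; with a real streamed result, the same loop shape would be applied to the object returned by stream_events().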

Design Rationale

Unified Stream

By combining all event types into a single async iterator, the SDK avoids the complexity of managing multiple event channels or callback registrations. Consumers receive a single, ordered sequence of events that reflects the true chronological order of agent activity.

Typed Discrimination

The use of literal type fields on each event class enables both runtime dispatch (via string comparison) and static type checking (via type narrowing). This makes consuming code both safe and ergonomic.

Separation of Granularity

The split between raw response events and run item events provides two levels of abstraction:

  • Fine-grained (raw events) -- For UIs that need token-by-token rendering.
  • Coarse-grained (run items) -- For logging, state tracking, or UIs that only care about completed actions.

Consumers can choose to handle only the event types relevant to their use case, ignoring others.
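Selecting one level of granularity can be expressed as a filtering wrapper around the stream. This is a sketch under assumptions: only_types, demo_stream, and coarse_names are hypothetical helpers written for this example, not part of the SDK.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator, FrozenSet, List


async def only_types(
    stream: AsyncIterator[SimpleNamespace], wanted: FrozenSet[str]
) -> AsyncIterator[SimpleNamespace]:
    """Pass through events whose type is wanted; still drains the whole stream."""
    async for event in stream:
        if event.type in wanted:
            yield event


async def demo_stream() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical event source mixing fine- and coarse-grained events."""
    yield SimpleNamespace(type="raw_response_event", data="Hel")
    yield SimpleNamespace(type="run_item_stream_event", name="tool_called", item=None)
    yield SimpleNamespace(type="raw_response_event", data="lo")
    yield SimpleNamespace(type="run_item_stream_event", name="tool_output", item="42")


async def coarse_names() -> List[str]:
    """Collect only the names of coarse-grained run item events."""
    coarse = only_types(demo_stream(), frozenset({"run_item_stream_event"}))
    return [event.name async for event in coarse]
```

The wrapper skips unwanted events but still iterates over them, so the underlying stream is consumed in full.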

Source Reference

  • Source: src/agents/result.py, line 483 (stream_events)
  • Source: src/agents/stream_events.py, lines 1-63
  • Import: from agents import RunResultStreaming
  • Import: from agents.stream_events import RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent, StreamEvent
