Implementation:Openai Openai python Realtime Server Events

From Leeroopedia
Knowledge Sources
Domains: Realtime_Communication, Event_Processing
Last Updated: 2026-02-15 00:00 GMT

Overview

Typed event models and an async iterator, provided by the OpenAI Python SDK, for consuming Realtime API server events.

Description

AsyncRealtimeConnection implements __aiter__, so iterating over a connection yields typed RealtimeServerEvent objects until the connection closes. RealtimeServerEvent is a discriminated union of more than 30 event types covering session lifecycle, conversation items, response generation, audio streaming, and errors. Every event carries a type field that identifies the variant, plus attributes specific to that variant.
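To illustrate what "discriminated union" means here, the following is a minimal sketch that does not use the SDK at all. TextDelta, ErrorInfo, ServerEvent, and describe are hypothetical stand-ins invented for this example; the real event models carry more fields.

```python
from dataclasses import dataclass
from typing import Literal, Union


# Hypothetical stand-ins for SDK event models (assumption: simplified fields).
@dataclass
class TextDelta:
    delta: str
    type: Literal["response.text.delta"] = "response.text.delta"


@dataclass
class ErrorInfo:
    message: str
    type: Literal["error"] = "error"


# The union is "discriminated" because each member has a distinct literal `type`.
ServerEvent = Union[TextDelta, ErrorInfo]


def describe(event: ServerEvent) -> str:
    # Comparing the literal `type` field lets a type checker narrow the union,
    # so `event.delta` is only read on the branch where it exists.
    if event.type == "response.text.delta":
        return f"text: {event.delta}"
    if event.type == "error":
        return f"error: {event.message}"
    # Defensive fallback for variants this sketch does not model.
    return f"unhandled: {event.type}"
```

Checking the type field first, rather than probing for attributes, keeps handlers valid when new event variants are added to the union.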

Usage

Iterate over the connection with async for event in connection and match on event.type to dispatch each event to the appropriate handler. To consume a single event instead, await connection.recv().
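One way to organize the dispatch described above is a table of handlers keyed by event type. The sketch below runs without a network connection: fake_connection is a hypothetical stand-in for a real connection, and the events are plain SimpleNamespace objects rather than SDK models.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator, Awaitable, Callable, Dict, List

Handler = Callable[[SimpleNamespace], Awaitable[None]]


async def fake_connection() -> AsyncIterator[SimpleNamespace]:
    # Hypothetical stand-in for `async for event in connection`.
    for event in (
        SimpleNamespace(type="session.created"),
        SimpleNamespace(type="response.text.delta", delta="Hel"),
        SimpleNamespace(type="response.text.delta", delta="lo"),
        SimpleNamespace(type="response.done"),
    ):
        yield event


async def run_dispatch() -> List[str]:
    seen: List[str] = []

    async def on_delta(event: SimpleNamespace) -> None:
        seen.append(event.delta)

    # Map each event type of interest to its handler; others are ignored.
    handlers: Dict[str, Handler] = {"response.text.delta": on_delta}

    async for event in fake_connection():
        handler = handlers.get(event.type)
        if handler is not None:
            await handler(event)
        if event.type == "response.done":
            break  # stop once the response has finished
    return seen
```

A handler table keeps the loop body short as the number of handled event types grows; an if/elif chain is equally valid for a handful of types.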

Code Reference

Source Location

  • Repository: openai-python
  • File: src/openai/resources/beta/realtime/realtime.py
  • Lines: L229-303 (AsyncRealtimeConnection.__aiter__, recv)
  • File: src/openai/types/realtime/realtime_server_event.py (RealtimeServerEvent union)

Signature

class AsyncRealtimeConnection:
    async def __aiter__(self) -> AsyncIterator[RealtimeServerEvent]:
        """Yields typed server events from the WebSocket."""
        ...

    async def recv(self) -> RealtimeServerEvent:
        """Receives a single server event."""
        ...

    async def recv_bytes(self) -> bytes:
        """Receives the next message as raw, unparsed bytes."""
        ...

# Key event types in the RealtimeServerEvent union:
RealtimeServerEvent = Union[
    SessionCreatedEvent,          # session.created
    SessionUpdatedEvent,          # session.updated
    ConversationItemCreatedEvent, # conversation.item.created
    ResponseCreatedEvent,         # response.created
    ResponseDoneEvent,            # response.done
    ResponseTextDeltaEvent,       # response.text.delta
    ResponseTextDoneEvent,        # response.text.done
    ResponseAudioDeltaEvent,      # response.audio.delta
    ResponseAudioDoneEvent,       # response.audio.done
    ResponseFunctionCallArgumentsDeltaEvent,  # response.function_call_arguments.delta
    ResponseFunctionCallArgumentsDoneEvent,   # response.function_call_arguments.done
    InputAudioBufferSpeechStartedEvent,       # input_audio_buffer.speech_started
    InputAudioBufferSpeechStoppedEvent,       # input_audio_buffer.speech_stopped
    ErrorEvent,                   # error
    # ... and more
]
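When consuming single events with recv(), it can be useful to bound the wait. The sketch below wraps recv() in asyncio.wait_for. FakeConnection and recv_or_none are hypothetical names for this example, and whether cancelling recv() on timeout is safe should be checked against the docstring in your installed SDK version.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, List, Optional


class FakeConnection:
    """Hypothetical stand-in that exposes only an async recv()."""

    def __init__(self, events: List[Any]) -> None:
        self._events = list(events)

    async def recv(self) -> Any:
        if self._events:
            return self._events.pop(0)
        await asyncio.sleep(3600)  # simulate a server that stays silent


async def recv_or_none(connection: Any, timeout: float) -> Optional[Any]:
    # Return the next event, or None if nothing arrives within `timeout` seconds.
    try:
        return await asyncio.wait_for(connection.recv(), timeout)
    except asyncio.TimeoutError:
        return None
```

Returning None on timeout lets the caller decide whether to retry, send a keep-alive, or close the connection.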

Import

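from openai.types.realtime import RealtimeServerEvent
# For connections opened via client.beta.realtime (as in the examples on this
# page), the matching models are exported from openai.types.beta.realtime.
# Events are consumed via: async for event in connection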

I/O Contract

Inputs

Name        Type                     Required  Description
connection  AsyncRealtimeConnection  Yes       Active WebSocket connection

Outputs

Name                                              Type      Description
event.type                                        str       Event discriminator (e.g., "response.text.delta")
ResponseTextDeltaEvent.delta                      str       Incremental text content
ResponseAudioDeltaEvent.delta                     str       Base64-encoded audio data
ResponseDoneEvent.response                        Response  Complete response object
ResponseFunctionCallArgumentsDoneEvent.arguments  str       Complete function call arguments, as a JSON string
ErrorEvent.error                                  Error     Error details with type, code, message
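Because audio deltas arrive as base64 strings, they have to be decoded before playback or storage. The helper below is a minimal sketch that assumes each delta is an independently encoded, complete base64 string; decode_audio_deltas is a hypothetical name, and the decoded bytes are in whatever audio format the session is configured for.

```python
import base64
from typing import Iterable


def decode_audio_deltas(deltas: Iterable[str]) -> bytes:
    """Decode each base64 chunk and concatenate the raw audio bytes in order."""
    return b"".join(base64.b64decode(chunk) for chunk in deltas)


# Example with synthetic chunks (not real audio).
chunks = [
    base64.b64encode(b"\x00\x01").decode("ascii"),
    base64.b64encode(b"\x02\x03").decode("ascii"),
]
audio = decode_audio_deltas(chunks)
```

In an event loop, the same decoding would be applied to event.delta for each "response.audio.delta" event.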

Usage Examples

Text Event Loop

import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview") as connection:
        # Restrict the session to text output.
        await connection.session.update(session={"modalities": ["text"]})
        # Add a user message to the conversation, then request a response.
        await connection.conversation.item.create(item={
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "Hello!"}],
        })
        await connection.response.create()

        async for event in connection:
            if event.type == "response.text.delta":
                # Print text as it streams in, without buffering.
                print(event.delta, end="", flush=True)
            elif event.type == "response.text.done":
                print()  # Newline after complete text
            elif event.type == "response.done":
                print(f"Usage: {event.response.usage}")
                break
            elif event.type == "error":
                print(f"Error: {event.error.message}")
                break


asyncio.run(main())
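If the caller needs the full text rather than streamed prints, the same loop can accumulate deltas and raise on errors. This is a sketch under assumptions: collect_text, RealtimeError, and scripted are hypothetical names, and scripted replays canned events in place of a live connection.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterable, AsyncIterator, List


class RealtimeError(RuntimeError):
    """Hypothetical exception for this sketch; not part of the SDK."""


async def collect_text(events: AsyncIterable[Any]) -> str:
    # Accumulate text deltas until the response finishes or an error arrives.
    parts: List[str] = []
    async for event in events:
        if event.type == "response.text.delta":
            parts.append(event.delta)
        elif event.type == "response.done":
            break
        elif event.type == "error":
            raise RealtimeError(event.error.message)
    return "".join(parts)


async def scripted(*events: Any) -> AsyncIterator[Any]:
    # Hypothetical stand-in for a connection: replays a fixed event sequence.
    for event in events:
        yield event
```

With a real connection, the call would be await collect_text(connection) after response.create() has been sent.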

Function Call Handling

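import json

# The arguments.done event may not carry the function name, so record it
# (keyed by call_id) when the function_call output item is announced.
call_names = {}

async for event in connection:
    if event.type == "response.output_item.added" and event.item.type == "function_call":
        call_names[event.item.call_id] = event.item.name
    elif event.type == "response.function_call_arguments.done":
        args = json.loads(event.arguments)  # arguments arrive as a JSON string
        # call_tool is your own async dispatcher; it is not part of the SDK.
        result = await call_tool(call_names.pop(event.call_id), args)
        # Return the tool result to the model...
        await connection.conversation.item.create(item={
            "type": "function_call_output",
            "call_id": event.call_id,
            "output": json.dumps(result),
        })
        # ...and ask it to continue generating with that result.
        await connection.response.create()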
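The tool dispatcher used in the example above is left to the application. A minimal sketch of one possible shape follows; TOOLS, tool, get_weather, and run_tool are all hypothetical names, and the weather data is canned. Unlike the example above, run_tool takes the raw JSON string so that malformed arguments can be reported back to the model instead of raising.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

# Hypothetical registry mapping tool names to async callables.
TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {}


def tool(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    # Register the coroutine function under its own name.
    TOOLS[fn.__name__] = fn
    return fn


@tool
async def get_weather(city: str) -> dict:
    return {"city": city, "forecast": "sunny"}  # canned data for illustration


async def run_tool(name: str, raw_arguments: str) -> str:
    """Run a registered tool and return a JSON string for function_call_output."""
    fn = TOOLS.get(name)
    if fn is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        args = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"invalid arguments: {exc.msg}"})
    if not isinstance(args, dict):
        return json.dumps({"error": "invalid arguments: expected a JSON object"})
    return json.dumps(await fn(**args))
```

Returning errors as JSON output, rather than raising, gives the model a chance to correct its call on the next response.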

Related Pages

Implements Principle

Requires Environment
