Implementation:Openai Openai python Realtime Server Events
| Knowledge Sources | |
|---|---|
| Domains | Realtime_Communication, Event_Processing |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
Concrete typed event models and an async iterator for consuming Realtime API server events, provided by the OpenAI Python SDK.
Description
AsyncRealtimeConnection implements __aiter__ to yield typed RealtimeServerEvent objects. RealtimeServerEvent is a discriminated union of 30+ event types covering session lifecycle, conversation items, response generation, audio streaming, and errors. Each event carries a type field used for dispatch, plus event-specific attributes.
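To illustrate how a discriminated union supports dispatch on the type field, here is a minimal sketch. The classes TextDelta and ErrorInfo and the alias ServerEvent are hypothetical stand-ins, not the SDK's own models; only the idea of a literal type tag plus event-specific attributes is taken from the description above.

```python
from dataclasses import dataclass
from typing import Literal, Union


@dataclass
class TextDelta:  # hypothetical stand-in for ResponseTextDeltaEvent
    delta: str
    type: Literal["response.text.delta"] = "response.text.delta"


@dataclass
class ErrorInfo:  # hypothetical stand-in for ErrorEvent
    message: str
    type: Literal["error"] = "error"


# Stand-in for the RealtimeServerEvent union (the real one has 30+ members).
ServerEvent = Union[TextDelta, ErrorInfo]


def describe(event: ServerEvent) -> str:
    # Comparing the literal `type` tag selects exactly one union member,
    # so the attribute read in each branch exists on that member.
    if event.type == "response.text.delta":
        return f"text: {event.delta}"
    if event.type == "error":
        return f"error: {event.message}"
    raise ValueError(f"unhandled event type: {event.type}")
```

Calling describe(TextDelta(delta="Hi")) takes the first branch, while an ErrorInfo instance takes the second.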
Usage
Iterate over the connection with async for event in connection, and match on event.type to dispatch to the appropriate handler. Use recv() to consume a single event.
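One way to organize the dispatch described above is a handler table keyed by event.type. The sketch below runs without a network connection: fake_connection is a hypothetical async generator standing in for an AsyncRealtimeConnection, and run_dispatch is an illustrative helper, not an SDK function.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Callable, Dict, List


async def fake_connection() -> AsyncIterator[Any]:
    """Hypothetical stand-in for a live connection; yields canned events."""
    yield SimpleNamespace(type="response.text.delta", delta="Hel")
    yield SimpleNamespace(type="response.text.delta", delta="lo")
    yield SimpleNamespace(type="session.updated")  # no handler registered
    yield SimpleNamespace(type="response.done")


async def run_dispatch(events: AsyncIterator[Any]) -> List[str]:
    chunks: List[str] = []
    # Map each event type of interest to a handler; other types are skipped.
    handlers: Dict[str, Callable[[Any], None]] = {
        "response.text.delta": lambda e: chunks.append(e.delta),
    }
    async for event in events:
        if event.type == "response.done":
            break  # the response is complete, stop consuming events
        handler = handlers.get(event.type)
        if handler is not None:
            handler(event)
    return chunks
```

With a real connection, the same loop would be driven by async for event in connection; unhandled event types fall through silently here, which may or may not suit a given application.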
Code Reference
Source Location
- Repository: openai-python
- File: src/openai/resources/beta/realtime/realtime.py
- Lines: L229-303 (AsyncRealtimeConnection.__aiter__, recv)
- File: src/openai/types/realtime/realtime_server_event.py (RealtimeServerEvent union)
Signature
class AsyncRealtimeConnection:
    async def __aiter__(self) -> AsyncIterator[RealtimeServerEvent]:
        """Yields typed server events from the WebSocket."""
        ...

    async def recv(self) -> RealtimeServerEvent:
        """Receives a single server event."""
        ...

    async def recv_bytes(self) -> bytes:
        """Receives the next message as raw bytes, without parsing it into an event."""
        ...
# Key event types in the RealtimeServerEvent union:
RealtimeServerEvent = Union[
    SessionCreatedEvent,                      # session.created
    SessionUpdatedEvent,                      # session.updated
    ConversationItemCreatedEvent,             # conversation.item.created
    ResponseCreatedEvent,                     # response.created
    ResponseDoneEvent,                        # response.done
    ResponseTextDeltaEvent,                   # response.text.delta
    ResponseTextDoneEvent,                    # response.text.done
    ResponseAudioDeltaEvent,                  # response.audio.delta
    ResponseAudioDoneEvent,                   # response.audio.done
    ResponseFunctionCallArgumentsDeltaEvent,  # response.function_call_arguments.delta
    ResponseFunctionCallArgumentsDoneEvent,   # response.function_call_arguments.done
    InputAudioBufferSpeechStartedEvent,       # input_audio_buffer.speech_started
    InputAudioBufferSpeechStoppedEvent,       # input_audio_buffer.speech_stopped
    ErrorEvent,                               # error
    # ... and more
]
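For single-event consumption, recv() can be wrapped in a timeout so the caller does not wait indefinitely. The following is a sketch under assumptions: FakeConnection is a hypothetical stand-in that exposes only an async recv() method, and recv_with_timeout is an illustrative helper rather than part of the SDK. It relies only on asyncio.wait_for from the standard library.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, List, Optional


class FakeConnection:
    """Hypothetical stand-in exposing only an async recv() method."""

    def __init__(self, events: List[Any], delay: float = 0.0) -> None:
        self._events = list(events)
        self._delay = delay

    async def recv(self) -> Any:
        await asyncio.sleep(self._delay)  # simulate waiting on the socket
        return self._events.pop(0)


async def recv_with_timeout(connection: Any, timeout: float) -> Optional[Any]:
    """Return the next event, or None if none arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(connection.recv(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
```

Whether cancelling a pending recv() on a real connection is safe should be checked against the SDK's own documentation before relying on this pattern.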
Import
from openai.types.realtime import RealtimeServerEvent
# Events consumed via: async for event in connection
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| connection | AsyncRealtimeConnection | Yes | Active WebSocket connection |
Outputs
| Name | Type | Description |
|---|---|---|
| event.type | str | Event discriminator (e.g., "response.text.delta") |
| ResponseTextDeltaEvent.delta | str | Incremental text content |
| ResponseAudioDeltaEvent.delta | str | Base64-encoded audio data |
| ResponseDoneEvent.response | Response | Complete response object |
| ResponseFunctionCallArgumentsDoneEvent.arguments | str | Complete function call arguments as a JSON string |
| ErrorEvent.error | error | Error details with type, code, message |
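Because ResponseAudioDeltaEvent.delta is Base64-encoded, each chunk has to be decoded before the audio bytes can be used. A minimal sketch follows; collect_audio and sample_events are hypothetical names, and the events are simple stand-in objects rather than SDK models.

```python
import base64
from types import SimpleNamespace
from typing import Any, Iterable


def collect_audio(events: Iterable[Any]) -> bytes:
    """Decode and concatenate the payloads of all audio delta events."""
    buffer = bytearray()
    for event in events:
        if event.type == "response.audio.delta":
            buffer.extend(base64.b64decode(event.delta))
    return bytes(buffer)


# Canned stand-in events: two audio chunks with a text delta in between.
sample_events = [
    SimpleNamespace(
        type="response.audio.delta",
        delta=base64.b64encode(b"\x00\x01").decode("ascii"),
    ),
    SimpleNamespace(type="response.text.delta", delta="ignored"),
    SimpleNamespace(
        type="response.audio.delta",
        delta=base64.b64encode(b"\x02\x03").decode("ascii"),
    ),
]
```

Here collect_audio(sample_events) concatenates only the two audio chunks; the text delta is skipped.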
Usage Examples
Text Event Loop
import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview") as connection:
        # Restrict the session to text output
        await connection.session.update(session={"modalities": ["text"]})
        await connection.conversation.item.create(item={
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": "Hello!"}],
        })
        await connection.response.create()
        async for event in connection:
            if event.type == "response.text.delta":
                print(event.delta, end="")
            elif event.type == "response.text.done":
                print()  # Newline after complete text
            elif event.type == "response.done":
                print(f"Usage: {event.response.usage}")
                break
            elif event.type == "error":
                print(f"Error: {event.error.message}")
                break


asyncio.run(main())
Function Call Handling
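import json

# Runs inside an open connection, as in the text event loop above.
# call_tool is an application-defined coroutine, not part of the SDK.
async for event in connection:
    if event.type == "response.function_call_arguments.done":
        args = json.loads(event.arguments)
        result = await call_tool(event.name, args)
        # Send the tool result back, then ask the model to continue
        await connection.conversation.item.create(item={
            "type": "function_call_output",
            "call_id": event.call_id,
            "output": json.dumps(result),
        })
        await connection.response.create()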
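The tool-calling step above can be separated from the event loop so that it is testable without a connection. The sketch below is one possible shape, not the SDK's API: TOOLS is a hypothetical registry and build_function_output is an illustrative helper that parses the JSON arguments, runs the tool, and builds the function_call_output item, reporting failures in the output instead of raising.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical tool registry; a real application would register its own tools.
TOOLS: Dict[str, Callable[..., Any]] = {
    "add": lambda a, b: a + b,
}


def build_function_output(name: str, call_id: str, arguments: str) -> Dict[str, str]:
    """Run the named tool and build a function_call_output conversation item."""
    try:
        args = json.loads(arguments)   # arguments arrive as a JSON string
        result = TOOLS[name](**args)   # KeyError if the tool is unknown
        output = json.dumps(result)
    except (KeyError, TypeError, json.JSONDecodeError) as exc:
        # Report the failure to the model rather than crashing the event loop.
        output = json.dumps({"error": f"{type(exc).__name__}: {exc}"})
    return {"type": "function_call_output", "call_id": call_id, "output": output}
```

The returned dictionary has the same shape as the item passed to connection.conversation.item.create in the example above.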