Implementation:Elevenlabs Elevenlabs python RealtimeConnection
| Knowledge Sources | |
|---|---|
| Domains | Speech_Recognition, Event_Driven_Architecture |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
Concrete tool for managing real-time transcription WebSocket connections with event-driven callbacks provided by the elevenlabs-python SDK.
Description
The RealtimeConnection class wraps an async WebSocket connection and provides an event-driven interface for consuming transcription results. It manages:
- Event registration: .on(event, callback) to register handlers for specific event types
- Audio sending: .send(data) to send base64-encoded audio chunks
- Manual commit: .commit() to finalize a transcription segment
- Cleanup: .close() to terminate the connection and any ffmpeg processes
The class includes the RealtimeEvents enum defining all possible event types. A background asyncio task handles incoming WebSocket messages, parsing JSON and dispatching to registered handlers.
Usage
Obtain a RealtimeConnection from ScribeRealtime.connect(). Register event handlers, send audio data, and close when done. The connection object is not reusable after closing.
Code Reference
Source Location
- Repository: elevenlabs-python
- File: src/elevenlabs/realtime/connection.py
- Lines: L11-247
Signature
class RealtimeEvents(str, Enum):
OPEN = "open"
CLOSE = "close"
SESSION_STARTED = "session_started"
PARTIAL_TRANSCRIPT = "partial_transcript"
COMMITTED_TRANSCRIPT = "committed_transcript"
COMMITTED_TRANSCRIPT_WITH_TIMESTAMPS = "committed_transcript_with_timestamps"
ERROR = "error"
AUTH_ERROR = "auth_error"
QUOTA_EXCEEDED = "quota_exceeded"
COMMIT_THROTTLED = "commit_throttled"
TRANSCRIBER_ERROR = "transcriber_error"
UNACCEPTED_TERMS_ERROR = "unaccepted_terms_error"
RATE_LIMITED = "rate_limited"
INPUT_ERROR = "input_error"
QUEUE_OVERFLOW = "queue_overflow"
RESOURCE_EXHAUSTED = "resource_exhausted"
SESSION_TIME_LIMIT_EXCEEDED = "session_time_limit_exceeded"
CHUNK_SIZE_EXCEEDED = "chunk_size_exceeded"
INSUFFICIENT_AUDIO_ACTIVITY = "insufficient_audio_activity"
class RealtimeConnection:
def __init__(
self,
websocket: "ClientConnection",
current_sample_rate: int,
ffmpeg_process: typing.Optional[subprocess.Popen] = None,
):
"""
Args:
websocket: Async WebSocket connection.
current_sample_rate: Audio sample rate in Hz.
ffmpeg_process: Optional ffmpeg subprocess (for URL mode).
"""
def on(self, event: str, callback: typing.Callable) -> None:
"""Register an event handler.
Args:
event: Event type (from RealtimeEvents enum).
callback: Handler function called with event data dict.
"""
async def send(self, data: typing.Dict[str, typing.Any]) -> None:
"""Send audio chunk to server.
Args:
data: Dict with 'audio_base_64' key (base64-encoded audio).
"""
async def commit(self) -> None:
"""Commit current segment (for MANUAL commit strategy)."""
async def close(self) -> None:
"""Close connection and clean up resources."""
Import
from elevenlabs.realtime.connection import RealtimeConnection, RealtimeEvents
I/O Contract
Inputs (on)
| Name | Type | Required | Description |
|---|---|---|---|
| event | str (RealtimeEvents) | Yes | Event type to listen for |
| callback | Callable | Yes | Handler function (receives event data dict) |
Inputs (send)
| Name | Type | Required | Description |
|---|---|---|---|
| data | Dict[str, Any] | Yes | Must contain 'audio_base_64' (base64-encoded audio). Optional: 'previous_text' for context. |
Outputs (via callbacks)
| Event | Callback Data | Description |
|---|---|---|
| PARTIAL_TRANSCRIPT | {"transcript": str, ...} | Interim transcription result (may change) |
| COMMITTED_TRANSCRIPT | {"transcript": str, ...} | Final transcription result |
| COMMITTED_TRANSCRIPT_WITH_TIMESTAMPS | {"transcript": str, "words": [...], ...} | Final result with word-level timing |
| ERROR | {"error": str, ...} | Generic error (also emitted for specific error types) |
| OPEN | (no args) | Connection established |
| CLOSE | (no args) | Connection closed |
Usage Examples
Event Registration and Processing
from elevenlabs.realtime.connection import RealtimeEvents
# Assume 'connection' obtained from ScribeRealtime.connect()
# Register partial transcript handler (live captions)
connection.on(
RealtimeEvents.PARTIAL_TRANSCRIPT,
lambda data: print(f"\rPartial: {data.get('transcript', '')}", end="")
)
# Register committed transcript handler (final results)
connection.on(
RealtimeEvents.COMMITTED_TRANSCRIPT,
lambda data: print(f"\nFinal: {data.get('transcript', '')}")
)
# Register error handler
connection.on(
RealtimeEvents.ERROR,
lambda data: print(f"Error: {data}")
)
# Register session lifecycle
connection.on(RealtimeEvents.OPEN, lambda: print("Connected"))
connection.on(RealtimeEvents.CLOSE, lambda: print("Disconnected"))
Manual Commit Flow
import asyncio
import base64
from elevenlabs.realtime.connection import RealtimeEvents
async def manual_commit_example(connection):
"""Send audio and commit manually."""
transcripts = []
connection.on(
RealtimeEvents.COMMITTED_TRANSCRIPT,
lambda data: transcripts.append(data.get("transcript", ""))
)
# Send audio chunks
with open("recording.pcm", "rb") as f:
while chunk := f.read(8192):
await connection.send({
"audio_base_64": base64.b64encode(chunk).decode()
})
await asyncio.sleep(0.01)
# Manually commit the segment
await connection.commit()
# Wait for result
await asyncio.sleep(2)
print("Transcripts:", transcripts)
await connection.close()