Implementation:Openai Openai node EventStream
| Knowledge Sources | |
|---|---|
| Domains | SDK, Streaming, Event System |
| Last Updated | 2026-02-15 12:00 GMT |
Overview
EventStream is the base class for async streaming lifecycles in the SDK, providing event emission, abort control, error handling, and promise-based completion tracking.
Description
The EventStream<EventTypes> class manages the full lifecycle of an asynchronous streaming operation. It maintains internal state for connection, completion, error, and abort conditions, and exposes these through boolean getters (ended, errored, aborted). The class uses two internal promises: a connected promise that resolves when the stream establishes its connection, and an end promise that resolves when the stream completes.
The class provides a complete event listener system with on(), off(), once(), and emitted() methods, similar to the standalone EventEmitter but integrated with the streaming lifecycle. The BaseEvents interface defines four standard lifecycle events: connect, error, abort, and end. Error handling automatically wraps non-OpenAI errors into OpenAIError instances and converts AbortError into APIUserAbortError.
A key design feature is the unhandled rejection safety mechanism: if no error listener is registered and no promise-returning method (like done()) has been called, the class triggers a Promise.reject() to surface the error rather than silently swallowing it. Subclasses use the protected _run() method to start execution, _connected() to signal connection, and _emitFinal() as a hook for emitting final state before the end event.
Usage
Use EventStream as a base class for building streaming abstractions. It is extended by ChatCompletionStream, AbstractChatCompletionRunner, and the Responses API streaming classes. End users interact with it through these subclasses, using on() for event listeners, done() to await completion, and abort() to cancel the stream.
Code Reference
Source Location
- Repository: openai-node
- File: src/lib/EventStream.ts
Signature
export interface BaseEvents {
connect: () => void;
error: (error: OpenAIError) => void;
abort: (error: APIUserAbortError) => void;
end: () => void;
}
export class EventStream<EventTypes extends BaseEvents> {
controller: AbortController;
get ended(): boolean;
get errored(): boolean;
get aborted(): boolean;
abort(): void;
on<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
off<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
once<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
emitted<Event extends keyof EventTypes>(event: Event): Promise<...>;
async done(): Promise<void>;
protected _run(executor: () => Promise<any>): void;
protected _connected(): void;
protected _emitFinal(): void;
_emit<Event extends keyof EventTypes>(event: Event, ...args: EventParameters<EventTypes, Event>): void;
}
Import
import { EventStream } from 'openai/lib/EventStream';
import type { BaseEvents, EventParameters } from 'openai/lib/EventStream';
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| executor | () => Promise<any> |
Yes (for _run) | An async function that performs the streaming operation. Called via setTimeout to avoid constructor timing issues.
|
| event | keyof EventTypes |
Yes (for on/off/once/emitted) | The event name to listen for or emit. |
| listener | EventTypes[Event] |
Yes (for on/off/once) | The callback function to register or remove. |
Outputs
| Name | Type | Description |
|---|---|---|
| this | EventStream |
The on(), off(), and once() methods return this for chaining.
|
| done() | Promise<void> |
Resolves when the stream ends successfully, or rejects with the stream's error. |
| emitted() | Promise |
Resolves with the event parameters the next time the specified event fires. |
Usage Examples
// EventStream is typically used as a base class, not directly instantiated.
// Here's how subclasses expose its functionality:
import OpenAI from 'openai';
const client = new OpenAI();
const stream = client.beta.chat.completions.stream({
model: 'gpt-4o',
messages: [{ role: 'user', content: 'Hello!' }],
});
// Lifecycle events from EventStream
stream.on('connect', () => console.log('Connected'));
stream.on('error', (err) => console.error('Error:', err));
stream.on('abort', (err) => console.log('Aborted:', err));
stream.on('end', () => console.log('Stream ended'));
// Abort the stream
// stream.abort();
// Await completion
await stream.done();