Implementation:Openai Openai node EventStream

From Leeroopedia
Knowledge Sources
Domains SDK, Streaming, Event System
Last Updated 2026-02-15 12:00 GMT

Overview

EventStream is the base class for async streaming lifecycles in the SDK, providing event emission, abort control, error handling, and promise-based completion tracking.

Description

The EventStream<EventTypes> class manages the full lifecycle of an asynchronous streaming operation. It maintains internal state for connection, completion, error, and abort conditions, and exposes these through boolean getters (ended, errored, aborted). The class uses two internal promises: a connected promise that resolves when the stream establishes its connection, and an end promise that resolves when the stream completes.
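The state described above can be pictured with a small stand-alone model. This is only a sketch: `LifecycleSketch` and its `markConnected`, `finish`, and `fail` methods are hypothetical names chosen for illustration, and the real class tracks more detail than this.

```typescript
// Hypothetical stand-in for the lifecycle state described above; not the SDK source.
class LifecycleSketch {
  private _ended = false;
  private _errored = false;
  private _aborted = false;

  private resolveConnected: () => void = () => {};
  private resolveEnd: () => void = () => {};
  private rejectEnd: (err: Error) => void = () => {};

  /** Resolves once the connection is established. */
  readonly connected: Promise<void>;
  /** Resolves on normal completion, rejects on failure. */
  readonly end: Promise<void>;

  constructor() {
    this.connected = new Promise<void>((resolve) => {
      this.resolveConnected = resolve;
    });
    this.end = new Promise<void>((resolve, reject) => {
      this.resolveEnd = resolve;
      this.rejectEnd = reject;
    });
    // Attach a no-op handler so a rejection is never reported as unhandled in this sketch.
    this.end.catch(() => {});
  }

  get ended(): boolean { return this._ended; }
  get errored(): boolean { return this._errored; }
  get aborted(): boolean { return this._aborted; }

  markConnected(): void {
    if (!this._ended) this.resolveConnected();
  }

  finish(): void {
    if (this._ended) return; // terminal state is sticky
    this._ended = true;
    this.resolveEnd();
  }

  fail(err: Error, wasAbort = false): void {
    if (this._ended) return;
    this._errored = true;
    this._aborted = wasAbort;
    this._ended = true;
    this.rejectEnd(err);
  }
}
```

In this model the three getters are read-only views over private flags, and the first terminal transition wins: later calls to `finish()` or `fail()` are ignored.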

The class provides a complete event listener system with on(), off(), once(), and emitted() methods, similar to the standalone EventEmitter but integrated with the streaming lifecycle. The BaseEvents interface defines four standard lifecycle events: connect, error, abort, and end. Error handling automatically wraps non-OpenAI errors into OpenAIError instances and converts AbortError into APIUserAbortError.
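The error normalization described above might look roughly like the following. `SketchError` and `SketchAbortError` are hypothetical stand-ins for `OpenAIError` and `APIUserAbortError` so that the example runs without the SDK, and `normalizeError` and the `original` field are illustrative names, not part of the library.

```typescript
// Hypothetical stand-ins for OpenAIError / APIUserAbortError; not the SDK classes.
class SketchError extends Error {
  original?: unknown; // the wrapped value, if any (illustrative field)
  constructor(message: string) {
    super(message);
    this.name = 'SketchError';
    // Keeps `instanceof` reliable even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

class SketchAbortError extends SketchError {
  constructor() {
    super('Request was aborted.');
    this.name = 'SketchAbortError';
  }
}

type Normalized = { event: 'abort' | 'error'; error: SketchError };

// Decide which lifecycle event to emit and with which error object.
function normalizeError(err: unknown): Normalized {
  let current: unknown = err;
  // An AbortError (for example from an aborted fetch) becomes the dedicated abort type.
  if (current instanceof Error && current.name === 'AbortError') {
    current = new SketchAbortError();
  }
  if (current instanceof SketchAbortError) return { event: 'abort', error: current };
  // Errors already of the library's type pass through unchanged.
  if (current instanceof SketchError) return { event: 'error', error: current };
  // Other Error instances are wrapped, keeping the original for debugging.
  if (current instanceof Error) {
    const wrapped = new SketchError(current.message);
    wrapped.original = current;
    return { event: 'error', error: wrapped };
  }
  // Non-Error throwables (strings, numbers, ...) are stringified.
  return { event: 'error', error: new SketchError(String(current)) };
}
```

The order of the checks matters in this sketch: the abort case is tested first because the abort type is itself a subclass of the general error type.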

A key design feature is the unhandled rejection safety mechanism: if no error listener is registered and no promise-returning method (such as done()) has been called, the class calls Promise.reject() with the error, so the runtime reports it as an unhandled rejection instead of the error being silently swallowed. Subclasses use the protected _run() method to start execution, _connected() to signal connection, and _emitFinal() as a hook for emitting final state before the end event.
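The safety rule can be reduced to a single condition, sketched below. `ErrorSurfacingSketch`, `onError`, and `fail` are hypothetical names, and the injectable `surface` callback exists only so the behavior can be observed in a test; by default it does what the paragraph above describes and calls `Promise.reject()`.

```typescript
// Hypothetical sketch of the "surface unobserved errors" rule; names are illustrative.
class ErrorSurfacingSketch {
  private errorListeners: Array<(err: Error) => void> = [];
  private catchingPromiseCreated = false;
  private rejectEnd: (err: Error) => void = () => {};
  private endPromise: Promise<void>;

  constructor(
    // Default: create a rejected promise nobody handles, so the runtime reports it.
    private surface: (err: Error) => void = (err) => {
      void Promise.reject(err);
    },
  ) {
    this.endPromise = new Promise<void>((_resolve, reject) => {
      this.rejectEnd = reject;
    });
    // The internal promise itself never counts as unhandled.
    this.endPromise.catch(() => {});
  }

  onError(listener: (err: Error) => void): this {
    this.errorListeners.push(listener);
    return this;
  }

  // Calling done() signals that the caller will observe failures through the promise.
  done(): Promise<void> {
    this.catchingPromiseCreated = true;
    return this.endPromise;
  }

  fail(err: Error): void {
    // Surface the error only when nobody could otherwise see it.
    if (!this.catchingPromiseCreated && this.errorListeners.length === 0) {
      this.surface(err);
    }
    this.rejectEnd(err);
    for (const listener of this.errorListeners) listener(err);
  }
}
```

Either registering an error listener or calling `done()` before the failure is enough to suppress the surfaced rejection in this sketch.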

Usage

Use EventStream as a base class for building streaming abstractions. It is extended by ChatCompletionStream, AbstractChatCompletionRunner, and the Responses API streaming classes. End users interact with it through these subclasses, using on() for event listeners, done() to await completion, and abort() to cancel the stream.
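To illustrate the subclassing pattern without depending on the SDK, here is a miniature stand-in. `MiniStream` is a hypothetical base class that borrows the hook names `_run`, `_connected`, and `_emitFinal` from the signature documented here; `WordStream` and its `word` and `final` events are invented for the example.

```typescript
// Hypothetical miniature base class mirroring the protected hooks; not the SDK class.
type Listener = (...args: any[]) => void;

class MiniStream {
  private listeners: { [event: string]: Listener[] } = {};
  private resolveEnd: () => void = () => {};
  private rejectEnd: (err: unknown) => void = () => {};
  private endPromise = new Promise<void>((resolve, reject) => {
    this.resolveEnd = resolve;
    this.rejectEnd = reject;
  });

  constructor() {
    this.endPromise.catch(() => {}); // avoid unhandled-rejection noise in the sketch
  }

  on(event: string, listener: Listener): this {
    (this.listeners[event] = this.listeners[event] || []).push(listener);
    return this;
  }

  done(): Promise<void> {
    return this.endPromise;
  }

  protected _emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners[event] || []) listener(...args);
  }

  protected _connected(): void {
    this._emit('connect');
  }

  // Hook for subclasses: emit any final aggregate state before 'end'.
  protected _emitFinal(): void {}

  protected _run(executor: () => Promise<unknown>): void {
    // Deferred so construction finishes and callers can attach listeners first.
    setTimeout(() => {
      executor().then(
        () => { this._emitFinal(); this._emit('end'); this.resolveEnd(); },
        (err) => { this._emit('error', err); this.rejectEnd(err); },
      );
    }, 0);
  }
}

class WordStream extends MiniStream {
  private collected: string[] = [];

  static start(words: string[]): WordStream {
    const stream = new WordStream();
    stream._run(() => stream.consume(words));
    return stream;
  }

  private async consume(words: string[]): Promise<void> {
    this._connected();
    for (const word of words) {
      this.collected.push(word);
      this._emit('word', word);
    }
  }

  protected _emitFinal(): void {
    this._emit('final', this.collected.join(' '));
  }
}

// Usage: listeners attached right after start() still observe 'connect'.
const events: string[] = [];
const stream = WordStream.start(['hello', 'world'])
  .on('connect', () => events.push('connect'))
  .on('word', (w: string) => events.push(`word:${w}`))
  .on('final', (text: string) => events.push(`final:${text}`))
  .on('end', () => events.push('end'));
```

Because the executor is deferred, the listeners registered in the chained `on()` calls are in place before any event fires.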

Code Reference

Source Location

Signature

export interface BaseEvents {
  connect: () => void;
  error: (error: OpenAIError) => void;
  abort: (error: APIUserAbortError) => void;
  end: () => void;
}

export class EventStream<EventTypes extends BaseEvents> {
  controller: AbortController;
  get ended(): boolean;
  get errored(): boolean;
  get aborted(): boolean;

  abort(): void;
  on<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
  off<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
  once<Event extends keyof EventTypes>(event: Event, listener: EventTypes[Event]): this;
  emitted<Event extends keyof EventTypes>(event: Event): Promise<...>;
  async done(): Promise<void>;

  protected _run(executor: () => Promise<any>): void;
  protected _connected(): void;
  protected _emitFinal(): void;
  _emit<Event extends keyof EventTypes>(event: Event, ...args: EventParameters<EventTypes, Event>): void;
}

Import

import { EventStream } from 'openai/lib/EventStream';
import type { BaseEvents, EventParameters } from 'openai/lib/EventStream';

I/O Contract

Inputs

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| executor | () => Promise<any> | Yes (for _run) | An async function that performs the streaming operation. It is invoked via setTimeout rather than immediately, so that it does not run before construction has finished. |
| event | keyof EventTypes | Yes (for on/off/once/emitted) | The event name to listen for or emit. |
| listener | EventTypes[Event] | Yes (for on/off/once) | The callback function to register or remove. |

Outputs

| Name | Type | Description |
| --- | --- | --- |
| this | EventStream | The on(), off(), and once() methods return this for chaining. |
| done() | Promise<void> | Resolves when the stream ends successfully, or rejects with the stream's error. |
| emitted() | Promise | Resolves with the event parameters the next time the specified event fires. |
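The chaining and `emitted()` contracts listed above can be sketched with a small untyped emitter. `ChainableEmitter` is a hypothetical class written for this example. It resolves `emitted()` with the full argument array, which is an assumption of the sketch and may differ from the SDK's exact return shape.

```typescript
// Hypothetical miniature emitter illustrating chaining and an emitted()-style promise.
type AnyListener = (...args: any[]) => void;

class ChainableEmitter {
  private listeners: { [event: string]: Array<{ fn: AnyListener; once: boolean }> } = {};

  on(event: string, fn: AnyListener): this {
    return this.add(event, fn, false);
  }

  once(event: string, fn: AnyListener): this {
    return this.add(event, fn, true);
  }

  off(event: string, fn: AnyListener): this {
    this.listeners[event] = (this.listeners[event] || []).filter((l) => l.fn !== fn);
    return this;
  }

  // Resolves with the arguments of the next emission of `event`.
  emitted(event: string): Promise<unknown[]> {
    return new Promise<unknown[]>((resolve) => {
      this.once(event, (...args: unknown[]) => resolve(args));
    });
  }

  emit(event: string, ...args: unknown[]): void {
    const current = this.listeners[event] || [];
    // Drop one-shot listeners before invoking, so re-entrant emits do not repeat them.
    this.listeners[event] = current.filter((l) => !l.once);
    for (const l of current) l.fn(...args);
  }

  private add(event: string, fn: AnyListener, once: boolean): this {
    (this.listeners[event] = this.listeners[event] || []).push({ fn, once });
    return this;
  }
}

// Usage: on() and once() return `this`, so registrations can be chained.
const seen: string[] = [];
const onData = (chunk: string) => { seen.push(chunk); };
const emitter = new ChainableEmitter()
  .on('data', onData)
  .once('data', (chunk: string) => { seen.push(`first:${chunk}`); });

emitter.emit('data', 'a'); // both listeners run
emitter.off('data', onData);
emitter.emit('data', 'b'); // no listeners remain for 'data'
```

After these calls `seen` holds `'a'` and `'first:a'` only, since the one-shot listener was removed after its first call and `onData` was removed explicitly.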

Usage Examples

// EventStream is typically used as a base class, not directly instantiated.
// Here's how subclasses expose its functionality:

import OpenAI from 'openai';

const client = new OpenAI();

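// Note: depending on the SDK version, this helper may instead be exposed as
// client.chat.completions.stream(...).
const stream = client.beta.chat.completions.stream({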
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Hello!' }],
});

// Lifecycle events from EventStream
stream.on('connect', () => console.log('Connected'));
stream.on('error', (err) => console.error('Error:', err));
stream.on('abort', (err) => console.log('Aborted:', err));
stream.on('end', () => console.log('Stream ended'));

// Abort the stream
// stream.abort();

// Await completion
await stream.done();
