Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Openai Openai node AssistantStream

From Leeroopedia
Knowledge Sources
Domains SDK, Streaming, Assistants_API
Last Updated 2026-02-15 12:00 GMT

Overview

The AssistantStream class provides a high-level event-driven streaming interface for consuming Assistants API responses, accumulating deltas into snapshots for messages, run steps, and tool calls.

Description

AssistantStream extends EventStream<AssistantStreamEvents> and implements AsyncIterable<AssistantStreamEvent>. It wraps the raw SSE stream from the Assistants API and processes each server-sent event through a comprehensive event dispatch system. The class maintains internal snapshots for messages, run steps, and the current run, progressively accumulating delta events into complete objects.

The class offers multiple static factory methods for different Assistants API operations: createAssistantStream for creating a run on an existing thread, createThreadAssistantStream for creating a thread and run simultaneously, createToolAssistantStream for submitting tool outputs, and fromReadableStream for consuming a stream produced elsewhere (e.g., forwarded from a backend). Each factory sets up the connection, iterates over the SSE stream, dispatches events, and resolves with the final Run object.

The event system emits granular events such as messageCreated, messageDelta, messageDone, textCreated, textDelta, textDone, runStepCreated, runStepDelta, toolCallCreated, toolCallDelta, toolCallDone, and more. A static accumulateDelta utility method recursively merges delta objects into accumulated snapshots, handling string concatenation, number addition, and indexed array merging.

Usage

This class is used when streaming responses from the Assistants API (beta). It is typically created through the SDK's resource methods such as client.beta.threads.runs.stream() rather than instantiated directly.

Code Reference

Source Location

Signature

export class AssistantStream
  extends EventStream<AssistantStreamEvents>
  implements AsyncIterable<AssistantStreamEvent>
{
  static fromReadableStream(stream: ReadableStream): AssistantStream;

  static createAssistantStream(
    threadId: string,
    runs: Runs,
    params: RunCreateParamsBaseStream,
    options?: RequestOptions,
  ): AssistantStream;

  static createThreadAssistantStream(
    params: ThreadCreateAndRunParamsBaseStream,
    thread: Threads,
    options?: RequestOptions,
  ): AssistantStream;

  static createToolAssistantStream(
    runId: string,
    runs: Runs,
    params: RunSubmitToolOutputsParamsStream,
    options: RequestOptions | undefined,
  ): AssistantStream;

  currentEvent(): AssistantStreamEvent | undefined;
  currentRun(): Run | undefined;
  currentMessageSnapshot(): Message | undefined;
  currentRunStepSnapshot(): Runs.RunStep | undefined;

  finalRunSteps(): Promise<Runs.RunStep[]>;
  finalMessages(): Promise<Message[]>;
  finalRun(): Promise<Run>;

  toReadableStream(): ReadableStream;
}

Import

import { AssistantStream } from 'openai/lib/AssistantStream';

I/O Contract

Inputs

Name Type Required Description
threadId string Yes (for createAssistantStream) The ID of the thread to create a run on
runs Runs Yes The Runs resource instance for API calls
params RunCreateParamsBaseStream Yes Run creation parameters (assistant_id, instructions, etc.)
options RequestOptions No Request configuration (signal, headers, etc.)

Outputs

Name Type Description
AssistantStreamEvent AsyncIterable<AssistantStreamEvent> Async iterable of all streamed events
finalRun() Promise<Run> The completed Run object after the stream ends
finalMessages() Promise<Message[]> All accumulated message snapshots
finalRunSteps() Promise<Runs.RunStep[]> All accumulated run step snapshots

Key Events

Event Callback Signature Description
messageCreated (message: Message) => void Fired when a new message is created
messageDelta (delta: MessageDelta, snapshot: Message) => void Fired on each message delta with the accumulated snapshot
messageDone (message: Message) => void Fired when a message is complete
textCreated (content: Text) => void Fired when new text content begins
textDelta (delta: TextDelta, snapshot: Text) => void Fired on each text content delta
textDone (content: Text, snapshot: Message) => void Fired when text content is complete
toolCallCreated (toolCall: ToolCall) => void Fired when a new tool call begins
toolCallDelta (delta: ToolCallDelta, snapshot: ToolCall) => void Fired on each tool call delta
toolCallDone (toolCall: ToolCall) => void Fired when a tool call is complete
runStepCreated (runStep: RunStep) => void Fired when a new run step is created
event (event: AssistantStreamEvent) => void Catch-all for every raw event

Usage Examples

Basic Usage

import OpenAI from 'openai';

const client = new OpenAI();
const stream = client.beta.threads.runs.stream('thread_abc123', {
  assistant_id: 'asst_abc123',
});

stream.on('textDelta', (delta, snapshot) => {
  process.stdout.write(delta.value || '');
});

stream.on('messageDone', (message) => {
  console.log('\nFinal message:', message.content);
});

const run = await stream.finalRun();
console.log('Run status:', run.status);

Async Iteration

const stream = client.beta.threads.runs.stream('thread_abc123', {
  assistant_id: 'asst_abc123',
});

for await (const event of stream) {
  if (event.event === 'thread.message.delta') {
    // process delta
  }
}

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment