Implementation:Openai Openai node AssistantStream
| Knowledge Sources | |
|---|---|
| Domains | SDK, Streaming, Assistants_API |
| Last Updated | 2026-02-15 12:00 GMT |
Overview
The AssistantStream class provides a high-level event-driven streaming interface for consuming Assistants API responses, accumulating deltas into snapshots for messages, run steps, and tool calls.
Description
AssistantStream extends EventStream<AssistantStreamEvents> and implements AsyncIterable<AssistantStreamEvent>. It wraps the raw SSE stream from the Assistants API and processes each server-sent event through a comprehensive event dispatch system. The class maintains internal snapshots for messages, run steps, and the current run, progressively accumulating delta events into complete objects.
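The dispatch-and-accumulate pattern described above can be illustrated with a small, self-contained sketch. It is not the SDK's implementation: `MiniStream`, `MiniEvent`, and `Listeners` are hypothetical names, and the event payloads are heavily simplified. Only the three event names mirror real Assistants API events.

```typescript
// Hypothetical, simplified event shapes; the real SDK types are much richer.
type MiniEvent =
  | { event: 'thread.message.created'; data: { id: string } }
  | { event: 'thread.message.delta'; data: { id: string; text: string } }
  | { event: 'thread.message.completed'; data: { id: string } };

// Hypothetical high-level callbacks, loosely modelled on messageCreated/Delta/Done.
type Listeners = {
  messageCreated: (id: string) => void;
  messageDelta: (delta: string, snapshot: string) => void;
  messageDone: (id: string, finalText: string) => void;
};

class MiniStream {
  private readonly handlers: Partial<Listeners>;
  // One accumulated text snapshot per message ID.
  private readonly snapshots = new Map<string, string>();

  constructor(handlers: Partial<Listeners>) {
    this.handlers = handlers;
  }

  // Route one raw event to the matching high-level callback.
  handle(ev: MiniEvent): void {
    switch (ev.event) {
      case 'thread.message.created':
        this.snapshots.set(ev.data.id, '');
        this.handlers.messageCreated?.(ev.data.id);
        break;
      case 'thread.message.delta': {
        // Append the delta to the snapshot before notifying listeners.
        const next = (this.snapshots.get(ev.data.id) ?? '') + ev.data.text;
        this.snapshots.set(ev.data.id, next);
        this.handlers.messageDelta?.(ev.data.text, next);
        break;
      }
      case 'thread.message.completed':
        this.handlers.messageDone?.(ev.data.id, this.snapshots.get(ev.data.id) ?? '');
        break;
    }
  }
}

// Feed a short, hand-written event sequence through the sketch.
const seen: string[] = [];
const mini = new MiniStream({
  messageDelta: (_delta, snapshot) => seen.push(snapshot),
  messageDone: (_id, finalText) => seen.push(`done:${finalText}`),
});
mini.handle({ event: 'thread.message.created', data: { id: 'msg_1' } });
mini.handle({ event: 'thread.message.delta', data: { id: 'msg_1', text: 'Hel' } });
mini.handle({ event: 'thread.message.delta', data: { id: 'msg_1', text: 'lo' } });
mini.handle({ event: 'thread.message.completed', data: { id: 'msg_1' } });
console.log(seen);
```

The point of the sketch is the ordering: the snapshot is updated first, then listeners receive both the raw delta and the accumulated value, which is the shape of the `messageDelta` and `textDelta` callbacks.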
The class offers static factory methods for the different Assistants API operations: createAssistantStream creates a run on an existing thread, createThreadAssistantStream creates a thread and a run in one request, createToolAssistantStream submits tool outputs, and fromReadableStream consumes a stream produced elsewhere (e.g., forwarded from a backend). Each factory returns an AssistantStream; the stream then sets up the connection, iterates over the SSE stream, dispatches events, and makes the final Run object available through finalRun().
The event system emits granular events such as messageCreated, messageDelta, messageDone, textCreated, textDelta, textDone, runStepCreated, runStepDelta, toolCallCreated, toolCallDelta, toolCallDone, and more. A static accumulateDelta utility method recursively merges delta objects into accumulated snapshots, handling string concatenation, number addition, and indexed array merging.
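The merge rules named above (string concatenation, number addition, indexed array merging) can be sketched as follows. This is a simplified illustration under stated assumptions, not the SDK's `accumulateDelta`: `mergeDelta`, `Json`, and `isPlainObject` are hypothetical names, and the choice to overwrite `index` and `type` rather than accumulate them is an assumption made so that array positions and discriminators stay stable.

```typescript
type Json = Record<string, unknown>;

const isPlainObject = (v: unknown): v is Json =>
  typeof v === 'object' && v !== null && !Array.isArray(v);

// Hypothetical helper: recursively merge `delta` into `acc` (mutating `acc`).
function mergeDelta(acc: Json, delta: Json): Json {
  for (const [key, deltaValue] of Object.entries(delta)) {
    const accValue = acc[key];
    if (accValue === undefined || accValue === null || key === 'index' || key === 'type') {
      // Assumption: new fields and identifier-like fields are overwritten, not summed.
      acc[key] = deltaValue;
    } else if (typeof accValue === 'string' && typeof deltaValue === 'string') {
      acc[key] = accValue + deltaValue; // string concatenation
    } else if (typeof accValue === 'number' && typeof deltaValue === 'number') {
      acc[key] = accValue + deltaValue; // number addition
    } else if (isPlainObject(accValue) && isPlainObject(deltaValue)) {
      acc[key] = mergeDelta(accValue, deltaValue); // recurse into nested objects
    } else if (Array.isArray(accValue) && Array.isArray(deltaValue)) {
      // Indexed array merging: each delta entry names its target slot.
      for (const entry of deltaValue) {
        if (!isPlainObject(entry)) throw new Error(`"${key}" entries must be objects`);
        const index = entry['index'];
        if (typeof index !== 'number') throw new Error(`"${key}" entries need a numeric index`);
        const existing = accValue[index];
        if (isPlainObject(existing)) mergeDelta(existing, entry);
        else accValue[index] = entry;
      }
    } else {
      throw new Error(`Cannot merge "${key}": incompatible types`);
    }
  }
  return acc;
}

// Two deltas for content part 0, then a first delta for content part 1.
const snapshot: Json = { content: [{ index: 0, type: 'text', text: { value: 'Hel' } }] };
mergeDelta(snapshot, { content: [{ index: 0, type: 'text', text: { value: 'lo' } }] });
mergeDelta(snapshot, { content: [{ index: 1, type: 'text', text: { value: '!' } }] });
console.log(JSON.stringify(snapshot));
```

Mutating the accumulator in place keeps the sketch short; a caller that needs the previous snapshot would have to copy it before merging.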
Usage
This class is used when streaming responses from the Assistants API (beta). It is typically created through the SDK's resource methods such as client.beta.threads.runs.stream() rather than instantiated directly.
Code Reference
Source Location
- Repository: openai-node
- File: src/lib/AssistantStream.ts
- Lines: 1-778
Signature
export class AssistantStream
  extends EventStream<AssistantStreamEvents>
  implements AsyncIterable<AssistantStreamEvent>
{
  static fromReadableStream(stream: ReadableStream): AssistantStream;
  static createAssistantStream(
    threadId: string,
    runs: Runs,
    params: RunCreateParamsBaseStream,
    options?: RequestOptions,
  ): AssistantStream;
  static createThreadAssistantStream(
    params: ThreadCreateAndRunParamsBaseStream,
    thread: Threads,
    options?: RequestOptions,
  ): AssistantStream;
  static createToolAssistantStream(
    runId: string,
    runs: Runs,
    params: RunSubmitToolOutputsParamsStream,
    options: RequestOptions | undefined,
  ): AssistantStream;
  currentEvent(): AssistantStreamEvent | undefined;
  currentRun(): Run | undefined;
  currentMessageSnapshot(): Message | undefined;
  currentRunStepSnapshot(): Runs.RunStep | undefined;
  finalRunSteps(): Promise<Runs.RunStep[]>;
  finalMessages(): Promise<Message[]>;
  finalRun(): Promise<Run>;
  toReadableStream(): ReadableStream;
}
Import
import { AssistantStream } from 'openai/lib/AssistantStream';
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| threadId | string | Yes (for createAssistantStream) | The ID of the thread to create a run on |
| runs | Runs | Yes | The Runs resource instance for API calls |
| params | RunCreateParamsBaseStream | Yes | Run creation parameters (assistant_id, instructions, etc.) |
| options | RequestOptions | No | Request configuration (signal, headers, etc.) |
Outputs
| Name | Type | Description |
|---|---|---|
| AssistantStreamEvent | AsyncIterable<AssistantStreamEvent> | Async iterable of all streamed events |
| finalRun() | Promise<Run> | The completed Run object after the stream ends |
| finalMessages() | Promise<Message[]> | All accumulated message snapshots |
| finalRunSteps() | Promise<Runs.RunStep[]> | All accumulated run step snapshots |
Key Events
| Event | Callback Signature | Description |
|---|---|---|
| messageCreated | (message: Message) => void | Fired when a new message is created |
| messageDelta | (delta: MessageDelta, snapshot: Message) => void | Fired on each message delta with the accumulated snapshot |
| messageDone | (message: Message) => void | Fired when a message is complete |
| textCreated | (content: Text) => void | Fired when new text content begins |
| textDelta | (delta: TextDelta, snapshot: Text) => void | Fired on each text content delta |
| textDone | (content: Text, snapshot: Message) => void | Fired when text content is complete |
| toolCallCreated | (toolCall: ToolCall) => void | Fired when a new tool call begins |
| toolCallDelta | (delta: ToolCallDelta, snapshot: ToolCall) => void | Fired on each tool call delta |
| toolCallDone | (toolCall: ToolCall) => void | Fired when a tool call is complete |
| runStepCreated | (runStep: RunStep) => void | Fired when a new run step is created |
| event | (event: AssistantStreamEvent) => void | Catch-all for every raw event |
Usage Examples
Basic Usage
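import OpenAI from 'openai';

// Reads the API key from the OPENAI_API_KEY environment variable by default.
const client = new OpenAI();

// The thread and assistant IDs below are placeholders.
const stream = client.beta.threads.runs.stream('thread_abc123', {
  assistant_id: 'asst_abc123',
});

// Print each text fragment as it arrives; delta.value may be undefined.
stream.on('textDelta', (delta) => {
  process.stdout.write(delta.value ?? '');
});

stream.on('messageDone', (message) => {
  console.log('\nFinal message:', message.content);
});

// Resolves once the stream has ended.
const run = await stream.finalRun();
console.log('Run status:', run.status);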
Async Iteration
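// Reuses the `client` created in Basic Usage; the IDs are placeholders.
const stream = client.beta.threads.runs.stream('thread_abc123', {
  assistant_id: 'asst_abc123',
});

// Each iteration yields one raw AssistantStreamEvent.
for await (const event of stream) {
  if (event.event === 'thread.message.delta') {
    // process delta
  }
}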
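To try the iteration pattern without network access, the loop can be exercised against a mock event source. The sketch below is illustrative only: `SketchEvent`, `TextPart`, `mockEvents`, and `collectText` are hypothetical names, and the event payloads are reduced to the fields the loop reads (a `thread.message.delta` event carrying text content parts in `data.delta.content`).

```typescript
// Hypothetical, trimmed-down event types; the SDK's AssistantStreamEvent has many more variants.
type TextPart = { index: number; type: 'text'; text?: { value?: string } };
type SketchEvent =
  | { event: 'thread.message.delta'; data: { delta: { content?: TextPart[] } } }
  | { event: 'thread.run.completed'; data: { status: 'completed' } };

// Mock source standing in for a live stream.
async function* mockEvents(): AsyncGenerator<SketchEvent> {
  yield {
    event: 'thread.message.delta',
    data: { delta: { content: [{ index: 0, type: 'text', text: { value: 'Hi' } }] } },
  };
  yield {
    event: 'thread.message.delta',
    data: { delta: { content: [{ index: 0, type: 'text', text: { value: ' there' } }] } },
  };
  yield { event: 'thread.run.completed', data: { status: 'completed' } };
}

// Concatenate the text carried by message delta events; ignore everything else.
async function collectText(events: AsyncIterable<SketchEvent>): Promise<string> {
  let text = '';
  for await (const event of events) {
    if (event.event === 'thread.message.delta') {
      for (const part of event.data.delta.content ?? []) {
        text += part.text?.value ?? '';
      }
    }
  }
  return text;
}

collectText(mockEvents()).then((text) => console.log(text));
```

Checking `event.event` first lets TypeScript narrow `event.data` to the matching payload, so the delta fields can be read without casts.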