Implementation:Openai Openai node ChatCompletionStream Class
| Knowledge Sources | |
|---|---|
| Domains | SDK, Streaming, Chat_Completions |
| Last Updated | 2026-02-15 12:00 GMT |
Overview
The ChatCompletionStream class provides a rich event-driven streaming interface for Chat Completions, accumulating chunks into a snapshot and emitting granular events for content, refusals, tool calls, and log probabilities.
Description
ChatCompletionStream<ParsedT> extends AbstractChatCompletionRunner and implements AsyncIterable<ChatCompletionChunk>. It wraps the raw SSE chunk stream from the Chat Completions API and progressively builds a ChatCompletionSnapshot -- a mutable accumulation of all received chunks that represents the current state of the completion.
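The accumulation step can be pictured with a small stand-alone model. This is a minimal sketch, not the SDK's code: `ChunkSketch`, `SnapshotSketch`, and `accumulateChunk` are hypothetical names, and the types cover only a few of the real fields.

```typescript
// Simplified stand-ins for the SDK's chunk and snapshot types (illustrative only).
interface ChunkSketch {
  id: string;
  choices: Array<{
    index: number;
    delta: { role?: string; content?: string | null };
    finish_reason: string | null;
  }>;
}

interface SnapshotSketch {
  id: string;
  choices: Array<{
    index: number;
    message: { role?: string; content?: string | null };
    finish_reason: string | null;
  }>;
}

// Fold one chunk into the running snapshot, creating the snapshot on the first chunk.
function accumulateChunk(snapshot: SnapshotSketch | undefined, chunk: ChunkSketch): SnapshotSketch {
  const next: SnapshotSketch = snapshot ?? { id: chunk.id, choices: [] };
  for (const { index, delta, finish_reason } of chunk.choices) {
    let choice = next.choices[index];
    if (!choice) {
      choice = { index, message: {}, finish_reason: null };
      next.choices[index] = choice;
    }
    if (delta.role) choice.message.role = delta.role;
    if (delta.content) choice.message.content = (choice.message.content ?? '') + delta.content;
    if (finish_reason) choice.finish_reason = finish_reason;
  }
  return next;
}

// Three hand-written chunks standing in for a streamed response.
const demoChunks: ChunkSketch[] = [
  { id: 'chatcmpl-demo', choices: [{ index: 0, delta: { role: 'assistant', content: 'Hel' }, finish_reason: null }] },
  { id: 'chatcmpl-demo', choices: [{ index: 0, delta: { content: 'lo' }, finish_reason: null }] },
  { id: 'chatcmpl-demo', choices: [{ index: 0, delta: {}, finish_reason: 'stop' }] },
];

let demoSnapshot: SnapshotSketch | undefined;
for (const chunk of demoChunks) demoSnapshot = accumulateChunk(demoSnapshot, chunk);
console.log(demoSnapshot?.choices[0]?.message.content);
```

The snapshot is mutated in place, which is why a handler that wants to keep an intermediate state would need to copy it.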
The class manages a detailed event lifecycle. For each chunk, it accumulates the delta into the snapshot, then emits typed events:
- chunk: every chunk, together with its snapshot
- content, content.delta, and content.done: text content
- refusal.delta and refusal.done: refusals
- tool_calls.function.arguments.delta and tool_calls.function.arguments.done: streaming of function tool call arguments
- logprobs.content.delta, logprobs.content.done, logprobs.refusal.delta, and logprobs.refusal.done: log probabilities
It uses a ChoiceEventState tracker per choice to ensure done events fire exactly once.
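The "exactly once" guarantee for done events comes down to a per-choice flag that is checked before emitting. The sketch below illustrates that idea only; `ChoiceDoneTrackerSketch` and its members are hypothetical and do not mirror the fields of the SDK's ChoiceEventState.

```typescript
// Hypothetical per-choice tracker; names are illustrative, not the SDK's internals.
type ContentDoneListener = (event: { content: string }) => void;

class ChoiceDoneTrackerSketch {
  private content = '';
  private contentDone = false;
  private readonly listeners: ContentDoneListener[] = [];

  onContentDone(listener: ContentDoneListener): void {
    this.listeners.push(listener);
  }

  appendContent(delta: string): void {
    this.content += delta;
  }

  // May be called from several places (a finish_reason arrives, the stream closes, ...);
  // only the first call emits the event.
  markContentDone(): void {
    if (this.contentDone) return;
    this.contentDone = true;
    for (const listener of this.listeners) listener({ content: this.content });
  }
}

const tracker = new ChoiceDoneTrackerSketch();
const doneEvents: string[] = [];
tracker.onContentDone(({ content }) => {
  doneEvents.push(content);
});

tracker.appendContent('Hel');
tracker.appendContent('lo');
tracker.markContentDone(); // first signal that content has ended
tracker.markContentDone(); // a second signal is ignored
console.log(doneEvents.length, doneEvents[0]);
```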
When auto-parseable response formats or tools (created via zodResponseFormat or zodFunction) are provided, the stream performs partial parsing during accumulation using the partialParse utility, and full parsing on finalization via maybeParseChatCompletion. If a length or content_filter finish reason is encountered with auto-parseable inputs, the stream throws LengthFinishReasonError or ContentFilterFinishReasonError.
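To give a feel for what partial parsing means, here is a deliberately naive sketch. It is not the SDK's partialParse utility: `closeOpenJson` and `partialParseSketch` are hypothetical helpers that close any open string, array, or object and fall back to shorter prefixes until `JSON.parse` succeeds.

```typescript
// Append the closers needed to balance an incomplete JSON prefix.
function closeOpenJson(prefix: string): string {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;
  for (let i = 0; i < prefix.length; i++) {
    const ch = prefix[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === '{') closers.push('}');
    else if (ch === '[') closers.push(']');
    else if (ch === '}' || ch === ']') closers.pop();
  }
  return prefix + (inString ? '"' : '') + closers.reverse().join('');
}

// Return the value of the longest prefix that can be completed into valid JSON,
// or undefined if no prefix parses. Quadratic in the input length; fine for a sketch.
function partialParseSketch(input: string): unknown {
  for (let end = input.length; end > 0; end--) {
    try {
      return JSON.parse(closeOpenJson(input.slice(0, end)));
    } catch (_err) {
      // This prefix cannot be completed yet; try a shorter one.
    }
  }
  return undefined;
}

// A truncated key is dropped; a truncated string value is closed and kept.
const partialA = partialParseSketch('{"title": "Foo", "li');
const partialB = partialParseSketch('{"title":"Ode","lines":["a","b');
console.log(JSON.stringify(partialA), JSON.stringify(partialB));
```

A value produced this way is a best-effort view of incomplete output, so it should be treated as unvalidated until the final parse has run.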
The finalizeChatCompletion helper converts the mutable snapshot into a proper ChatCompletion object, validating required fields (finish_reason, role, tool call IDs) and running the parser pipeline.
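The validation part of finalization can be sketched as a function that refuses to produce a final choice while required fields are still missing. The types and `finalizeChoiceSketch` below are hypothetical simplifications; the real helper also handles tool calls and parsing, which are omitted here.

```typescript
// Mutable, possibly incomplete choice as accumulated during streaming (simplified).
interface MutableChoiceSketch {
  index: number;
  finish_reason?: string | null;
  message: { role?: string; content?: string | null };
}

// Final choice with every required field present (simplified).
interface FinalChoiceSketch {
  index: number;
  finish_reason: string;
  message: { role: string; content: string | null };
}

function finalizeChoiceSketch(choice: MutableChoiceSketch): FinalChoiceSketch {
  if (!choice.finish_reason) throw new Error(`missing finish_reason for choice ${choice.index}`);
  if (!choice.message.role) throw new Error(`missing role for choice ${choice.index}`);
  return {
    index: choice.index,
    finish_reason: choice.finish_reason,
    message: { role: choice.message.role, content: choice.message.content ?? null },
  };
}

// A complete choice finalizes cleanly.
const finalizedChoice = finalizeChoiceSketch({
  index: 0,
  finish_reason: 'stop',
  message: { role: 'assistant', content: 'Hello' },
});

// A choice that never received a finish_reason is rejected.
let finalizeError = '';
try {
  finalizeChoiceSketch({ index: 1, message: { role: 'assistant', content: 'cut off' } });
} catch (err) {
  finalizeError = err instanceof Error ? err.message : String(err);
}
console.log(finalizedChoice.finish_reason, finalizeError);
```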
Usage
This class is created by client.chat.completions.stream() or client.beta.chat.completions.stream(). It can also be constructed from a ReadableStream via ChatCompletionStream.fromReadableStream() for frontend consumption of a stream produced on the backend.
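Moving a stream from backend to frontend requires some framing so that the receiver can split the byte stream back into chunks. The sketch below assumes newline-delimited JSON, one chunk per line; that framing is an assumption made for illustration, and `encodeJsonLines` and `JsonLineDecoderSketch` are hypothetical helpers rather than SDK exports.

```typescript
// Serialize items as one JSON document per line (assumed framing for this sketch).
function encodeJsonLines(items: unknown[]): string {
  return items.map((item) => JSON.stringify(item) + '\n').join('');
}

class JsonLineDecoderSketch<T> {
  private buffer = '';

  // Feed an arbitrary slice of the wire text; returns the items completed by this slice.
  push(text: string): T[] {
    this.buffer += text;
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() ?? ''; // keep the trailing partial line for the next call
    return lines.filter((line) => line.trim() !== '').map((line) => JSON.parse(line) as T);
  }
}

type DeltaSketch = { delta: string };

const wire = encodeJsonLines([{ delta: 'Hel' }, { delta: 'lo' }]);
const splitAt = 10; // network reads can end anywhere, here in the middle of the first line
const decoder = new JsonLineDecoderSketch<DeltaSketch>();
const received = [...decoder.push(wire.slice(0, splitAt)), ...decoder.push(wire.slice(splitAt))];
console.log(received.map((item) => item.delta).join(''));
```

Because `JSON.stringify` escapes newlines inside strings, a raw newline on the wire can only be a record separator, which keeps the decoder this simple.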
Code Reference
Source Location
- Repository: openai-node
- File: src/lib/ChatCompletionStream.ts
- Lines: 1-872
Signature
```typescript
export class ChatCompletionStream<ParsedT = null>
  extends AbstractChatCompletionRunner<ChatCompletionStreamEvents<ParsedT>, ParsedT>
  implements AsyncIterable<ChatCompletionChunk>
{
  constructor(params: ChatCompletionCreateParams | null);

  get currentChatCompletionSnapshot(): ChatCompletionSnapshot | undefined;

  static fromReadableStream(stream: ReadableStream): ChatCompletionStream<null>;

  static createChatCompletion<ParsedT>(
    client: OpenAI,
    params: ChatCompletionStreamParams,
    options?: RequestOptions,
  ): ChatCompletionStream<ParsedT>;

  [Symbol.asyncIterator](): AsyncIterator<ChatCompletionChunk>;

  toReadableStream(): ReadableStream;
}
```
Import
```typescript
import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream';
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| params | ChatCompletionStreamParams | Yes | Chat completion parameters (model, messages, tools, response_format, etc.) with stream forced to true |
| client | OpenAI | Yes (for createChatCompletion) | The OpenAI client instance |
| options | RequestOptions | No | Request configuration (signal, headers, etc.) |
Outputs
| Name | Type | Description |
|---|---|---|
| ChatCompletionChunk | AsyncIterable<ChatCompletionChunk> | Async iterable of raw chunks |
| currentChatCompletionSnapshot | ChatCompletionSnapshot \| undefined | The current accumulated state of the completion; typed as possibly undefined in the getter's signature |
| finalChatCompletion() | Promise<ParsedChatCompletion<ParsedT>> | The finalized parsed completion (inherited from AbstractChatCompletionRunner) |
| toReadableStream() | ReadableStream | Serialized stream for cross-boundary transport |
Key Events
| Event | Callback Signature | Description |
|---|---|---|
| chunk | (chunk: ChatCompletionChunk, snapshot: ChatCompletionSnapshot) => void | Fired for every received chunk |
| content | (delta: string, snapshot: string) => void | Simple content delta with accumulated text |
| content.delta | (props: ContentDeltaEvent) => void | Structured content delta with parsed partial output |
| content.done | (props: ContentDoneEvent<ParsedT>) => void | Fired when content is finalized, with parsed result |
| refusal.delta | (props: RefusalDeltaEvent) => void | Refusal content delta |
| refusal.done | (props: RefusalDoneEvent) => void | Fired when refusal content is complete |
| tool_calls.function.arguments.delta | (props: FunctionToolCallArgumentsDeltaEvent) => void | Tool call arguments delta with partial parse |
| tool_calls.function.arguments.done | (props: FunctionToolCallArgumentsDoneEvent) => void | Fired when tool call arguments are complete |
| logprobs.content.delta | (props: LogProbsContentDeltaEvent) => void | Log probability delta for content tokens |
| logprobs.content.done | (props: LogProbsContentDoneEvent) => void | Fired when all content log probabilities are received |
Key Types
ChatCompletionSnapshot
```typescript
export interface ChatCompletionSnapshot {
  id: string;
  choices: Array<ChatCompletionSnapshot.Choice>;
  created: number;
  model: string;
  system_fingerprint?: string;
}
```
Usage Examples
Basic Usage
```typescript
import OpenAI from 'openai';

const client = new OpenAI();

const stream = client.chat.completions.stream({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Write a haiku about TypeScript' }],
});

// Print each text delta as it arrives
stream.on('content.delta', ({ delta, snapshot }) => {
  process.stdout.write(delta);
});

// Resolves once the stream has ended and the completion is finalized
const completion = await stream.finalChatCompletion();
console.log('\nFinish reason:', completion.choices[0].finish_reason);
```
With Structured Output
```typescript
import OpenAI from 'openai';
import { zodResponseFormat } from 'openai/helpers/zod';
import { z } from 'zod';

const client = new OpenAI();

// Schema the model's JSON output must satisfy
const Poem = z.object({ title: z.string(), lines: z.array(z.string()) });

const stream = client.chat.completions.stream({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Write a poem' }],
  response_format: zodResponseFormat(Poem, 'poem'),
});

stream.on('content.delta', ({ parsed }) => {
  // `parsed` is a partial Poem object during streaming
  console.log('Partial parse:', parsed);
});

stream.on('content.done', ({ parsed }) => {
  // `parsed` is the fully validated Poem
  console.log('Final:', parsed);
});
```
Async Iteration
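```typescript
// `stream` is a ChatCompletionStream, e.g. the one returned by client.chat.completions.stream()
for await (const chunk of stream) {
  // Each iteration yields a raw ChatCompletionChunk
  const content = chunk.choices[0]?.delta?.content;
  if (content) process.stdout.write(content);
}
```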