Principle: OpenAI Node Response Stream Processing
| Knowledge Sources | |
|---|---|
| Domains | Streaming, Event-Driven Architecture |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
A principle for processing incremental server-sent event (SSE) streams from a language model API, accumulating chunks into a final response while emitting typed events.
Description
Response Stream Processing handles the consumption of streaming API responses where the server sends data incrementally as Server-Sent Events (SSE). Instead of waiting for the complete response, the client processes each chunk as it arrives, enabling real-time display of generated text.
Stream processing involves four steps: parsing SSE frames into typed event objects, accumulating content deltas into a complete response snapshot, emitting granular events (content.delta, content.done, chunk, tool_calls.delta), and providing both event-driven and async-iterable consumption patterns.
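The parse-and-accumulate steps can be sketched in TypeScript. This is a minimal illustration, not the SDK's actual implementation; `parseSSEFrame`, `mergeDelta`, and the `Chunk` shape are hypothetical names introduced here:

```typescript
// Simplified chunk shape assumed for illustration.
interface Chunk {
  content?: string;
  finish_reason?: string | null;
}

// Parse one SSE frame ("data: {...}") into a typed chunk object.
// The terminal "data: [DONE]" sentinel yields null.
function parseSSEFrame(frame: string): Chunk | null {
  const line = frame.split("\n").find((l) => l.startsWith("data:"));
  if (!line) return null;
  const payload = line.slice("data:".length).trim();
  if (payload === "[DONE]") return null;
  return JSON.parse(payload) as Chunk;
}

// Accumulator: append each content delta to the running snapshot.
function mergeDelta(snapshot: string, chunk: Chunk): string {
  return chunk.content ? snapshot + chunk.content : snapshot;
}

// Feeding frames through the parser and accumulator:
const frames = [
  'data: {"content":"Hel","finish_reason":null}',
  'data: {"content":"lo","finish_reason":"stop"}',
  "data: [DONE]",
];
let snapshot = "";
for (const frame of frames) {
  const chunk = parseSSEFrame(frame);
  if (chunk) snapshot = mergeDelta(snapshot, chunk);
}
```

A real parser must also handle frames split across network reads and multi-line `data:` fields; those concerns are omitted here to keep the accumulation logic visible.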
Usage
Use this principle when building applications that need real-time text display (chatbots, writing assistants), progress feedback during long generations, or server-to-client stream proxying. Prefer the high-level stream helper over raw SSE parsing.
Theoretical Basis
Stream processing follows the Observer Pattern combined with Accumulator Pattern:
```
// Observer: consumers subscribe to events
stream.on('content.delta', (delta) => display(delta))
stream.on('content.done', (full) => finalize(full))

// Accumulator: chunks are merged into a snapshot
for each chunk in SSE_stream:
    snapshot = merge(snapshot, chunk)
    emit('chunk', chunk)
    emit('content.delta', { delta: chunk.content, snapshot: snapshot.content })
    if chunk.finish_reason:
        emit('content.done', { content: snapshot.content })

// Async Iterable: direct consumption
for await (const chunk of stream):
    process(chunk)
```
The dual interface (events + async iteration) gives consumers flexibility: events are ideal for UI updates with fine-grained control, while async iteration is simpler for sequential processing.
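The dual interface can be sketched as a single class that both extends Node's `EventEmitter` and implements `AsyncIterable`, so one chunk sequence feeds both consumption styles. The `ResponseStream` name and event payloads below are illustrative, not the SDK's API; chunks come from an in-memory array rather than a live SSE connection:

```typescript
import { EventEmitter } from "node:events";

// Simplified chunk shape assumed for illustration.
interface Chunk {
  content?: string;
  finish_reason?: string | null;
}

// Hypothetical stream wrapper: events and async iteration over the
// same chunk sequence, sharing one accumulated snapshot.
class ResponseStream extends EventEmitter implements AsyncIterable<Chunk> {
  private snapshot = "";

  constructor(private chunks: Chunk[]) {
    super();
  }

  // Async iteration drives the pipeline: each chunk is merged into the
  // snapshot, emitted as typed events, and then yielded to the iterator.
  async *[Symbol.asyncIterator](): AsyncIterator<Chunk> {
    for (const chunk of this.chunks) {
      if (chunk.content) {
        this.snapshot += chunk.content;
        this.emit("content.delta", {
          delta: chunk.content,
          snapshot: this.snapshot,
        });
      }
      this.emit("chunk", chunk);
      if (chunk.finish_reason) {
        this.emit("content.done", { content: this.snapshot });
      }
      yield chunk;
    }
  }
}
```

A UI consumer would subscribe with `stream.on("content.delta", ...)` for incremental rendering, while a batch consumer simply writes `for await (const chunk of stream)`; both observe the same accumulation because the generator body performs the merge before yielding.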