Principle: Langfuse Data Streaming from ClickHouse
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Data Streaming, ClickHouse, Analytics Database |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Data Streaming from ClickHouse is the principle of reading large volumes of analytics data from a columnar database through paginated or cursor-based streaming, enriching each batch with supplementary data, and emitting the results as a Node.js Readable stream suitable for downstream piping.
Description
LLM engineering platforms accumulate massive volumes of tracing data -- traces, observations (spans/generations), events, scores, and sessions -- in a columnar analytics database optimized for aggregation queries. When a user requests a bulk export, naively loading all matching rows into memory would exhaust the worker's heap and cause out-of-memory crashes.
Data Streaming from ClickHouse addresses this by implementing two complementary streaming strategies:
- ClickHouse native streaming: For high-volume tables (traces, observations, events), the system uses ClickHouse's streaming query API (`queryClickhouseStream`), which returns an async generator that yields rows one at a time as they arrive from the database. This avoids buffering the entire result set in memory. The query itself performs server-side aggregation of scores using CTEs (Common Table Expressions) and LEFT JOINs, so the worker receives pre-enriched rows.
- Paginated database read stream: For other tables (scores, sessions, dataset items, audit logs), the system uses a `DatabaseReadStream` class -- a custom Node.js Readable that fetches data in fixed-size pages. Each page is retrieved via a separate query with `LIMIT`/`OFFSET` semantics, and the stream signals completion when a page returns fewer rows than the page size.
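The paginated strategy can be sketched as a Node.js Readable in object mode. This is a minimal illustration, not the actual `DatabaseReadStream` implementation -- the `createPaginatedReadStream`, `fetchPage`, `pageSize`, and `maxRows` names are assumptions for this example:

```typescript
import { Readable } from "node:stream";

// Hypothetical page-fetching callback; in practice this would run a
// LIMIT/OFFSET query against the database.
type FetchPage<T> = (pageSize: number, offset: number) => Promise<T[]>;

function createPaginatedReadStream<T>(
  fetchPage: FetchPage<T>,
  pageSize: number,
  maxRows?: number,
): Readable {
  let offset = 0;
  let done = false;
  let reading = false; // guard against re-entrant _read calls while a page is in flight
  return new Readable({
    objectMode: true,
    async read() {
      if (reading) return;
      if (done) {
        this.push(null);
        return;
      }
      reading = true;
      try {
        // Clamp the page size so we never read past the optional row limit.
        const limit =
          maxRows !== undefined ? Math.min(pageSize, maxRows - offset) : pageSize;
        if (limit <= 0) {
          done = true;
          this.push(null);
          return;
        }
        const rows = await fetchPage(limit, offset);
        offset += rows.length;
        // A short page signals that the source is exhausted.
        if (rows.length < limit) done = true;
        for (const row of rows) this.push(row);
        if (rows.length === 0) this.push(null);
      } catch (err) {
        this.destroy(err as Error);
      } finally {
        reading = false;
      }
    },
  });
}
```

Because the stream only ever holds one page of rows, memory usage stays bounded regardless of how many rows the export matches.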
Both strategies share common enrichment patterns:
- Score flattening: Scores associated with each record are aggregated by name and type, then flattened into columns on the output row. Numeric and boolean scores are averaged; categorical scores are preserved as string arrays.
- Comment fetching: Comments are retrieved in batches (BATCH_SIZE=1000) from PostgreSQL and attached to their parent records via a lookup map. This batched approach avoids N+1 query problems.
- Model enrichment: For observations, model metadata (pricing, model name) is fetched from PostgreSQL with an in-memory cache to avoid repeated database hits for the same model ID.
- Cutoff filter: A timestamp cutoff filter (`createdAt < jobCreatedAt`) is applied to prevent exporting data that was ingested after the export job was created, ensuring a consistent snapshot.
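The score flattening rule described above can be sketched as follows. The `Score` shape and `flattenScores` signature here are simplified assumptions, not the actual Langfuse types:

```typescript
// Hypothetical simplified score shape for illustration.
type Score = {
  name: string;
  dataType: "NUMERIC" | "BOOLEAN" | "CATEGORICAL";
  value: number | string;
};

// Aggregate scores by name: numeric and boolean scores are averaged,
// categorical scores are collected into string arrays.
function flattenScores(
  scores: Score[],
): Record<string, number | string[]> {
  const out: Record<string, number | string[]> = {};
  const numeric = new Map<string, number[]>();
  for (const s of scores) {
    if (s.dataType === "CATEGORICAL") {
      const arr = (out[s.name] as string[] | undefined) ?? [];
      arr.push(String(s.value));
      out[s.name] = arr;
    } else {
      const vals = numeric.get(s.name) ?? [];
      vals.push(Number(s.value));
      numeric.set(s.name, vals);
    }
  }
  for (const [name, vals] of numeric) {
    out[name] = vals.reduce((a, b) => a + b, 0) / vals.length;
  }
  return out;
}
```

Averaging numeric values while keeping categorical values as arrays preserves the distinction between quantitative and label-style scores in the flattened output.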
Usage
Use Data Streaming from ClickHouse whenever:
- You need to export more rows than can fit in memory (typically thousands to millions).
- The export data spans multiple source tables that must be joined or enriched.
- Downstream consumers (format transforms, upload streams) can process data incrementally.
- You need to apply user-defined filters, search queries, and ordering to the exported data.
Theoretical Basis
The streaming approach follows the Iterator Pattern combined with Batch Enrichment:
```
FUNCTION createDataStream(projectId, tableName, filters, cutoffDate, rowLimit):
    -- Strategy selection based on table type
    IF tableName IN (traces, observations, events):
        -- Native ClickHouse streaming with server-side score aggregation
        stream = CLICKHOUSE_STREAM(
            query = BUILD_QUERY(tableName, filters, cutoffDate, rowLimit)
            -- Query includes CTEs for score aggregation and LEFT JOINs
        )
    ELSE:
        -- Paginated read stream for PostgreSQL-backed or simpler ClickHouse tables
        stream = PAGINATED_STREAM(
            fetchPage = (pageSize, offset) => QUERY(tableName, filters, cutoffDate, pageSize, offset),
            pageSize = BATCH_EXPORT_PAGE_SIZE,
            maxRows = rowLimit,
        )

    -- Enrichment layer (applied per-batch for efficiency)
    enrichedStream = TRANSFORM(stream, FOR EACH batch of BATCH_SIZE rows:
        -- Fetch comments in bulk
        commentMap = FETCH_COMMENTS(projectId, tableName, rowIds)
        -- Fetch model data (cached) for observations
        IF tableName == observations:
            FOR EACH row: row.model = MODEL_CACHE.get(row.internalModelId)
        -- Flatten scores into columns
        FOR EACH row:
            row.scoreColumns = flattenScores(row.scores, emptyScoreColumns)
            row.comments = commentMap.get(row.id) ?? []
        YIELD enrichedRows
    )

    RETURN enrichedStream  -- Node.js Readable in object mode
```
The empty score columns pattern deserves special note: before streaming begins, the system queries ClickHouse for all distinct score names across the filtered dataset. This produces a set of column headers that will appear in the output, even if a particular row has no value for a given score. Each row is then merged with a template of { scoreName: null } entries, ensuring consistent column structure across all rows -- which is essential for CSV output where every row must have the same columns.
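The template-merge step can be illustrated in a few lines. The function names here are hypothetical, but the mechanics match the description: spread the null template first, then the row's actual scores over it:

```typescript
// Build a { scoreName: null } template from the distinct score names
// discovered before streaming begins.
function buildEmptyScoreColumns(distinctNames: string[]): Record<string, null> {
  return Object.fromEntries(distinctNames.map((n) => [n, null]));
}

// Merge a row's flattened scores over the template so every output row
// carries the full set of score columns, with null for missing values.
function mergeScoreColumns(
  empty: Record<string, null>,
  rowScores: Record<string, number | string[]>,
): Record<string, number | string[] | null> {
  return { ...empty, ...rowScores };
}
```

A row that only has an `accuracy` score still emits a `null` entry for every other score name in the dataset, so the CSV header and each data row stay aligned.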
The ClickHouse query configuration includes extended HTTP timeouts (`http_send_timeout: 300`, `http_receive_timeout: 300`) and a request timeout of 180 seconds to prevent premature disconnections during large exports that may stall while waiting for downstream consumers (S3 upload) to drain the stream.
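A settings object along these lines captures the timeout configuration. This is a sketch of the shape such a configuration might take, not the actual Langfuse client setup; note that `http_send_timeout`/`http_receive_timeout` are server-side ClickHouse settings expressed in seconds, while the client-side request timeout is assumed here to be expressed in milliseconds:

```typescript
// Hypothetical configuration fragment for a streaming export query.
const clickhouseStreamSettings = {
  clickhouse_settings: {
    http_send_timeout: 300,    // seconds, server-side: keep sending slowly-drained responses
    http_receive_timeout: 300, // seconds, server-side: tolerate slow request bodies
  },
  request_timeout: 180_000,    // milliseconds, client-side: 180 s overall request budget
};
```

The server-side timeouts must be at least as generous as the expected backpressure stalls, since a paused S3 upload stops the ClickHouse response from draining.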