
Principle:Langfuse Data Streaming from ClickHouse

From Leeroopedia
Knowledge Sources
Domains: Batch Export, Data Streaming, ClickHouse, Analytics Database
Last Updated: 2026-02-14 00:00 GMT

Overview

Data Streaming from ClickHouse is the principle of reading large volumes of analytics data from a columnar database through paginated or cursor-based streaming, enriching each batch with supplementary data, and emitting the results as a Node.js Readable stream suitable for downstream piping.
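
As an illustration of that shape (all identifiers here are hypothetical, not Langfuse's), an object-mode Readable of rows can be piped through a format transform into any writable sink:

import { Readable, Transform } from "node:stream";
import { pipeline } from "node:stream/promises";

// Illustrative only: an object-mode Readable of rows, piped through a
// format transform into a sink -- the end-to-end shape of such an export.
const rows = Readable.from([{ id: "trace-1" }, { id: "trace-2" }]);
const toNdjson = new Transform({
  objectMode: true,
  transform(row, _enc, cb) {
    cb(null, JSON.stringify(row) + "\n");
  },
});
await pipeline(rows, toNdjson, process.stdout);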

Description

LLM engineering platforms accumulate massive volumes of tracing data -- traces, observations (spans/generations), events, scores, and sessions -- in a columnar analytics database optimized for aggregation queries. When a user requests a bulk export, naively loading all matching rows into memory would exhaust the worker's heap and cause out-of-memory crashes.

Data Streaming from ClickHouse addresses this by implementing two complementary streaming strategies:

  1. ClickHouse native streaming: For high-volume tables (traces, observations, events), the system uses ClickHouse's streaming query API (queryClickhouseStream), which returns an async generator that yields rows one at a time as they arrive from the database. This avoids buffering the entire result set in memory. The query itself performs server-side aggregation of scores using CTEs (Common Table Expressions) and LEFT JOINs, so the worker receives pre-enriched rows.
  2. Paginated database read stream: For other tables (scores, sessions, dataset items, audit logs), the system uses a DatabaseReadStream class -- a custom Node.js Readable that fetches data in fixed-size pages. Each page is retrieved via a separate query with LIMIT/OFFSET semantics, and the stream signals completion when a page returns fewer rows than the page size (see the sketch after this list).
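
A minimal sketch of the paginated strategy, assuming a hypothetical fetchPage(pageSize, offset) callback rather than the actual DatabaseReadStream API:

import { Readable } from "node:stream";

// Sketch: wrap LIMIT/OFFSET pagination in an object-mode Readable.
// fetchPage, pageSize, and maxRows are illustrative parameters.
function createPaginatedStream<T>(
  fetchPage: (pageSize: number, offset: number) => Promise<T[]>,
  pageSize: number,
  maxRows?: number,
): Readable {
  async function* pages(): AsyncGenerator<T> {
    let offset = 0;
    while (maxRows === undefined || offset < maxRows) {
      let rows = await fetchPage(pageSize, offset);
      // Respect the overall row limit, if one was set.
      if (maxRows !== undefined && offset + rows.length > maxRows) {
        rows = rows.slice(0, maxRows - offset);
      }
      for (const row of rows) yield row;
      offset += rows.length;
      // A page shorter than pageSize means the source is exhausted.
      if (rows.length < pageSize) break;
    }
  }
  return Readable.from(pages()); // object-mode Readable, ready for piping
}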

Both strategies share common enrichment patterns:

  • Score flattening: Scores associated with each record are aggregated by name and type, then flattened into columns on the output row. Numeric and boolean scores are averaged; categorical scores are preserved as string arrays.
  • Comment fetching: Comments are retrieved in batches (BATCH_SIZE=1000) from PostgreSQL and attached to their parent records via a lookup map. This batched approach avoids N+1 query problems (see the sketch after this list).
  • Model enrichment: For observations, model metadata (pricing, model name) is fetched from PostgreSQL with an in-memory cache to avoid repeated database hits for the same model ID.
  • Cutoff filter: A timestamp cutoff filter (createdAt < jobCreatedAt) is applied to prevent exporting data that was ingested after the export job was created, ensuring a consistent snapshot.
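
A minimal sketch of the batched comment lookup, assuming a hypothetical findComments(projectId, objectType, ids) helper in place of the real PostgreSQL query:

type Comment = { objectId: string; content: string };

const BATCH_SIZE = 1000;

// One IN-list query per 1000 ids instead of one query per row (no N+1).
async function buildCommentMap(
  findComments: (projectId: string, objectType: string, ids: string[]) => Promise<Comment[]>,
  projectId: string,
  objectType: string,
  ids: string[],
): Promise<Map<string, Comment[]>> {
  const map = new Map<string, Comment[]>();
  for (let i = 0; i < ids.length; i += BATCH_SIZE) {
    const batch = ids.slice(i, i + BATCH_SIZE);
    for (const comment of await findComments(projectId, objectType, batch)) {
      const list = map.get(comment.objectId) ?? [];
      list.push(comment);
      map.set(comment.objectId, list);
    }
  }
  return map; // rows are then enriched via map.get(row.id)
}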

Usage

Use Data Streaming from ClickHouse whenever:

  • You need to export more rows than can fit in memory (typically thousands to millions).
  • The export data spans multiple source tables that must be joined or enriched.
  • Downstream consumers (format transforms, upload streams) can process data incrementally.
  • You need to apply user-defined filters, search queries, and ordering to the exported data.

Theoretical Basis

The streaming approach follows the Iterator Pattern combined with Batch Enrichment:

FUNCTION createDataStream(projectId, tableName, filters, cutoffDate, rowLimit):

  -- Strategy selection based on table type
  IF tableName IN (traces, observations, events):
    -- Native ClickHouse streaming with server-side score aggregation
    stream = CLICKHOUSE_STREAM(
      query = BUILD_QUERY(tableName, filters, cutoffDate, rowLimit),
      -- Query includes CTEs for score aggregation and LEFT JOINs
    )
  ELSE:
    -- Paginated read stream for PostgreSQL-backed or simpler ClickHouse tables
    stream = PAGINATED_STREAM(
      fetchPage = (pageSize, offset) => QUERY(tableName, filters, cutoffDate, pageSize, offset),
      pageSize = BATCH_EXPORT_PAGE_SIZE,
      maxRows = rowLimit,
    )

  -- Enrichment layer: buffer rows into batches of BATCH_SIZE, enrich each batch
  enrichedStream = TRANSFORM(stream, batch =>
      -- Fetch comments for the whole batch in one bulk lookup
      commentMap = FETCH_COMMENTS(projectId, tableName, batch.rowIds)

      -- Fetch model data (cached) for observations
      IF tableName == observations:
        FOR EACH row: row.model = MODEL_CACHE.get(row.internalModelId)

      -- Flatten scores into columns
      FOR EACH row:
        row.scoreColumns = flattenScores(row.scores, emptyScoreColumns)
        row.comments = commentMap.get(row.id) ?? []

      YIELD batch  -- rows now carry flattened scores, comments, and model data
  )

  RETURN enrichedStream  -- Node.js Readable in object mode
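
The same skeleton rendered as hypothetical TypeScript, with every identifier illustrative rather than taken from the Langfuse codebase:

import { Readable, Transform } from "node:stream";

// Sketch: pick a source strategy by table, then buffer rows into batches
// and enrich each batch before re-emitting individual rows.
function createDataStream(opts: {
  tableName: string;
  clickhouseStream: () => Readable;    // native streaming path
  paginatedStream: () => Readable;     // LIMIT/OFFSET path
  enrichBatch: (rows: unknown[]) => Promise<unknown[]>;
  batchSize: number;
}): Readable {
  const source = ["traces", "observations", "events"].includes(opts.tableName)
    ? opts.clickhouseStream()
    : opts.paginatedStream();

  let buffer: unknown[] = [];
  const enrich = new Transform({
    objectMode: true,
    async transform(row, _enc, cb) {
      buffer.push(row);
      if (buffer.length < opts.batchSize) return cb();
      const batch = buffer;
      buffer = [];
      try {
        for (const r of await opts.enrichBatch(batch)) this.push(r);
        cb();
      } catch (err) {
        cb(err as Error);
      }
    },
    async flush(cb) {
      try {
        // Enrich and emit whatever remains in the final partial batch.
        if (buffer.length > 0) {
          for (const r of await opts.enrichBatch(buffer)) this.push(r);
        }
        cb();
      } catch (err) {
        cb(err as Error);
      }
    },
  });
  return source.pipe(enrich); // still a Readable in object mode
}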

The empty score columns pattern deserves special note: before streaming begins, the system queries ClickHouse for all distinct score names across the filtered dataset. This produces a set of column headers that will appear in the output, even if a particular row has no value for a given score. Each row is then merged with a template of { scoreName: null } entries, ensuring consistent column structure across all rows -- which is essential for CSV output where every row must have the same columns.
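
A minimal sketch of this merge, with the Score shape and helper names assumed for illustration:

type Score = {
  name: string;
  dataType: "NUMERIC" | "BOOLEAN" | "CATEGORICAL";
  value: number | string;
};

// Merge one row's scores onto a template of null columns so every output
// row carries the identical column set (required for well-formed CSV).
function flattenScores(
  scores: Score[],
  emptyScoreColumns: Record<string, null>,
): Record<string, number | string[] | null> {
  const numeric = new Map<string, number[]>();
  const categorical = new Map<string, string[]>();
  for (const s of scores) {
    if (s.dataType === "CATEGORICAL") {
      categorical.set(s.name, [...(categorical.get(s.name) ?? []), String(s.value)]);
    } else {
      // NUMERIC and BOOLEAN values are collected for averaging.
      numeric.set(s.name, [...(numeric.get(s.name) ?? []), Number(s.value)]);
    }
  }
  const out: Record<string, number | string[] | null> = { ...emptyScoreColumns };
  for (const [name, values] of numeric) {
    out[name] = values.reduce((a, b) => a + b, 0) / values.length;
  }
  for (const [name, values] of categorical) out[name] = values;
  return out;
}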

The ClickHouse query configuration includes extended HTTP timeouts (http_send_timeout: 300, http_receive_timeout: 300, both in seconds) and a client-side request timeout of 180 seconds to prevent premature disconnections during large exports, which may stall while waiting for downstream consumers (such as an S3 upload) to drain the stream.
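
As a sketch using the @clickhouse/client Node.js package (exact option names may differ between client versions), the configuration and a native streaming read might look like:

import { createClient } from "@clickhouse/client";

const client = createClient({
  url: process.env.CLICKHOUSE_URL,
  request_timeout: 180_000, // client-side request timeout, in milliseconds
  clickhouse_settings: {
    // Server-side HTTP timeouts, in seconds, so the connection survives
    // stalls while a downstream consumer (e.g. an S3 upload) drains.
    http_send_timeout: 300,
    http_receive_timeout: 300,
  },
});

// Rows arrive incrementally instead of being buffered in full.
const result = await client.query({
  query: "SELECT * FROM traces WHERE project_id = {projectId: String}",
  query_params: { projectId: "my-project" },
  format: "JSONEachRow",
});
for await (const rowBatch of result.stream()) {
  for (const row of rowBatch) {
    process.stdout.write(JSON.stringify(row.json()) + "\n"); // one row at a time
  }
}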
