
Implementation:Langfuse GetDatabaseReadStream

From Leeroopedia
Knowledge Sources
Domains Batch Export, Data Streaming, ClickHouse, Node.js Streams
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tool for creating paginated and streaming data readers from ClickHouse and PostgreSQL for batch export, provided by Langfuse.

Description

This implementation encompasses four primary streaming functions and a shared paginated reader:

1. getTraceStream -- Queries the ClickHouse traces table with a LEFT JOIN to an aggregated scores_agg CTE. Strips any observation-level filter conditions (which would require an observations JOIN). Returns a Readable stream that buffers rows in batches of 1,000, fetches comments for each batch from PostgreSQL, processes numeric/boolean/categorical scores, and yields enriched trace objects with flattened score columns.

2. getObservationStream -- Queries the ClickHouse observations table with LEFT JOINs to traces (for trace metadata) and scores_agg. Includes a model cache (createModelCache) that lazily fetches Prisma Model records with their Price relations and caches them in a Map. Enriches each observation with model data via enrichObservationWithModelData. Supports OTEL deduplication bypass via shouldSkipObservationsFinal.

3. getEventsStream -- Queries the ClickHouse events table using the EventsQueryBuilder fluent API. The events table is denormalized with trace data already included, so no trace JOIN is needed for basic fields. Uses eventsScoresAggregation as a CTE for score data.

4. getDatabaseReadStreamPaginated -- A generic paginated reader using the DatabaseReadStream class. Handles scores, sessions, observations (legacy path), traces (legacy path), dataset_run_items, dataset_items, and audit_logs. Each table has a dedicated fetch function that queries ClickHouse or PostgreSQL with LIMIT/OFFSET pagination and enriches results with related data (user info, dataset names, session bookmarks, comments).
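The three ClickHouse-native streams share a common shape: buffer rows into fixed-size batches, enrich each batch in one round trip, then emit enriched rows individually in object mode. A minimal sketch of that pattern, where `batchAndEnrich` and `toObjectStream` are illustrative stand-ins rather than Langfuse internals:

```typescript
import { Readable } from "stream";

// Illustrative sketch: buffer rows into batches, enrich each batch in one
// round trip (e.g. a PostgreSQL comment/score lookup), then push enriched
// rows one by one. `enrichBatch` is an assumption, not a Langfuse API.
async function* batchAndEnrich<T, U>(
  rows: AsyncIterable<T>,
  batchSize: number,
  enrichBatch: (batch: T[]) => Promise<U[]>,
): AsyncGenerator<U> {
  let buffer: T[] = [];
  for await (const row of rows) {
    buffer.push(row);
    if (buffer.length >= batchSize) {
      yield* await enrichBatch(buffer);
      buffer = [];
    }
  }
  if (buffer.length > 0) {
    yield* await enrichBatch(buffer); // flush the final partial batch
  }
}

function toObjectStream<T, U>(
  rows: AsyncIterable<T>,
  batchSize: number,
  enrichBatch: (batch: T[]) => Promise<U[]>,
): Readable {
  // Readable.from produces an object-mode stream from an async generator.
  return Readable.from(batchAndEnrich(rows, batchSize, enrichBatch));
}
```

With a batch size of 1,000 (as the trace stream uses), 2,500 input rows would trigger three enrichment round trips: two full batches and one flush.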

All streaming functions apply a cutoff filter using the job's createdAt timestamp to ensure data consistency, and all respect the BATCH_EXPORT_ROW_LIMIT environment variable as an upper bound on exported rows.
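The cutoff-and-limit behavior can be sketched as a small helper. The default values mirror this page (`BATCH_EXPORT_ROW_LIMIT` from the environment, cutoff from the job's creation time); the helper itself and the parameterized condition syntax are illustrative assumptions:

```typescript
// Sketch of the snapshot-consistency guard: rows created at or after the
// job's own creation time are excluded, and the caller-supplied row limit
// is capped by BATCH_EXPORT_ROW_LIMIT. Helper name and condition syntax
// are illustrative, not copied from the worker source.
const DEFAULT_ROW_LIMIT = Number(process.env.BATCH_EXPORT_ROW_LIMIT ?? 1_000_000);

function buildExportBounds(cutoffCreatedAt: Date, rowLimit?: number) {
  // The caller may lower the limit, but never raise it past the env bound.
  const limit = Math.min(rowLimit ?? DEFAULT_ROW_LIMIT, DEFAULT_ROW_LIMIT);
  return {
    where: "created_at < {cutoff: DateTime64(3)}", // ClickHouse-style param
    params: { cutoff: cutoffCreatedAt.toISOString() },
    limit,
  };
}
```

Because new rows keep arriving while an export runs, filtering on `created_at < cutoffCreatedAt` makes a re-run of the same job see the same snapshot.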

Usage

These functions are called by handleBatchExportJob based on the tableName in the export query. They are internal to the worker and are not part of a public API. Import them in the worker's batch export handler.
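The dispatch inside handleBatchExportJob can be pictured roughly as follows; the shape is inferred from this page, not copied from the worker source:

```typescript
// Hypothetical dispatcher: traces, observations, and events get native
// ClickHouse streams; every other table falls back to the generic
// paginated reader. The return labels stand in for the real functions.
function selectStreamFactory(
  tableName: string,
): "trace" | "observation" | "events" | "paginated" {
  switch (tableName) {
    case "traces":
      return "trace"; // getTraceStream
    case "observations":
      return "observation"; // getObservationStream
    case "events":
      return "events"; // getEventsStream
    default:
      return "paginated"; // getDatabaseReadStreamPaginated (scores, sessions, ...)
  }
}
```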

Code Reference

Source Location

  • Repository: langfuse
  • Files:
    • worker/src/features/database-read-stream/trace-stream.ts (Lines 29-339)
    • worker/src/features/database-read-stream/observation-stream.ts (Lines 77-428)
    • worker/src/features/database-read-stream/event-stream.ts (Lines 46-339)
    • worker/src/features/database-read-stream/getDatabaseReadStream.ts (Lines 99-633)

Signature

// Trace streaming (ClickHouse native)
export const getTraceStream = async (props: {
  projectId: string;
  cutoffCreatedAt: Date;
  filter: FilterCondition[] | null;
  searchQuery?: string;
  searchType?: TracingSearchType[];
  rowLimit?: number;
}): Promise<Readable>;

// Observation streaming (ClickHouse native)
export const getObservationStream = async (props: {
  projectId: string;
  cutoffCreatedAt: Date;
  filter: FilterCondition[] | null;
  searchQuery?: string;
  searchType?: TracingSearchType[];
  rowLimit?: number;
}): Promise<Readable>;

// Event streaming (ClickHouse native)
export const getEventsStream = async (props: {
  projectId: string;
  cutoffCreatedAt: Date;
  filter: FilterCondition[] | null;
  searchQuery?: string;
  searchType?: TracingSearchType[];
  rowLimit?: number;
}): Promise<Readable>;

// Paginated streaming (all other tables)
export const getDatabaseReadStreamPaginated = async (params: {
  projectId: string;
  cutoffCreatedAt: Date;
  searchQuery?: string;
  searchType?: TracingSearchType[];
  rowLimit?: number;
} & BatchExportQueryType): Promise<DatabaseReadStream<unknown>>;

Import

import { getTraceStream } from "../database-read-stream/trace-stream";
import { getObservationStream } from "../database-read-stream/observation-stream";
import { getEventsStream } from "../database-read-stream/event-stream";
import { getDatabaseReadStreamPaginated } from "../database-read-stream/getDatabaseReadStream";

I/O Contract

Inputs

Name Type Required Description
projectId string Yes The project to export data from. Used in all ClickHouse WHERE clauses and PostgreSQL queries.
cutoffCreatedAt Date Yes The timestamp of the export job's creation. Applied as an upper-bound filter (timestamp < cutoffCreatedAt) to ensure a consistent snapshot.
filter FilterCondition[] | null No User-defined filter conditions from the UI. Converted to ClickHouse filter expressions via createFilterFromFilterState. Observation/trace-level filters are stripped when not applicable.
searchQuery string No Free-text search string applied to relevant columns (id, name, user_id, etc.) via clickhouseSearchCondition.
searchType TracingSearchType[] No Specifies which columns the search query applies to. Defaults to ["id"].
rowLimit number No Maximum number of rows to export. Defaults to env.BATCH_EXPORT_ROW_LIMIT.
tableName BatchTableNames Yes (paginated only) The target table name, used by getDatabaseReadStreamPaginated to select the correct fetch function.
orderBy OrderByState Yes (paginated only) Column and direction for ordering results.
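The defaults for the optional inputs above can be illustrated with a small resolver. The default values (`searchType` falling back to `["id"]`, `rowLimit` falling back to `BATCH_EXPORT_ROW_LIMIT`) come from the table; the resolver function and the `TracingSearchType` member values shown are assumptions:

```typescript
// Illustrative resolver for the optional inputs in the table above.
// The defaults mirror the documented behavior; the function itself and
// the union members are assumptions, not part of the Langfuse codebase.
type TracingSearchType = "id" | "content";

function resolveOptionalProps(props: {
  searchQuery?: string;
  searchType?: TracingSearchType[];
  rowLimit?: number;
}) {
  return {
    searchQuery: props.searchQuery,
    searchType: props.searchType ?? (["id"] as TracingSearchType[]),
    rowLimit: props.rowLimit ?? Number(process.env.BATCH_EXPORT_ROW_LIMIT ?? 1_000_000),
  };
}
```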

Outputs

Name Type Description
stream Readable or DatabaseReadStream<unknown> A Node.js Readable stream in object mode, resolved from the returned Promise. Each emitted object is a single enriched data row with flattened score columns, comments, and (for observations) model metadata. The stream ends when all matching rows have been emitted or the row limit is reached.

Usage Examples

Streaming traces with filters

import { getTraceStream } from "../database-read-stream/trace-stream";

const stream = await getTraceStream({
  projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
  cutoffCreatedAt: new Date("2026-02-14T00:00:00Z"),
  filter: [
    {
      column: "Timestamp",
      operator: ">",
      value: new Date("2026-01-01"),
      type: "datetime",
    },
  ],
  rowLimit: 50000,
});

// Pipe to a format transform and upload
stream.pipe(transformStreamToCsv()).pipe(uploadStream);

Paginated streaming of scores

import { getDatabaseReadStreamPaginated } from "../database-read-stream/getDatabaseReadStream";

const stream = await getDatabaseReadStreamPaginated({
  projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
  cutoffCreatedAt: new Date("2026-02-14T00:00:00Z"),
  tableName: "scores",
  filter: [
    { column: "name", operator: "=", value: "accuracy", type: "string" },
  ],
  orderBy: { column: "timestamp", order: "DESC" },
});

for await (const row of stream) {
  console.log(row);
}
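Under the hood, the paginated path repeatedly calls a per-table fetch function with increasing offsets until a short page comes back or the row limit is hit. A generic sketch of that loop, where `fetchPage` is a stand-in for the table-specific LIMIT/OFFSET query:

```typescript
// Generic LIMIT/OFFSET pagination loop in the style of DatabaseReadStream.
// `fetchPage` is an illustrative stand-in for the per-table fetch functions:
// it runs the table-specific query with the given limit/offset.
async function* paginate<T>(
  fetchPage: (limit: number, offset: number) => Promise<T[]>,
  pageSize: number,
  rowLimit: number,
): AsyncGenerator<T> {
  let offset = 0;
  let emitted = 0;
  while (emitted < rowLimit) {
    // Never request more rows than the remaining budget allows.
    const page = await fetchPage(Math.min(pageSize, rowLimit - emitted), offset);
    for (const row of page) {
      yield row;
      emitted++;
    }
    if (page.length < pageSize) break; // short page => no more rows
    offset += page.length;
  }
}
```

A short page (fewer rows than requested) is the termination signal, which avoids an extra empty-page round trip when the row count is not a multiple of the page size.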

Related Pages

Implements Principle
