Implementation: Langfuse GetDatabaseReadStream
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Data Streaming, ClickHouse, Node.js Streams |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool, provided by Langfuse, for creating paginated and streaming data readers over ClickHouse and PostgreSQL for batch export.
Description
This implementation encompasses four primary streaming functions and a shared paginated reader:
1. getTraceStream -- Queries the ClickHouse traces table with a LEFT JOIN to an aggregated scores_agg CTE. Strips observation-level filter conditions, which would otherwise require an observations JOIN. Returns a Readable stream that buffers rows in batches of 1,000, fetches comments for each batch from PostgreSQL, processes numeric/boolean/categorical scores, and yields enriched trace objects with flattened score columns (the shared batching pattern is sketched after this list).
2. getObservationStream -- Queries the ClickHouse observations table with LEFT JOINs to traces (for trace metadata) and scores_agg. Includes a model cache (createModelCache) that lazily fetches Prisma Model records with their Price relations and caches them in a Map (sketched below). Enriches each observation with model data via enrichObservationWithModelData. Supports OTEL deduplication bypass via shouldSkipObservationsFinal.
3. getEventsStream -- Queries the ClickHouse events table using the EventsQueryBuilder fluent API. The events table is denormalized with trace data already included, so no trace JOIN is needed for basic fields. Uses eventsScoresAggregation as a CTE for score data.
4. getDatabaseReadStreamPaginated -- A generic paginated reader using the DatabaseReadStream class. Handles scores, sessions, observations (legacy path), traces (legacy path), dataset_run_items, dataset_items, and audit_logs. Each table has a dedicated fetch function that queries ClickHouse or PostgreSQL with LIMIT/OFFSET pagination and enriches results with related data (user info, dataset names, session bookmarks, comments).
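As an orientation aid, the shared batched-Readable pattern behind the three streaming functions can be sketched as follows. This is a minimal illustration rather than the actual implementation: queryClickhouse stands in for the real ClickHouse client helper, the SQL is abbreviated, and the cutoff and row-limit guards anticipate the paragraph that follows.
import { Readable } from "stream";
// Hypothetical stand-in for the real ClickHouse client helper.
declare function queryClickhouse<T>(
  sql: string,
  params: Record<string, unknown>,
): Promise<T[]>;
const BATCH_SIZE = 1000;
export const makeBatchedStream = (props: {
  projectId: string;
  cutoffCreatedAt: Date;
  rowLimit: number;
}): Readable =>
  Readable.from(
    (async function* () {
      let offset = 0;
      while (offset < props.rowLimit) {
        const rows = await queryClickhouse<Record<string, unknown>>(
          `SELECT * FROM traces
           WHERE project_id = {projectId: String}
             AND created_at < {cutoff: DateTime64(3)}
           LIMIT {limit: UInt32} OFFSET {offset: UInt32}`,
          {
            projectId: props.projectId,
            cutoff: props.cutoffCreatedAt,
            limit: Math.min(BATCH_SIZE, props.rowLimit - offset),
            offset,
          },
        );
        if (rows.length === 0) return;
        // Per-batch enrichment (comments, score flattening) happens here
        // in the real implementation before rows are yielded.
        yield* rows;
        offset += rows.length;
      }
    })(),
  );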
All streaming functions apply a cutoff filter using the job's createdAt timestamp to ensure data consistency, and all respect the BATCH_EXPORT_ROW_LIMIT environment variable as an upper bound on exported rows.
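The model cache mentioned in item 2 follows a simple lazy memoization pattern over Prisma. A minimal sketch, assuming a Prisma client whose model delegate carries a Price relation as described above (delegate and field names are illustrative):
import { PrismaClient } from "@prisma/client";
export const createModelCache = (prisma: PrismaClient) => {
  // Each model is fetched at most once; concurrent callers for the same
  // id share the in-flight promise instead of issuing duplicate queries.
  const cache = new Map<string, Promise<unknown>>();
  return (modelId: string) => {
    if (!cache.has(modelId)) {
      cache.set(
        modelId,
        prisma.model.findUnique({
          where: { id: modelId },
          include: { Price: true },
        }),
      );
    }
    return cache.get(modelId)!;
  };
};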
Usage
These functions are called by handleBatchExportJob based on the tableName in the export query. They are internal to the worker and are not part of a public API; they are imported directly in the worker's batch export handler.
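In outline, that handler's dispatch is a switch on the table name. The routing below is an illustrative sketch with simplified types, assuming DatabaseReadStream behaves as a Readable subclass (per the Outputs table below); the real logic, including the legacy trace/observation paths, lives in handleBatchExportJob.
import type { Readable } from "stream";
import { getTraceStream } from "../database-read-stream/trace-stream";
import { getObservationStream } from "../database-read-stream/observation-stream";
import { getEventsStream } from "../database-read-stream/event-stream";
import { getDatabaseReadStreamPaginated } from "../database-read-stream/getDatabaseReadStream";
type ExportQuery = Parameters<typeof getDatabaseReadStreamPaginated>[0];
const getStreamForTable = async (query: ExportQuery): Promise<Readable> => {
  switch (query.tableName) {
    case "traces":
      return getTraceStream(query);
    case "observations":
      return getObservationStream(query);
    case "events":
      return getEventsStream(query);
    default:
      // scores, sessions, dataset_run_items, dataset_items, audit_logs, ...
      return getDatabaseReadStreamPaginated(query);
  }
};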
Code Reference
Source Location
- Repository: langfuse
- Files:
- worker/src/features/database-read-stream/trace-stream.ts (Lines 29-339)
- worker/src/features/database-read-stream/observation-stream.ts (Lines 77-428)
- worker/src/features/database-read-stream/event-stream.ts (Lines 46-339)
- worker/src/features/database-read-stream/getDatabaseReadStream.ts (Lines 99-633)
Signature
// Trace streaming (ClickHouse native)
export const getTraceStream = async (props: {
projectId: string;
cutoffCreatedAt: Date;
filter: FilterCondition[] | null;
searchQuery?: string;
searchType?: TracingSearchType[];
rowLimit?: number;
}): Promise<Readable>;
// Observation streaming (ClickHouse native)
export const getObservationStream = async (props: {
projectId: string;
cutoffCreatedAt: Date;
filter: FilterCondition[] | null;
searchQuery?: string;
searchType?: TracingSearchType[];
rowLimit?: number;
}): Promise<Readable>;
// Event streaming (ClickHouse native)
export const getEventsStream = async (props: {
projectId: string;
cutoffCreatedAt: Date;
filter: FilterCondition[] | null;
searchQuery?: string;
searchType?: TracingSearchType[];
rowLimit?: number;
}): Promise<Readable>;
// Paginated streaming (all other tables)
export const getDatabaseReadStreamPaginated = async (params: {
projectId: string;
cutoffCreatedAt: Date;
searchQuery?: string;
searchType?: TracingSearchType[];
rowLimit?: number;
} & BatchExportQueryType): Promise<DatabaseReadStream<unknown>>;
Import
import { getTraceStream } from "../database-read-stream/trace-stream";
import { getObservationStream } from "../database-read-stream/observation-stream";
import { getEventsStream } from "../database-read-stream/event-stream";
import { getDatabaseReadStreamPaginated } from "../database-read-stream/getDatabaseReadStream";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | The project to export data from. Used in all ClickHouse WHERE clauses and PostgreSQL queries. |
| cutoffCreatedAt | Date | Yes | The timestamp of the export job's creation. Applied as an upper-bound filter (timestamp < cutoffCreatedAt) to ensure a consistent snapshot. |
| filter | FilterCondition[] \| null | No | User-defined filter conditions from the UI. Converted to ClickHouse filter expressions via createFilterFromFilterState. Observation/trace-level filters are stripped when not applicable. |
| searchQuery | string | No | Free-text search string applied to relevant columns (id, name, user_id, etc.) via clickhouseSearchCondition. |
| searchType | TracingSearchType[] | No | Specifies which columns the search query applies to. Defaults to ["id"]. |
| rowLimit | number | No | Maximum number of rows to export. Defaults to env.BATCH_EXPORT_ROW_LIMIT. |
| tableName | BatchTableNames | Yes (paginated only) | The target table name, used by getDatabaseReadStreamPaginated to select the correct fetch function. |
| orderBy | OrderByState | Yes (paginated only) | Column and direction for ordering results. |
Outputs
| Name | Type | Description |
|---|---|---|
| stream | Promise<Readable> or Promise<DatabaseReadStream<unknown>> | A Node.js Readable stream in object mode. Each emitted object is a single enriched data row with flattened score columns, comments, and (for observations) model metadata. The stream ends when all matching rows have been emitted or the row limit is reached. |
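Both return types behave as object-mode Node.js Readables, so consumers can either pipe them into transform streams or iterate them with for await...of; the examples below show one of each.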
Usage Examples
Streaming traces with filters
import { getTraceStream } from "../database-read-stream/trace-stream";
const stream = await getTraceStream({
projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
cutoffCreatedAt: new Date("2026-02-14T00:00:00Z"),
filter: [
{
column: "Timestamp",
operator: ">",
value: new Date("2026-01-01"),
type: "datetime",
},
],
rowLimit: 50000,
});
// Pipe to a format transform and upload
stream.pipe(transformStreamToCsv()).pipe(uploadStream);
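Because .pipe() does not propagate errors across a chain, a more defensive variant of the same flow can use pipeline from Node's stream/promises API (transformStreamToCsv and uploadStream are carried over from the example above):
import { pipeline } from "stream/promises";
// pipeline rejects on the first error and destroys every stream in the
// chain, whereas .pipe() leaves upstream streams running on failure.
await pipeline(stream, transformStreamToCsv(), uploadStream);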
Paginated streaming of scores
import { getDatabaseReadStreamPaginated } from "../database-read-stream/getDatabaseReadStream";
const stream = await getDatabaseReadStreamPaginated({
projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
cutoffCreatedAt: new Date("2026-02-14T00:00:00Z"),
tableName: "scores",
filter: [
{ column: "name", operator: "=", value: "accuracy", type: "string" },
],
orderBy: { column: "timestamp", order: "DESC" },
});
for await (const row of stream) {
console.log(row);
}
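The LIMIT/OFFSET contract that each per-table fetch function satisfies can be pictured with a generic reader. This is a sketch of the pattern, not the DatabaseReadStream class itself; fetchPage, pageSize, and rowLimit are hypothetical parameters.
import { Readable } from "stream";
// Generic sketch: `fetchPage` stands in for a per-table fetch function
// (scores, sessions, dataset_items, ...) querying with LIMIT/OFFSET.
const paginatedReadStream = <T>(
  fetchPage: (limit: number, offset: number) => Promise<T[]>,
  pageSize: number,
  rowLimit: number,
): Readable =>
  Readable.from(
    (async function* () {
      for (let offset = 0; offset < rowLimit; offset += pageSize) {
        const limit = Math.min(pageSize, rowLimit - offset);
        const page = await fetchPage(limit, offset);
        // Enrichment with related data (user info, dataset names,
        // comments) would happen per page before rows are emitted.
        yield* page;
        if (page.length < limit) return; // last page reached
      }
    })(),
  );
Because the generator only advances when the consumer pulls, memory stays bounded at roughly one page regardless of export size.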