Implementation:Langfuse Langfuse Batch Export Stream Transformations
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Data Serialization, Node.js Streams |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for converting object-mode data streams into CSV, JSON, or JSONL text streams, provided by Langfuse.
Description
The stream transformation system consists of three factory functions and a registry object:
transformStreamToCsv() -- Returns a Transform stream that auto-detects CSV headers from the first row's keys, emits a header line, and converts each row (including the first) into a comma-delimited line. Fields are escaped in an RFC 4180-compatible way: every field is wrapped in double quotes (which RFC 4180 permits but does not require), and embedded double quotes are doubled. The transform implements cooperative scheduling with a 50ms yield interval using setImmediate to prevent event loop starvation during large exports.
transformStreamToJson() -- Returns a Transform stream that wraps the output in a JSON array. It emits [ before the first element, a comma between elements, and ] in the final handler. If no rows are processed it outputs [], so the result is always valid JSON.
transformStreamToJsonl() -- Returns a Transform stream that emits one JSON-serialized row per line. The simplest of the three, it appends a newline to each stringified row.
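The CSV behavior described above (header detection from the first row, quote-everything escaping, and a periodic yield to the event loop) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Langfuse source: sketchCsvTransform, escapeCsvField, toCsvLine, and YIELD_INTERVAL_MS are hypothetical names, and the String()/JSON.stringify() fallback merely stands in for the shared stringify utility.

```typescript
import { Transform, TransformCallback } from "node:stream";

// Assumed constant, taken from the 50ms yield interval mentioned in the description.
const YIELD_INTERVAL_MS = 50;

// Hypothetical helper: wrap every field in double quotes and double embedded quotes.
function escapeCsvField(value: unknown): string {
  let text: string;
  if (value === null || value === undefined) {
    text = "";
  } else if (typeof value === "object") {
    text = JSON.stringify(value); // stand-in for the shared stringify utility
  } else {
    text = String(value);
  }
  return `"${text.replace(/"/g, '""')}"`;
}

// Hypothetical helper: join escaped fields into one newline-terminated CSV line.
function toCsvLine(values: unknown[]): string {
  return values.map(escapeCsvField).join(",") + "\n";
}

function sketchCsvTransform(): Transform {
  let headers: string[] | null = null;
  let lastYield = Date.now();

  return new Transform({
    writableObjectMode: true, // rows come in as objects, text goes out
    transform(row: Record<string, unknown>, _encoding, callback: TransformCallback) {
      if (headers === null) {
        // The first row's keys define the header columns.
        headers = Object.keys(row);
        this.push(toCsvLine(headers));
      }
      const columns = headers;
      this.push(toCsvLine(columns.map((key) => row[key])));

      // Hand control back to the event loop once the interval has elapsed.
      if (Date.now() - lastYield >= YIELD_INTERVAL_MS) {
        lastYield = Date.now();
        setImmediate(() => callback());
      } else {
        callback();
      }
    },
  });
}
```

Deferring the callback with setImmediate delays the next row until pending I/O callbacks have had a chance to run, which is the general idea behind the cooperative scheduling mentioned above.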
All three transforms use a shared stringify utility from the same module directory, which handles serialization of special JavaScript types (Date, BigInt, Decimal, nested objects/arrays) into JSON-compatible strings.
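How such a helper might treat special values can be sketched with a JSON.stringify replacer. This is an illustration rather than the actual utility: sketchStringify is a hypothetical name, rendering BigInt as a decimal string is an assumption, and Decimal handling is omitted because this page does not show how the real utility does it.

```typescript
// Hypothetical stand-in for the shared stringify utility.
function sketchStringify(value: unknown): string {
  return JSON.stringify(value, (_key, v) => {
    // JSON.stringify throws a TypeError on BigInt, so convert it first.
    if (typeof v === "bigint") return v.toString();
    // Date values arrive here already converted to ISO strings,
    // because Date.prototype.toJSON runs before the replacer.
    return v;
  });
}

const example = sketchStringify({
  id: "t1",
  createdAt: new Date(0),
  tokens: BigInt(42),
  tags: ["a", "b"],
});

console.log(example);
// {"id":"t1","createdAt":"1970-01-01T00:00:00.000Z","tokens":"42","tags":["a","b"]}
```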
The streamTransformations registry object maps BatchExportFileFormat enum values to their corresponding factory functions, enabling format selection at runtime via streamTransformations[format]().
Usage
Import streamTransformations from the shared server utilities and call the appropriate factory based on the user's chosen export format. The returned transform should be inserted into a stream.pipeline() between the data source stream and the upload destination.
Code Reference
Source Location
- Repository: langfuse
- Files:
  - packages/shared/src/server/utils/transforms/transformStreamToCsv.ts (Lines 13-56)
  - packages/shared/src/server/utils/transforms/transformStreamToJson.ts (Lines 4-39)
  - packages/shared/src/server/utils/transforms/transformStreamToJsonl.ts (Lines 4-17)
  - packages/shared/src/server/utils/transforms/index.ts (Lines 8-15)
Signature
// CSV transform factory
export function transformStreamToCsv(): Transform;
// JSON transform factory
export function transformStreamToJson(): Transform;
// JSONL transform factory
export function transformStreamToJsonl(): Transform;
// Registry mapping format enum to factory
export const streamTransformations: Record<
  BatchExportFileFormat,
  () => Transform
> = {
  CSV: transformStreamToCsv,
  JSON: transformStreamToJson,
  JSONL: transformStreamToJsonl,
};
Import
import { streamTransformations } from "@langfuse/shared/src/server";
// Or import individual transforms:
import { transformStreamToCsv } from "@langfuse/shared/src/server/utils/transforms/transformStreamToCsv";
import { transformStreamToJson } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJson";
import { transformStreamToJsonl } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJsonl";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| row (per chunk) | Record<string, any> | Yes | A JavaScript object representing a single data row. For CSV, the keys of the first row determine the header columns. For JSON/JSONL, the object is serialized directly. |
| encoding | BufferEncoding | Yes (implicit) | The encoding of the incoming chunk. Typically ignored since the transform operates in object mode. |
Outputs
| Name | Type | Description |
|---|---|---|
| text chunks | string (Buffer mode) | The transform emits string chunks representing the serialized output. For CSV: a header line followed by data lines. For JSON: [, comma-separated JSON objects, ]. For JSONL: one JSON string per line. |
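The JSON framing summarized in the table ([ before the first element, commas between elements, ] at the end, and [] for empty input) could be implemented along these lines. This is a sketch under stated assumptions: sketchJsonTransform, jsonArrayPrefix, and jsonArrayClose are hypothetical names, plain JSON.stringify stands in for the shared stringify utility, and the closing bracket is emitted from the Transform flush hook, which may differ from the hook the real implementation uses.

```typescript
import { Transform } from "node:stream";

// Hypothetical helper: text emitted immediately before each serialized row.
function jsonArrayPrefix(isFirstRow: boolean): string {
  return isFirstRow ? "[" : ",";
}

// Hypothetical helper: text emitted once the input has ended.
function jsonArrayClose(sawAnyRow: boolean): string {
  return sawAnyRow ? "]" : "[]";
}

function sketchJsonTransform(): Transform {
  let sawAnyRow = false;

  return new Transform({
    writableObjectMode: true, // rows come in as objects, text goes out
    transform(row, _encoding, callback) {
      const prefix = jsonArrayPrefix(!sawAnyRow);
      sawAnyRow = true;
      callback(null, prefix + JSON.stringify(row));
    },
    flush(callback) {
      // Runs after the last row; closes the array or emits an empty one.
      callback(null, jsonArrayClose(sawAnyRow));
    },
  });
}
```

Tracking a single boolean is enough here because the only state the framing needs is whether a row has already been written.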
Usage Examples
Using the registry in a pipeline
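import { pipeline, Readable } from "stream";
import { streamTransformations } from "@langfuse/shared/src/server";
import { BatchExportFileFormat } from "@langfuse/shared";
// Stand-in for the real data source; any object-mode Readable works here.
const dbReadStream = Readable.from([{ id: "t1", name: "Hello World" }]);
// BatchExportFileFormat is an enum, so use its member rather than a bare string.
const format = BatchExportFileFormat.CSV;
// pipeline() returns the last stream passed to it, here the Transform.
const fileStream = pipeline(
  dbReadStream, // object-mode Readable
  streamTransformations[format](), // object-to-text Transform
  (err) => {
    if (err) console.error("Pipeline failed:", err);
  },
);
// fileStream is the transform's readable side and emits text chunks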
Direct CSV transform usage
import { transformStreamToCsv } from "@langfuse/shared/src/server/utils/transforms/transformStreamToCsv";
import { Readable } from "stream";
const data = [
  { id: "t1", name: "Hello World", score: 0.95 },
  { id: "t2", name: 'Value with "quotes"', score: 0.87 },
];
const csvTransform = transformStreamToCsv();
const source = Readable.from(data);
source.pipe(csvTransform).on("data", (chunk) => {
  // toString() covers both cases: chunks delivered as strings or as Buffers
  console.log(chunk.toString());
  // First call: '"id","name","score"\n'
  // Second call: '"t1","Hello World","0.95"\n'
  // Third call: '"t2","Value with ""quotes""","0.87"\n'
});
JSON transform with empty input
import { transformStreamToJson } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJson";
import { Readable } from "stream";
const emptySource = Readable.from([]);
const jsonTransform = transformStreamToJson();
let output = "";
emptySource
  .pipe(jsonTransform)
  .on("data", (chunk) => { output += chunk; })
  .on("end", () => {
    console.log(output); // "[]"
  });
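JSONL transform sketch
The examples above cover CSV and JSON; for completeness, the JSONL behavior (one serialized row per line) can be approximated as below. This is a self-contained sketch, not the Langfuse implementation: sketchJsonlTransform and toJsonlLine are hypothetical names, and JSON.stringify stands in for the shared stringify utility.

```typescript
import { Transform } from "node:stream";

// Hypothetical helper: serialize one row and terminate it with a newline.
function toJsonlLine(row: unknown): string {
  return JSON.stringify(row) + "\n";
}

function sketchJsonlTransform(): Transform {
  return new Transform({
    writableObjectMode: true, // rows come in as objects, text goes out
    transform(row, _encoding, callback) {
      callback(null, toJsonlLine(row));
    },
  });
}

// The same line format, applied to an in-memory array for illustration.
const lines = [{ id: "t1" }, { id: "t2" }].map(toJsonlLine).join("");
console.log(lines);
// {"id":"t1"}
// {"id":"t2"}
```

Because each line is an independent JSON document, a consumer can parse the output line by line without buffering the whole export.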