

Implementation:Langfuse Langfuse Batch Export Stream Transformations

From Leeroopedia
Domains: Batch Export, Data Serialization, Node.js Streams
Last Updated: 2026-02-14 00:00 GMT

Overview

Concrete tool for converting object-mode data streams into CSV, JSON, or JSONL text streams, provided by Langfuse.

Description

The stream transformation system consists of three factory functions and a registry object:

transformStreamToCsv() -- Returns a Transform stream that auto-detects CSV headers from the first row's keys, emits a header line, and then converts every object (including the first) into a comma-delimited row. Fields are escaped following RFC 4180: every field is wrapped in double quotes, and any double quote inside a field is doubled. To prevent event-loop starvation during large exports, the transform yields cooperatively on a 50 ms interval, using setImmediate to defer its callback so that pending I/O can run.
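A minimal sketch of the CSV behaviour described above, assuming only Node's built-in stream module; it is not Langfuse's source. The names escapeCsvField, createCsvTransform and toCsvString are hypothetical, and the small toText helper is an assumed stand-in for the shared stringify utility. Like the example further down this page, the sketch quotes the header cells as well as the data cells.

```typescript
import { Readable, Transform, TransformCallback } from "node:stream";

const YIELD_INTERVAL_MS = 50; // matches the 50 ms interval described above

// Hypothetical stand-in for the shared stringify utility.
function toText(value: unknown): string {
  if (value === null || value === undefined) return "";
  if (value instanceof Date) return value.toISOString();
  if (typeof value === "object") return JSON.stringify(value);
  return String(value);
}

// RFC 4180 quoting: wrap every field in quotes and double embedded quotes.
export function escapeCsvField(value: unknown): string {
  return `"${toText(value).replace(/"/g, '""')}"`;
}

export function createCsvTransform(): Transform {
  let headers: string[] | null = null;
  let lastYield = Date.now();

  return new Transform({
    objectMode: true,
    transform(row: Record<string, unknown>, _encoding: BufferEncoding, callback: TransformCallback) {
      let columns = headers;
      if (columns === null) {
        // The first row's keys fix the header and the column order for all rows.
        columns = Object.keys(row);
        headers = columns;
        this.push(columns.map(escapeCsvField).join(",") + "\n");
      }
      this.push(columns.map((key) => escapeCsvField(row[key])).join(",") + "\n");

      // Cooperative scheduling: once 50 ms have passed since the last yield,
      // defer the callback so queued I/O can run before the next row.
      if (Date.now() - lastYield >= YIELD_INTERVAL_MS) {
        lastYield = Date.now();
        setImmediate(() => callback());
      } else {
        callback();
      }
    },
  });
}

// Usage helper (hypothetical): run rows through the transform, collect the text.
export async function toCsvString(rows: Record<string, unknown>[]): Promise<string> {
  let out = "";
  for await (const chunk of Readable.from(rows).pipe(createCsvTransform())) {
    out += chunk;
  }
  return out;
}
```

Deferring the callback is what makes the yield cooperative: the stream does not call transform() again until callback runs, so scheduling it with setImmediate lets other event-loop work proceed between rows.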

transformStreamToJson() -- Returns a Transform stream that wraps the output in a JSON array. It emits "[" before the first element, "," between elements, and "]" in the final handler. If no rows are processed it outputs "[]" instead, so the result is always valid JSON.
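A hypothetical sketch of the JSON array framing, assuming Node's stream module and plain JSON.stringify in place of the shared stringify utility. This version closes the array from flush(), Node's standard Transform hook for trailing output, which runs after the last row has been transformed; createJsonArrayTransform and toJsonString are illustrative names only.

```typescript
import { Readable, Transform, TransformCallback } from "node:stream";

export function createJsonArrayTransform(): Transform {
  let isFirst = true;

  return new Transform({
    objectMode: true,
    transform(row: unknown, _encoding: BufferEncoding, callback: TransformCallback) {
      // "[" opens the array before the first element; "," precedes every later one.
      this.push(isFirst ? "[" : ",");
      isFirst = false;
      this.push(JSON.stringify(row));
      callback();
    },
    flush(callback: TransformCallback) {
      // Close the array, or emit "[]" if no rows arrived, so the output always parses.
      callback(null, isFirst ? "[]" : "]");
    },
  });
}

// Usage helper (hypothetical): collect the transform's output as one string.
export async function toJsonString(rows: unknown[]): Promise<string> {
  let out = "";
  for await (const chunk of Readable.from(rows).pipe(createJsonArrayTransform())) {
    out += chunk;
  }
  return out;
}
```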

transformStreamToJsonl() -- Returns a Transform stream that emits one JSON-serialized row per line. The simplest of the three, it appends a newline to each stringified row.
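For comparison, a hypothetical JSONL transform needs only a transform() method, because there is no opening or closing framing. As before, JSON.stringify substitutes for the shared stringify utility, and createJsonlTransform and toJsonlString are illustrative names.

```typescript
import { Readable, Transform, TransformCallback } from "node:stream";

export function createJsonlTransform(): Transform {
  return new Transform({
    objectMode: true,
    transform(row: unknown, _encoding: BufferEncoding, callback: TransformCallback) {
      // JSON.stringify without indentation escapes newlines inside strings,
      // so each row is guaranteed to occupy exactly one line.
      callback(null, JSON.stringify(row) + "\n");
    },
  });
}

// Usage helper (hypothetical): collect the transform's output as one string.
export async function toJsonlString(rows: unknown[]): Promise<string> {
  let out = "";
  for await (const chunk of Readable.from(rows).pipe(createJsonlTransform())) {
    out += chunk;
  }
  return out;
}
```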

All three transforms use a shared stringify utility from the same module directory, which handles serialization of special JavaScript types (Date, BigInt, Decimal, nested objects/arrays) into JSON-compatible strings.
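The stringify utility itself is not shown here, so the following is only a hypothetical sketch of how such a helper could treat the listed types. The name stringifyValue and its rules (strings returned unquoted, null as an empty string, toJSON() honoured so that Date and decimal.js-style Decimal objects become strings, BigInt converted to its decimal text) are assumptions.

```typescript
export function stringifyValue(value: unknown): string {
  if (value === null || value === undefined) return "";
  if (typeof value === "string") return value; // no extra quotes around plain strings
  if (typeof value === "bigint") return value.toString(); // JSON.stringify would throw on BigInt

  if (typeof value === "object") {
    // Date and decimal.js-style Decimal expose toJSON(); use it when it yields a string.
    const toJSON = (value as { toJSON?: () => unknown }).toJSON;
    if (typeof toJSON === "function") {
      const converted = toJSON.call(value);
      if (typeof converted === "string") return converted;
    }
    // Plain objects and arrays: JSON text, with nested BigInt values turned into strings.
    return JSON.stringify(value, (_key, v: unknown) => (typeof v === "bigint" ? v.toString() : v));
  }

  return String(value); // numbers and booleans
}
```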

The streamTransformations registry object maps BatchExportFileFormat enum values to their corresponding factory functions, enabling format selection at runtime via streamTransformations[format]().
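The registry pattern can be sketched as follows. ExportFormat is a hypothetical stand-in for BatchExportFileFormat (string values assumed to equal the member names), and the factories are PassThrough placeholders where the real transformStreamTo* functions would go. Typing the registry as Record<ExportFormat, () => Transform> makes the compiler reject a registry that omits a format, and parseExportFormat (also hypothetical) narrows an untrusted string before it is used as an index.

```typescript
import { PassThrough, Transform } from "node:stream";

// Hypothetical stand-in for BatchExportFileFormat.
export enum ExportFormat {
  CSV = "CSV",
  JSON = "JSON",
  JSONL = "JSONL",
}

// Placeholder factory; a real registry would point at transformStreamToCsv etc.
const placeholder = (): Transform => new PassThrough({ objectMode: true });

// Omitting any ExportFormat key here is a compile-time error.
export const registry: Record<ExportFormat, () => Transform> = {
  [ExportFormat.CSV]: placeholder,
  [ExportFormat.JSON]: placeholder,
  [ExportFormat.JSONL]: placeholder,
};

const KNOWN_FORMATS = new Set<string>(Object.values(ExportFormat));

// Validate untrusted input (e.g. a request field) before indexing the registry.
export function parseExportFormat(input: string): ExportFormat {
  if (!KNOWN_FORMATS.has(input)) {
    throw new Error(`Unsupported export format: ${input}`);
  }
  return input as ExportFormat;
}

export function createTransformFor(input: string): Transform {
  return registry[parseExportFormat(input)]();
}
```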

Usage

Import streamTransformations from the shared server utilities and call the appropriate factory based on the user's chosen export format. The returned transform should be inserted into a stream.pipeline() between the data source stream and the upload destination.
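A sketch of that wiring with the promise-based pipeline from node:stream/promises, which resolves once the destination has finished and rejects if any stage fails. The in-memory sink is a hypothetical stand-in for the upload destination, and toJsonl is a minimal placeholder for a factory returned by streamTransformations.

```typescript
import { Readable, Transform, TransformCallback, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Placeholder for a factory such as streamTransformations[format]() (JSONL-style here).
function toJsonl(): Transform {
  return new Transform({
    objectMode: true,
    transform(row: unknown, _encoding: BufferEncoding, callback: TransformCallback) {
      callback(null, JSON.stringify(row) + "\n");
    },
  });
}

// Hypothetical destination: buffers the text in memory instead of uploading it.
function createMemorySink(): { sink: Writable; contents: () => string } {
  const chunks: string[] = [];
  const sink = new Writable({
    write(chunk: Buffer, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
      chunks.push(chunk.toString("utf8")); // string chunks arrive as Buffers by default
      callback();
    },
  });
  return { sink, contents: () => chunks.join("") };
}

export async function exportRows(rows: object[]): Promise<string> {
  const { sink, contents } = createMemorySink();
  // Source -> transform -> destination; an error in any stage rejects the promise.
  await pipeline(Readable.from(rows), toJsonl(), sink);
  return contents();
}
```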

Code Reference

Source Location

  • Repository: langfuse
  • Files:
    • packages/shared/src/server/utils/transforms/transformStreamToCsv.ts (Lines 13-56)
    • packages/shared/src/server/utils/transforms/transformStreamToJson.ts (Lines 4-39)
    • packages/shared/src/server/utils/transforms/transformStreamToJsonl.ts (Lines 4-17)
    • packages/shared/src/server/utils/transforms/index.ts (Lines 8-15)

Signature

// CSV transform factory
export function transformStreamToCsv(): Transform;

// JSON transform factory
export function transformStreamToJson(): Transform;

// JSONL transform factory
export function transformStreamToJsonl(): Transform;

// Registry mapping format enum to factory
export const streamTransformations: Record<
  BatchExportFileFormat,
  () => Transform
> = {
  CSV: transformStreamToCsv,
  JSON: transformStreamToJson,
  JSONL: transformStreamToJsonl,
};

Import

import { streamTransformations } from "@langfuse/shared/src/server";

// Or import individual transforms:
import { transformStreamToCsv } from "@langfuse/shared/src/server/utils/transforms/transformStreamToCsv";
import { transformStreamToJson } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJson";
import { transformStreamToJsonl } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJsonl";

I/O Contract

Inputs

Name | Type | Required | Description
row (per chunk) | Record<string, any> | Yes | A JavaScript object representing a single data row. For CSV, the keys of the first row determine the header columns. For JSON/JSONL, the object is serialized directly.
encoding | BufferEncoding | Yes (implicit) | The encoding of the incoming chunk. Typically ignored, since the transform operates in object mode.

Outputs

Name | Type | Description
text chunks | string | The serialized output, emitted as string chunks. CSV: a header line followed by one line per row. JSON: "[", the JSON objects separated by ",", then "]" (or "[]" for empty input). JSONL: one JSON string per line.

Usage Examples

Using the registry in a pipeline

import { pipeline } from "stream";
import { streamTransformations } from "@langfuse/shared/src/server";
import { BatchExportFileFormat } from "@langfuse/shared";

const format = BatchExportFileFormat.CSV;

const fileStream = pipeline(
  dbReadStream,                          // object-mode Readable (placeholder, defined elsewhere)
  streamTransformations[format](),       // object-to-text Transform
  (err) => {
    if (err) console.error("Pipeline failed:", err);
  },
);

// fileStream is now a Readable emitting text chunks

Direct CSV transform usage

import { transformStreamToCsv } from "@langfuse/shared/src/server/utils/transforms/transformStreamToCsv";
import { Readable } from "stream";

const data = [
  { id: "t1", name: "Hello World", score: 0.95 },
  { id: "t2", name: 'Value with "quotes"', score: 0.87 },
];

const csvTransform = transformStreamToCsv();
const source = Readable.from(data);

source.pipe(csvTransform).on("data", (chunk) => {
  console.log(chunk);
  // First call: '"id","name","score"\n'
  // Second call: '"t1","Hello World","0.95"\n'
  // Third call: '"t2","Value with ""quotes""","0.87"\n'
});

JSON transform with empty input

import { transformStreamToJson } from "@langfuse/shared/src/server/utils/transforms/transformStreamToJson";
import { Readable } from "stream";

const emptySource = Readable.from([]);
const jsonTransform = transformStreamToJson();

let output = "";
emptySource
  .pipe(jsonTransform)
  .on("data", (chunk) => { output += chunk; })
  .on("end", () => {
    console.log(output); // "[]"
  });

Related Pages

Implements Principle
