Principle:Langfuse Langfuse Batch Export Format Transformation
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Data Serialization, Node.js Streams |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Format Transformation is the principle of converting an object-mode data stream into a serialized text stream using pluggable, stateless transform factories, enabling a single data pipeline to produce multiple output formats without duplicating the data retrieval or enrichment logic.
Description
In a batch export pipeline, the data retrieval and enrichment stages are expensive: they involve ClickHouse queries, PostgreSQL lookups, score aggregation, and comment fetching. The final output format (CSV, JSON, or JSONL) is a user preference that should not affect how data is fetched. Format Transformation decouples these concerns by introducing a Transform stream layer between the data source and the upload destination.
Each format transform is implemented as a factory function that returns a new Node.js Transform stream instance, so every export gets its own serialization state. The transform operates in object mode on its writable side (accepting JavaScript objects) and in non-object (byte) mode on its readable side (emitting encoded text). This design allows the transforms to be composed into a pipeline using Node.js stream.pipeline().
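A minimal sketch of that shape, assuming Node.js 16+ and its built-in `node:stream` module. The names `createJsonlTransform`, `exportRows` and `TransformFactory` are hypothetical, and the collecting `Writable` stands in for the real upload stage. The point is that the source and sink stay fixed while only the factory passed in changes.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

type Row = Record<string, unknown>;
type TransformFactory = () => Transform;

// Illustrative factory: object mode on the writable side (rows in),
// non-object mode on the readable side (encoded text out).
const createJsonlTransform: TransformFactory = () =>
  new Transform({
    writableObjectMode: true,
    readableObjectMode: false,
    transform(row: Row, _encoding, callback) {
      callback(null, JSON.stringify(row) + "\n");
    },
  });

// Hypothetical stand-in for the export job: the upload stage just collects bytes.
async function exportRows(rows: Row[], createTransform: TransformFactory): Promise<string> {
  const chunks: Buffer[] = [];
  const upload = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  // Source and upload are format-agnostic; a fresh transform is created per export.
  await pipeline(Readable.from(rows), createTransform(), upload);
  return Buffer.concat(chunks).toString("utf8");
}
```

Supporting another format means passing a different factory to `exportRows`; nothing upstream of the transform changes.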
The three supported formats each have distinct serialization requirements:
- CSV: The most complex transform. It must auto-detect column headers from the first row's keys, emit a header row, and then serialize every row (including the first) in that same column order; keys that first appear in later rows are therefore not exported. Field values are stringified, wrapped in double quotes, and escaped by doubling any internal quotes. The transform also implements cooperative scheduling: after accumulating 50ms of processing time, it yields to the event loop via `setImmediate` to prevent blocking other concurrent work in the worker process.
- JSON: The output must be a valid JSON array (`[{...},{...}]`). The transform pushes an opening bracket before the first element, prepends a comma to every subsequent element, and pushes a closing bracket in the `final` handler. If no rows are processed, it emits `[]` so the output is still valid JSON.
- JSONL (JSON Lines): The simplest format. Each row is serialized as a single JSON string followed by a newline character. No array wrapping or delimiters are needed.
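The JSON array rules above (bracket before the first element, comma before each later one, `[]` for an empty result) can be sketched as follows. This is an illustration, not Langfuse's code: `createJsonArrayTransform` and `toJsonText` are hypothetical names, plain `JSON.stringify` replaces the shared stringify utility, and the closing bracket is pushed from `flush`, the Transform hook Node.js calls after the last row has been written.

```typescript
import { Readable, Transform } from "node:stream";

function createJsonArrayTransform(): Transform {
  let isFirstElement = true;
  return new Transform({
    writableObjectMode: true, // rows in, text out
    transform(row: unknown, _encoding, callback) {
      // "[" before the first element, "," before every later one.
      const prefix = isFirstElement ? "[" : ",";
      isFirstElement = false;
      callback(null, prefix + JSON.stringify(row));
    },
    flush(callback) {
      // If no row ever arrived, the opening bracket was never sent: emit "[]".
      callback(null, isFirstElement ? "[]" : "]");
    },
  });
}

// Hypothetical helper: runs rows through the transform and gathers the text.
async function toJsonText(rows: unknown[]): Promise<string> {
  let out = "";
  for await (const chunk of Readable.from(rows).pipe(createJsonArrayTransform())) {
    out += chunk.toString();
  }
  return out;
}
```

Because the separator is decided per row, the transform never needs to buffer the whole array, which is what keeps JSON output streamable.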
All three transforms use a shared stringify utility that handles special types (Dates, BigInts, Decimals, nested objects) consistently.
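A hypothetical stand-in for such a utility is sketched below; the real helper's rules (for example, how `null` is rendered) may differ. Assumptions, all labeled: Dates become ISO 8601 strings, BigInts become decimal digit strings (plain `JSON.stringify` throws on them), and "Decimal" values are detected as objects whose constructor is named `Decimal` (as with decimal.js) and rendered via `toString()` to keep full precision. `normalize`, `stringifyField` and `stringifyRow` are illustrative names.

```typescript
// Recursively converts special types into JSON-safe values.
function normalize(value: unknown): unknown {
  if (value instanceof Date) return value.toISOString();
  if (typeof value === "bigint") return value.toString();
  if (value !== null && typeof value === "object") {
    // Assumed Decimal detection by constructor name (hypothetical rule).
    if (value.constructor?.name === "Decimal") return value.toString();
    if (Array.isArray(value)) return value.map(normalize);
    // Nested plain objects: normalize every property.
    return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, normalize(v)]));
  }
  return value;
}

// Per-field variant (e.g. CSV cells): strings pass through, other values are JSON-encoded.
function stringifyField(value: unknown): string {
  if (value === null || value === undefined) return ""; // assumption: empty cell
  const normalized = normalize(value);
  return typeof normalized === "string" ? normalized : JSON.stringify(normalized);
}

// Row-level variant (e.g. JSON and JSONL output).
function stringifyRow(row: Record<string, unknown>): string {
  return JSON.stringify(normalize(row));
}
```

Routing every format through the same `normalize` step is what makes a BigInt or Date look identical in a CSV cell and in a JSON document.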
Usage
Use Format Transformation whenever:
- A data pipeline needs to support multiple output serialization formats.
- You want to avoid buffering the entire dataset in memory before serialization.
- The serialization logic must be composable with other stream stages (data source, logging, upload).
- You need to add new formats in the future without modifying the upstream pipeline.
Theoretical Basis
The pattern follows the Strategy Pattern combined with Stream Composition:
-- Format transform factory registry
TRANSFORMS = {
    "CSV":   createCsvTransform,
    "JSON":  createJsonTransform,
    "JSONL": createJsonlTransform,
}

FUNCTION createCsvTransform():
    state = { isFirstChunk: true, headers: [], processingTimeMs: 0 }
    RETURN Transform(writableObjectMode=true):
        ON transform(row):
            start = NOW()
            IF state.isFirstChunk:
                state.headers = KEYS(row)
                EMIT JOIN(MAP(state.headers, escapeCsv), ",") + "\n"
                state.isFirstChunk = false
            values = MAP(state.headers, header -> escapeCsv(stringify(row[header])))
            EMIT JOIN(values, ",") + "\n"
            -- Cooperative scheduling to prevent event loop starvation
            state.processingTimeMs += NOW() - start
            IF state.processingTimeMs >= 50ms:
                state.processingTimeMs = 0
                YIELD_TO_EVENT_LOOP()   -- defer the callback via setImmediate

FUNCTION createJsonTransform():
    state = { isFirstElement: true }
    RETURN Transform(writableObjectMode=true):
        ON transform(row):
            IF state.isFirstElement:
                EMIT "["
                state.isFirstElement = false
            ELSE:
                EMIT ","
            EMIT stringify(row)
        ON final():
            IF state.isFirstElement:
                EMIT "[]"   -- empty result set
            ELSE:
                EMIT "]"

FUNCTION createJsonlTransform():
    RETURN Transform(writableObjectMode=true):
        ON transform(row):
            EMIT stringify(row) + "\n"
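The registry half of the pattern might look like this in TypeScript. It is a sketch under stated assumptions: the three factories are simplified (plain `JSON.stringify` and `String()` instead of the shared stringify utility, and no cooperative scheduling), and `ExportFormat`, `TRANSFORMS` and `serialize` are illustrative names. Typing the registry as `Record<ExportFormat, ...>` makes the compiler reject a format that has no factory.

```typescript
import { Readable, Transform } from "node:stream";

type ExportFormat = "CSV" | "JSON" | "JSONL";
type Row = Record<string, unknown>;

const createJsonlTransform = (): Transform =>
  new Transform({
    writableObjectMode: true,
    transform(row: Row, _enc, cb) {
      cb(null, JSON.stringify(row) + "\n");
    },
  });

const createJsonTransform = (): Transform => {
  let isFirstElement = true;
  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _enc, cb) {
      const prefix = isFirstElement ? "[" : ",";
      isFirstElement = false;
      cb(null, prefix + JSON.stringify(row));
    },
    flush(cb) {
      cb(null, isFirstElement ? "[]" : "]");
    },
  });
};

const createCsvTransform = (): Transform => {
  let isFirstChunk = true;
  let headers: string[] = [];
  const esc = (v: unknown): string => `"${String(v ?? "").replace(/"/g, '""')}"`;
  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _enc, cb) {
      let out = "";
      if (isFirstChunk) {
        headers = Object.keys(row); // column order fixed by the first row
        out += headers.map(esc).join(",") + "\n";
        isFirstChunk = false;
      }
      out += headers.map((h) => esc(row[h])).join(",") + "\n";
      cb(null, out);
    },
  });
};

// Strategy table: the pipeline only asks for "a transform for this format".
const TRANSFORMS: Record<ExportFormat, () => Transform> = {
  CSV: createCsvTransform,
  JSON: createJsonTransform,
  JSONL: createJsonlTransform,
};

async function serialize(rows: Row[], format: ExportFormat): Promise<string> {
  let out = "";
  // Each call builds a fresh transform, so per-stream state never leaks between exports.
  for await (const chunk of Readable.from(rows).pipe(TRANSFORMS[format]())) {
    out += chunk.toString();
  }
  return out;
}
```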
The CSV escape algorithm follows the quoting rules of RFC 4180 in their most conservative form: every field is wrapped in double quotes, and any double quote within the field is escaped by doubling it (" becomes ""). This ensures that fields containing commas, newlines, or quotes are serialized correctly without having to inspect each field first.
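The always-quote rule fits in a few lines. `escapeCsvField` and `toCsvLine` are hypothetical names, and the input is assumed to be already stringified.

```typescript
// Wrap the field in double quotes and double every embedded quote.
function escapeCsvField(value: string): string {
  return `"${value.replace(/"/g, '""')}"`;
}

// One CSV record: escaped fields joined by commas, terminated by "\n".
function toCsvLine(fields: string[]): string {
  return fields.map(escapeCsvField).join(",") + "\n";
}
```

Quoting unconditionally trades a few bytes per field for never having to scan for delimiters. RFC 4180 also specifies CRLF as the record separator; the pipeline described here uses "\n", which most CSV readers accept as well.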
The cooperative scheduling in the CSV transform is notable: because CSV serialization involves per-field string manipulation for potentially millions of rows, the transform tracks cumulative processing time and, once the 50ms budget is reached, defers its completion callback via `setImmediate`. This lets other pending I/O (such as reading the ClickHouse stream or receiving S3 upload acknowledgments) proceed, preventing the worker from becoming unresponsive under heavy load.
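A sketch of the time-budgeted CSV transform, assuming Node.js 16+. The 50ms budget comes from the text above; `createCsvTransform`, `collectCsv` and `YIELD_BUDGET_MS` are illustrative names, and fields go through `String()` rather than the shared stringify utility. Deferring the callback with `setImmediate` means the stream does not hand over the next row until the event loop has run its poll phase, where pending I/O callbacks execute.

```typescript
import { Readable, Transform } from "node:stream";
import { performance } from "node:perf_hooks";

type Row = Record<string, unknown>;

const YIELD_BUDGET_MS = 50; // cumulative processing time before yielding

function createCsvTransform(): Transform {
  let isFirstChunk = true;
  let headers: string[] = [];
  let processingTimeMs = 0;
  // Simplified field formatting: String() instead of the shared stringify utility.
  const esc = (v: unknown): string => `"${String(v ?? "").replace(/"/g, '""')}"`;

  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _encoding, callback) {
      const start = performance.now();
      let out = "";
      if (isFirstChunk) {
        headers = Object.keys(row);
        out += headers.map(esc).join(",") + "\n";
        isFirstChunk = false;
      }
      out += headers.map((h) => esc(row[h])).join(",") + "\n";
      this.push(out);

      processingTimeMs += performance.now() - start;
      if (processingTimeMs >= YIELD_BUDGET_MS) {
        processingTimeMs = 0;
        // Let queued I/O run before this transform accepts the next row.
        setImmediate(() => callback());
      } else {
        callback(); // under budget: continue synchronously
      }
    },
  });
}

// Hypothetical helper for trying the transform: rows in, CSV text out.
async function collectCsv(rows: Row[]): Promise<string> {
  let out = "";
  for await (const chunk of Readable.from(rows).pipe(createCsvTransform())) {
    out += chunk.toString();
  }
  return out;
}
```

Measuring accumulated time, rather than yielding every N rows, adapts to row size: wide rows with large fields trigger yields sooner than narrow ones.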