

Principle:Langfuse Langfuse Batch Export Format Transformation

Domains: Batch Export, Data Serialization, Node.js Streams
Last Updated: 2026-02-14 00:00 GMT

Overview

Format Transformation is the principle of converting an object-mode data stream into a serialized text stream using pluggable transform factories. Each factory call returns a fresh Transform stream with its own per-stream state. This lets a single data pipeline produce multiple output formats without duplicating the data retrieval or enrichment logic.

Description

In a batch export pipeline, the data retrieval and enrichment stages are expensive: they involve ClickHouse queries, PostgreSQL lookups, score aggregation, and comment fetching. The final output format (CSV, JSON, or JSONL) is a user preference that should not affect how data is fetched. Format Transformation decouples these concerns by introducing a Transform stream layer between the data source and the upload destination.

Each format transform is implemented as a factory function that returns a new Node.js Transform stream instance. The transform accepts JavaScript objects on its writable (input) side and emits serialized text on its readable (output) side. Because every format is just another Transform, it can be composed with the source and upload stages using Node.js stream.pipeline().
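The factory-and-pipeline arrangement can be sketched in TypeScript as follows. This is a minimal sketch that assumes Node.js 15+ for node:stream/promises. createJsonlTransform mirrors the name used in the pseudocode further down this page. The collect sink is a hypothetical stand-in for the upload destination, and JSON.stringify stands in for the shared stringify utility.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Factory: every call returns a brand-new Transform, so no state is shared
// between concurrent exports.
function createJsonlTransform(): Transform {
  return new Transform({
    writableObjectMode: true, // input side: plain JavaScript objects
    readableObjectMode: false, // output side: encoded text (Buffers)
    transform(row: Record<string, unknown>, _encoding, callback) {
      // JSON.stringify is a stand-in for the shared stringify utility.
      callback(null, JSON.stringify(row) + "\n");
    },
  });
}

// Hypothetical sink that collects the serialized output (stands in for an upload).
function collect(target: string[]): Writable {
  return new Writable({
    write(chunk: Buffer, _encoding, callback) {
      target.push(chunk.toString("utf8"));
      callback();
    },
  });
}

async function main(): Promise<string> {
  const out: string[] = [];
  // Source -> format transform -> destination; only the middle stage is format-specific.
  await pipeline(
    Readable.from([{ id: 1 }, { id: 2 }]),
    createJsonlTransform(),
    collect(out),
  );
  return out.join("");
}

main().then((text) => console.log(text));
```

To change the output format, swap the middle stage for a different factory. The source and the sink stay unchanged.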

The three supported formats each have distinct serialization requirements:

  1. CSV: The most complex transform. It detects the column headers from the first row's keys and emits a header row. It then serializes every row, including the first, in that same column order. Because the header is fixed after the first row, keys that appear only in later rows are not emitted. Each field value is passed through the shared stringify utility, wrapped in double quotes, and any internal double quote is doubled. The transform also implements cooperative scheduling. Once it has accumulated 50 ms of processing time, it yields to the event loop by deferring its callback with setImmediate, so that other concurrent work in the worker process is not blocked.
  2. JSON: The output must be a valid JSON array ([{...},{...}]). The transform pushes an opening bracket before the first element, a comma before each subsequent element, and a closing bracket when the input ends. If no rows are processed, it emits [] so the output is still valid JSON.
  3. JSONL (JSON Lines): The simplest format. Each row is serialized as a single JSON string followed by a newline character. No array wrapping or delimiters are needed.
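The JSON array framing in item 2 can be sketched in TypeScript as follows. createJsonTransform matches the name used in the pseudocode further down this page. serialize is a hypothetical helper, and JSON.stringify stands in for the shared stringify utility. This sketch emits the closing bracket from the Transform flush hook. That is an assumption: the real implementation may use a different end-of-stream hook.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

function createJsonTransform(): Transform {
  let isFirstElement = true; // per-instance state
  return new Transform({
    writableObjectMode: true,
    transform(row: unknown, _encoding, callback) {
      // "[" before the first element, "," before every later one.
      const prefix = isFirstElement ? "[" : ",";
      isFirstElement = false; // update state before handing data downstream
      callback(null, prefix + JSON.stringify(row));
    },
    flush(callback) {
      // No rows at all: emit "[]" so the output is still valid JSON.
      callback(null, isFirstElement ? "[]" : "]");
    },
  });
}

// Hypothetical helper that runs rows through the transform and returns the text.
async function serialize(rows: unknown[]): Promise<string> {
  const chunks: Buffer[] = [];
  await pipeline(
    Readable.from(rows),
    createJsonTransform(),
    new Writable({
      write(chunk: Buffer, _encoding, callback) {
        chunks.push(chunk);
        callback();
      },
    }),
  );
  return Buffer.concat(chunks).toString("utf8");
}

serialize([{ a: 1 }, { a: 2 }]).then((s) => console.log(s)); // [{"a":1},{"a":2}]
serialize([]).then((s) => console.log(s)); // []
```

The flag is cleared before the callback is invoked. When writes are buffered, the callback can synchronously trigger the next transform call.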

All three transforms use a shared stringify utility that handles special types (Dates, BigInts, Decimals, nested objects) consistently.
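A stringify helper of this kind could look roughly like the sketch below. The name stringifyValue and its exact rules are hypothetical; they illustrate the idea and are not the actual Langfuse utility. Decimal handling is omitted because it depends on which decimal library is in use.

```typescript
// Hypothetical stand-in for the shared stringify utility.
// JSON.stringify throws a TypeError on BigInt, so this replacer emits BigInt as a decimal string.
function bigintSafeReplacer(_key: string, value: unknown): unknown {
  return typeof value === "bigint" ? value.toString() : value;
}

function stringifyValue(value: unknown): string {
  if (value === null || value === undefined) return "";
  if (typeof value === "string") return value;
  if (value instanceof Date) return value.toISOString();
  if (typeof value === "bigint") return value.toString();
  if (typeof value === "object") {
    // Nested objects and arrays: compact JSON, tolerating BigInt inside.
    return JSON.stringify(value, bigintSafeReplacer);
  }
  return String(value); // numbers and booleans
}

console.log(stringifyValue(new Date(Date.UTC(2024, 0, 2)))); // 2024-01-02T00:00:00.000Z
console.log(stringifyValue({ n: BigInt(1), tags: ["a"] })); // {"n":"1","tags":["a"]}
```

Routing every value through one helper keeps a timestamp or a 64-bit counter rendered identically in CSV, JSON, and JSONL output.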

Usage

Use Format Transformation whenever:

  • A data pipeline needs to support multiple output serialization formats.
  • You want to avoid buffering the entire dataset in memory before serialization.
  • The serialization logic must be composable with other stream stages (data source, logging, upload).
  • You need to add new formats in the future without modifying the upstream pipeline.

Theoretical Basis

The pattern follows the Strategy Pattern combined with Stream Composition:

-- Format transform factory registry
TRANSFORMS = {
  "CSV":  createCsvTransform,
  "JSON": createJsonTransform,
  "JSONL": createJsonlTransform,
}

FUNCTION createCsvTransform():
  state = { isFirstChunk: true, headers: [], processingTimeMs: 0 }

  RETURN Transform(objectMode=true):
    ON transform(row):
      startTime = NOW()

      IF state.isFirstChunk:
        state.headers = KEYS(row)
        EMIT JOIN(MAP(escapeCsv, state.headers), ",") + "\n"
        state.isFirstChunk = false

      values = FOR EACH header IN state.headers:
        YIELD escapeCsv(stringify(row[header]))
      EMIT JOIN(values, ",") + "\n"

      -- Cooperative scheduling to prevent event loop starvation
      state.processingTimeMs += NOW() - startTime
      IF state.processingTimeMs >= 50ms:
        state.processingTimeMs = 0
        YIELD_TO_EVENT_LOOP()   -- i.e. defer the callback via setImmediate

FUNCTION createJsonTransform():
  state = { isFirstElement: true }

  RETURN Transform(objectMode=true):
    ON transform(row):
      IF state.isFirstElement:
        EMIT "["
        state.isFirstElement = false
      ELSE:
        EMIT ","
      EMIT stringify(row)

    ON final():
      IF state.isFirstElement:
        EMIT "[]"   -- empty result set
      ELSE:
        EMIT "]"

FUNCTION createJsonlTransform():
  RETURN Transform(objectMode=true):
    ON transform(row):
      EMIT stringify(row) + "\n"
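The registry and factories can be expressed in TypeScript roughly as follows. This is a simplified sketch. The factory names and format keys come from the pseudocode. Row, quote, field, and exportRows are hypothetical. JSON.stringify stands in for the shared stringify utility, and the CSV factory omits cooperative scheduling.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

type ExportFormat = "CSV" | "JSON" | "JSONL";
type Row = Record<string, unknown>;

// Hypothetical simplified stand-ins for escapeCsv and stringify.
const quote = (s: string): string => `"${s.replace(/"/g, '""')}"`;
const field = (v: unknown): string =>
  v === null || v === undefined ? "" : typeof v === "object" ? JSON.stringify(v) : String(v);

function createCsvTransform(): Transform {
  let isFirstChunk = true;
  let headers: string[] = [];
  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _encoding, callback) {
      let out = "";
      if (isFirstChunk) {
        headers = Object.keys(row); // column order fixed by the first row
        out += headers.map(quote).join(",") + "\n";
        isFirstChunk = false;
      }
      out += headers.map((h) => quote(field(row[h]))).join(",") + "\n";
      callback(null, out);
    },
  });
}

function createJsonTransform(): Transform {
  let isFirstElement = true;
  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _encoding, callback) {
      const prefix = isFirstElement ? "[" : ",";
      isFirstElement = false;
      callback(null, prefix + JSON.stringify(row));
    },
    flush(callback) {
      callback(null, isFirstElement ? "[]" : "]");
    },
  });
}

function createJsonlTransform(): Transform {
  return new Transform({
    writableObjectMode: true,
    transform(row: Row, _encoding, callback) {
      callback(null, JSON.stringify(row) + "\n");
    },
  });
}

// Strategy registry: the pipeline below never changes; only the factory does.
const TRANSFORMS: Record<ExportFormat, () => Transform> = {
  CSV: createCsvTransform,
  JSON: createJsonTransform,
  JSONL: createJsonlTransform,
};

async function exportRows(rows: Row[], format: ExportFormat): Promise<string> {
  const chunks: Buffer[] = [];
  await pipeline(
    Readable.from(rows),
    TRANSFORMS[format](), // fresh transform instance per export
    new Writable({
      write(chunk: Buffer, _encoding, callback) {
        chunks.push(chunk);
        callback();
      },
    }),
  );
  return Buffer.concat(chunks).toString("utf8");
}

exportRows([{ id: 1, name: 'say "hi"' }], "CSV").then((csv) => console.log(csv));
```

Adding a format here means writing one more factory and one more registry entry. exportRows and anything upstream of it stay unchanged.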

The CSV escape algorithm follows the quoting rules of RFC 4180. Every field is wrapped in double quotes, which RFC 4180 permits for any field, and any double quote within the field is escaped by doubling it (" becomes ""). As a result, fields containing commas, newlines, or quotes are read back correctly by an RFC 4180-compliant parser.
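The escaping rule is small enough to show directly. escapeCsv matches the name in the pseudocode above. toCsvLine is a hypothetical helper that joins already-stringified fields into one record.

```typescript
// Always quote the field, and double every embedded double quote.
function escapeCsv(field: string): string {
  return `"${field.replace(/"/g, '""')}"`;
}

// Hypothetical helper: one CSV record from already-stringified fields.
function toCsvLine(fields: string[]): string {
  return fields.map(escapeCsv).join(",") + "\n";
}

console.log(escapeCsv('say "hi"')); // "say ""hi"""
console.log(toCsvLine(["plain", "a,b", "line1\nline2"]));
```

Because every field is quoted, embedded commas and newlines need no special casing. Only the quote character itself has to be escaped.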

The cooperative scheduling in the CSV transform matters because CSV serialization performs per-field string manipulation for potentially millions of rows. The transform tracks cumulative processing time and, once roughly 50 ms has accumulated, defers its callback with setImmediate. This allows other I/O operations (such as reading the ClickHouse stream or receiving S3 upload acknowledgments) to proceed, so the worker does not become unresponsive under heavy load.
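The time-budget mechanism can be isolated in a sketch like the one below. The 50 ms threshold comes from the description on this page. createCooperativeTransform, YIELD_AFTER_MS, and run are hypothetical names. The wrapper accepts any synchronous row serializer, not only the CSV one.

```typescript
import { performance } from "node:perf_hooks";
import { Readable, Transform, TransformCallback, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

const YIELD_AFTER_MS = 50; // threshold described above

// Runs a synchronous serializer per row. Once about YIELD_AFTER_MS of work has
// accumulated, it defers the callback with setImmediate so pending I/O callbacks run first.
function createCooperativeTransform(serialize: (row: unknown) => string): Transform {
  let processingTimeMs = 0;
  return new Transform({
    writableObjectMode: true,
    transform(row: unknown, _encoding, callback: TransformCallback) {
      const start = performance.now();
      let output: string;
      try {
        output = serialize(row);
      } catch (err) {
        callback(err as Error);
        return;
      }
      processingTimeMs += performance.now() - start;
      if (processingTimeMs >= YIELD_AFTER_MS) {
        processingTimeMs = 0;
        // Yield to the event loop before accepting the next row.
        setImmediate(() => callback(null, output));
      } else {
        callback(null, output);
      }
    },
  });
}

// Usage: serialize rows as simple comma-joined lines and collect the output.
async function run(rows: unknown[]): Promise<string> {
  let text = "";
  await pipeline(
    Readable.from(rows),
    createCooperativeTransform((row) => Object.values(row as object).join(",") + "\n"),
    new Writable({
      write(chunk: Buffer, _encoding, callback) {
        text += chunk.toString("utf8");
        callback();
      },
    }),
  );
  return text;
}

run([{ a: 1, b: 2 }, { a: 3, b: 4 }]).then((t) => console.log(t));
```

setImmediate runs in the event loop's check phase, which comes after the poll phase. Any I/O callbacks that are already ready get serviced before the next row is processed.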
