Implementation:Langfuse handleBatchExportJob

From Leeroopedia
Knowledge Sources
Domains: Batch Export, Worker, Data Pipeline
Last Updated: 2026-02-14 00:00 GMT

Overview

A concrete tool provided by Langfuse for validating and orchestrating the full batch export pipeline: precondition checks, data streaming, format transformation, S3 upload, and email notification.

Description

handleBatchExportJob is the top-level worker function invoked by the BullMQ consumer when a BatchExportJob is dequeued. It orchestrates the entire export lifecycle:

  1. Validation: Checks that LANGFUSE_S3_BATCH_EXPORT_ENABLED is "true", retrieves the job record from PostgreSQL, and applies guard clauses for cancellation, staleness (30-day cutoff), and unexpected status.
  2. Query parsing: Validates the stored query against BatchExportQuerySchema using Zod. Preprocesses comment-based filters by resolving them via applyCommentFilters, which queries comment records and converts them into ID-based filters. (Steps 1 and 2 are sketched just after this list.)
  3. Stream selection: Routes to the appropriate ClickHouse streaming function based on tableName: getObservationStream for observations, getTraceStream for traces, getEventsStream for events, and getDatabaseReadStreamPaginated for all other table types (scores, sessions, dataset items, audit logs).
  4. Format transformation: Pipes the data stream through a logging transform (which counts rows and logs progress every 5,000 rows) and then through the appropriate format transform (streamTransformations[format]()).
  5. Upload: Uploads the resulting file stream to blob storage via StorageServiceFactory.getInstance().uploadWithSignedUrl(), using multipart upload with configurable part size and queue size.
  6. Completion: Updates the job record to COMPLETED with the signed download URL and expiration timestamp, then sends a success email to the requesting user.
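
The guard clauses in steps 1 and 2 follow a conventional check-then-throw-or-return pattern. A minimal sketch with illustrative names (prisma, env, and the exact status literals are assumptions, not the Langfuse source):

if (env.LANGFUSE_S3_BATCH_EXPORT_ENABLED !== "true") {
  throw new Error("Batch export is disabled on this worker"); // hard failure
}

const jobDetails = await prisma.batchExport.findFirst({
  where: { id: batchExportId, projectId },
});
if (!jobDetails) {
  throw new Error(`Batch export ${batchExportId} not found`); // hard failure
}
if (jobDetails.status === "CANCELLED") return; // silent early return

const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;
if (Date.now() - jobDetails.createdAt.getTime() > THIRTY_DAYS_MS) {
  return; // stale job: silent early return
}

// Step 2: validate the stored query with Zod before touching ClickHouse.
const parsedQuery = BatchExportQuerySchema.safeParse(jobDetails.query);
if (!parsedQuery.success) {
  throw new Error(`Invalid export query: ${parsedQuery.error.message}`);
}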

Usage

This function is registered as the processor for the BatchExportQueue in the Langfuse worker application. It is not called directly by application code; instead, it is invoked by BullMQ when a job becomes available.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/features/batchExport/handleBatchExportJob.ts
  • Lines: 34-301

Signature

export const handleBatchExportJob = async (
  batchExportJob: BatchExportJobType,
): Promise<void>

Where:

type BatchExportJobType = {
  projectId: string;
  batchExportId: string;
};

Import

import { handleBatchExportJob } from "../features/batchExport/handleBatchExportJob";

I/O Contract

Inputs

  • batchExportJob.projectId (string, required): The project ID that scopes all database queries and storage paths.
  • batchExportJob.batchExportId (string, required): The ID of the batchExport record in PostgreSQL, used to retrieve job details, update status, and deduplicate.
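
For context, the producer side enqueues exactly this two-field payload; everything else is re-read from PostgreSQL. A hypothetical sketch of enqueueing (the getInstance()/add() path shown is an assumption, not necessarily Langfuse's actual producer code):

import { BatchExportQueue, QueueJobs } from "@langfuse/shared/src/server";

// Hypothetical producer-side sketch.
await BatchExportQueue.getInstance()?.add(QueueJobs.BatchExportJob, {
  name: QueueJobs.BatchExportJob,
  payload: {
    projectId,       // scopes all queries and storage paths
    batchExportId,   // id of the batchExport record in PostgreSQL
  },
});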

Outputs

  • Return value (Promise<void>): Resolves on success. Side effects: the job status transitions through PROCESSING to COMPLETED, the export file is uploaded to S3 under a signed URL with an expiration timestamp, and a success email is sent to the requesting user. On validation failure the function either returns early (cancelled or stale job) or throws (feature disabled, record not found, invalid query).
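
Since hard failures reject the promise while cancelled or stale jobs resolve silently, both outcomes surface through BullMQ's standard worker events. A short sketch, assuming the worker instance from the registration example below and a logger in scope:

// Successful exports and silent early returns both arrive as "completed".
worker.on("completed", (job) => {
  logger.info(`batch export job ${job.id} completed`);
});

// Thrown errors (feature disabled, record not found, invalid query)
// arrive as "failed" and follow the queue's retry policy.
worker.on("failed", (job, err) => {
  logger.error(`batch export job ${job?.id} failed: ${err.message}`);
});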

Usage Examples

Registering the handler with BullMQ

import { Worker } from "bullmq";
import { handleBatchExportJob } from "../features/batchExport/handleBatchExportJob";
import { BatchExportQueue, QueueJobs } from "@langfuse/shared/src/server";

// redisConnection: the worker's shared ioredis connection, configured elsewhere.
const worker = new Worker(
  BatchExportQueue.QUEUE_NAME,
  async (job) => {
    if (job.name === QueueJobs.BatchExportJob) {
      await handleBatchExportJob(job.data.payload);
    }
  },
  { connection: redisConnection },
);
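
Note that only the two IDs travel through Redis; the handler re-reads all job details from PostgreSQL and uses batchExportId to deduplicate, so the queue payload stays small and redelivered jobs can be detected by their status.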

Stream pipeline constructed internally

// Simplified view of the internal pipeline. pipeline comes from
// node:stream; env, logger, jobDetails, processedFilter, loggingTransform,
// prefix, and timestamp are all in scope inside the real handler.
const dbReadStream = await getTraceStream({
  projectId,
  cutoffCreatedAt: jobDetails.createdAt,
  filter: processedFilter,
});

const fileStream = pipeline(
  dbReadStream,
  loggingTransform,              // counts rows, logs every 5000
  streamTransformations["CSV"](), // format-specific transform
  (err) => { if (err) logger.error(err); },
);

const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_BATCH_EXPORT_BUCKET,
  // ...S3 config
}).uploadWithSignedUrl({
  fileName: `${prefix}${timestamp}-lf-traces-export-${projectId}.csv`,
  fileType: "text/csv; charset=utf-8",
  data: fileStream,
  expiresInSeconds: env.BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600,
  partSize: env.BATCH_EXPORT_S3_PART_SIZE_MIB * 1024 * 1024,
  queueSize: 4,
});
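
For intuition, both transforms in the pipeline are ordinary object-mode Node.js streams. A simplified sketch of each (illustrative only; the real streamTransformations["CSV"]() handles header ordering, quoting, and escaping properly):

import { Transform } from "stream";

// Counts rows and logs progress every 5,000 rows.
let rowCount = 0;
const loggingTransform = new Transform({
  objectMode: true,
  transform(row, _enc, callback) {
    rowCount += 1;
    if (rowCount % 5000 === 0) logger.info(`exported ${rowCount} rows`);
    callback(null, row);
  },
});

// Naive CSV serializer: emits a header from the first row's keys,
// then one JSON-escaped line per row.
const toCsv = () => {
  let wroteHeader = false;
  return new Transform({
    objectMode: true,
    transform(row: Record<string, unknown>, _enc, callback) {
      let out = "";
      if (!wroteHeader) {
        out += Object.keys(row).join(",") + "\n";
        wroteHeader = true;
      }
      out +=
        Object.values(row)
          .map((v) => JSON.stringify(v ?? ""))
          .join(",") + "\n";
      callback(null, out);
    },
  });
};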

Related Pages

Implements Principle
