# Implementation: Langfuse handleBatchExportJob
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, Worker, Data Pipeline |
| Last Updated | 2026-02-14 00:00 GMT |
## Overview
The top-level worker function that validates and orchestrates Langfuse's full batch export pipeline -- from precondition checks through data streaming, format transformation, S3 upload, and email notification.
## Description

`handleBatchExportJob` is the top-level worker function invoked by the BullMQ consumer when a `BatchExportJob` is dequeued. It orchestrates the entire export lifecycle:

- **Validation**: Checks that `LANGFUSE_S3_BATCH_EXPORT_ENABLED` is `"true"`, retrieves the job record from PostgreSQL, and applies guard clauses for cancellation, staleness (30-day cutoff), and unexpected status.
- **Query parsing**: Validates the stored query against `BatchExportQuerySchema` using Zod. Preprocesses comment-based filters by resolving them via `applyCommentFilters`, which queries comment records and converts them into ID-based filters.
- **Stream selection**: Routes to the appropriate ClickHouse streaming function based on `tableName`: `getObservationStream` for observations, `getTraceStream` for traces, `getEventsStream` for events, and `getDatabaseReadStreamPaginated` for all other table types (scores, sessions, dataset items, audit logs).
- **Format transformation**: Pipes the data stream through a logging transform (which counts rows and logs progress every 5,000 rows) and then through the appropriate format transform (`streamTransformations[format]()`).
- **Upload**: Uploads the resulting file stream to blob storage via `StorageServiceFactory.getInstance().uploadWithSignedUrl()`, using a multipart upload with configurable part size and queue size.
- **Completion**: Updates the job record to `COMPLETED` with the signed download URL and expiration timestamp, then sends a success email to the requesting user.
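The guard-clause logic in the validation step can be sketched as a pure decision function. This is a minimal illustration of the behavior described above (cancelled and stale jobs return early without error; an unexpected status fails the job); the function name, status union, and return shape are hypothetical, not Langfuse's actual code.

```typescript
// Hypothetical sketch of the validation guard clauses; not the real
// Langfuse implementation. Status values and the 30-day staleness
// cutoff follow the description above.
type BatchExportStatus =
  | "QUEUED"
  | "PROCESSING"
  | "COMPLETED"
  | "CANCELLED"
  | "FAILED";

type GuardDecision =
  | { action: "proceed" }
  | { action: "return-early"; reason: string } // cancelled or stale: no error
  | { action: "throw"; reason: string }; // unexpected state: fail the job

const STALENESS_CUTOFF_MS = 30 * 24 * 60 * 60 * 1000; // 30 days

function checkJobPreconditions(
  job: { status: BatchExportStatus; createdAt: Date },
  now: Date = new Date(),
): GuardDecision {
  if (job.status === "CANCELLED") {
    return { action: "return-early", reason: "export was cancelled" };
  }
  if (now.getTime() - job.createdAt.getTime() > STALENESS_CUTOFF_MS) {
    return { action: "return-early", reason: "export is older than 30 days" };
  }
  if (job.status !== "QUEUED") {
    return { action: "throw", reason: `unexpected status: ${job.status}` };
  }
  return { action: "proceed" };
}
```

Separating "skip silently" from "fail loudly" matters here: a cancelled or stale job should not be retried by BullMQ, whereas an unexpected status indicates a real inconsistency worth surfacing.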
## Usage

This function is registered as the processor for the `BatchExportQueue` in the Langfuse worker application. It is not called directly by application code; instead, it is invoked by BullMQ when a job becomes available.
## Code Reference

### Source Location
- Repository: langfuse
- File: worker/src/features/batchExport/handleBatchExportJob.ts
- Lines: 34-301
### Signature

```typescript
export const handleBatchExportJob = async (
  batchExportJob: BatchExportJobType,
): Promise<void>
```
Where:

```typescript
type BatchExportJobType = {
  projectId: string;
  batchExportId: string;
};
```
### Import

```typescript
import { handleBatchExportJob } from "../features/batchExport/handleBatchExportJob";
```
## I/O Contract

### Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batchExportJob.projectId | string | Yes | The project ID to scope all database queries and storage paths. |
| batchExportJob.batchExportId | string | Yes | The ID of the batchExport record in PostgreSQL, used to retrieve job details, update status, and deduplicate. |
### Outputs
| Name | Type | Description |
|---|---|---|
| (void) | Promise<void> | The function resolves on success. Side effects: job status transitions through PROCESSING to COMPLETED; export file uploaded to S3 with signed URL; email notification sent to the requesting user. On validation failure, the function either returns early (cancelled/stale) or throws an error (feature disabled, record not found, invalid query). |
## Usage Examples

### Registering the handler with BullMQ

```typescript
import { Worker } from "bullmq";
import { handleBatchExportJob } from "../features/batchExport/handleBatchExportJob";
import { BatchExportQueue, QueueJobs } from "@langfuse/shared/src/server";

const worker = new Worker(
  BatchExportQueue.QUEUE_NAME,
  async (job) => {
    if (job.name === QueueJobs.BatchExportJob) {
      await handleBatchExportJob(job.data.payload);
    }
  },
  { connection: redisConnection },
);
```
### Stream pipeline constructed internally

```typescript
// Simplified view of the internal pipeline:
const dbReadStream = await getTraceStream({
  projectId,
  cutoffCreatedAt: jobDetails.createdAt,
  filter: processedFilter,
});

const fileStream = pipeline(
  dbReadStream,
  loggingTransform, // counts rows, logs every 5,000
  streamTransformations["CSV"](), // format-specific transform
  (err) => {
    if (err) logger.error(err);
  },
);

const { signedUrl } = await StorageServiceFactory.getInstance({
  bucketName: env.LANGFUSE_S3_BATCH_EXPORT_BUCKET,
  // ...S3 config
}).uploadWithSignedUrl({
  fileName: `${prefix}${timestamp}-lf-traces-export-${projectId}.csv`,
  fileType: "text/csv; charset=utf-8",
  data: fileStream,
  expiresInSeconds: env.BATCH_EXPORT_DOWNLOAD_LINK_EXPIRATION_HOURS * 3600,
  partSize: env.BATCH_EXPORT_S3_PART_SIZE_MIB * 1024 * 1024,
  queueSize: 4,
});
```