Implementation:Langfuse Langfuse BatchExportRouter Create
| Knowledge Sources | |
|---|---|
| Domains | Batch Export, tRPC, Job Scheduling |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for creating batch export jobs and dispatching them to the BullMQ worker queue, provided by Langfuse.
Description
The batchExportRouter.create endpoint is a tRPC protected project procedure that handles the full lifecycle of initiating a batch export. It validates user permissions using RBAC scopes, creates a batchExport record in PostgreSQL via Prisma with an initial status of QUEUED, logs the action to the audit log, and then enqueues a BatchExportJob message onto the BullMQ BatchExportQueue. The enqueued message uses the export record's ID as a deduplication key to prevent duplicate processing.
The router also exposes a cancel mutation (which sets status to CANCELLED) and an all query (which lists exports with pagination, user lookup, and expiration detection), but the create mutation is the primary entry point for the batch export pipeline.
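The order of operations in the create mutation can be sketched with in-memory stand-ins. This is a minimal illustration, not the Langfuse implementation: `Deps`, `createBatchExport`, `hasScope`, and the scope string `"batchExports:create"` are hypothetical names, and arrays replace PostgreSQL, the audit log, and the BullMQ queue.

```typescript
// Hypothetical in-memory model of the create flow described above.
// None of these names are taken from the Langfuse codebase.
type ExportStatus = "QUEUED" | "CANCELLED";

interface ExportRecord {
  id: string;
  projectId: string;
  userId: string;
  name: string;
  status: ExportStatus;
}

interface QueueMessage {
  id: string; // doubles as the deduplication key
  payload: { batchExportId: string; projectId: string };
}

interface Deps {
  hasScope: (userId: string, projectId: string, scope: string) => boolean;
  records: ExportRecord[]; // stands in for the batchExport table
  auditLog: string[]; // stands in for the audit log
  queue: QueueMessage[]; // stands in for the BatchExportQueue
  newId: () => string;
}

function createBatchExport(
  deps: Deps,
  userId: string,
  input: { projectId: string; name: string },
): void {
  // 1. Permission check (the scope name is an assumption).
  if (!deps.hasScope(userId, input.projectId, "batchExports:create")) {
    throw new Error("FORBIDDEN");
  }

  // 2. Persist the export record with the initial status QUEUED.
  const record: ExportRecord = {
    id: deps.newId(),
    projectId: input.projectId,
    userId,
    name: input.name,
    status: "QUEUED",
  };
  deps.records.push(record);

  // 3. Record the action in the audit log.
  deps.auditLog.push(`create batchExport ${record.id}`);

  // 4. Enqueue the job, reusing the record ID as the message ID.
  deps.queue.push({
    id: record.id,
    payload: { batchExportId: record.id, projectId: record.projectId },
  });
}
```

Because the record is written before the message is enqueued, a worker that picks up the job can always look up the corresponding row by `batchExportId`.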
Usage
Import and mount this router within the tRPC API root. The create mutation is called from the Langfuse web UI when a user clicks an export button on a table view (traces, observations, sessions, scores, etc.). The frontend passes the current filter state, table name, desired format, and a human-readable export name.
Code Reference
Source Location
- Repository: langfuse
- File: web/src/features/batch-exports/server/batchExport.ts
- Lines: 21-78
Signature
batchExportRouter.create: protectedProjectProcedure
  .input(CreateBatchExportSchema)
  .mutation(async ({ input, ctx }) => Promise<void>)
Import
import { batchExportRouter } from "@/src/features/batch-exports/server/batchExport";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | The ID of the project from which to export data. |
| name | string | Yes | A human-readable name for the export (displayed in the UI and email notifications). |
| query | BatchExportQuerySchema | Yes | An object describing the data to export: { tableName, filter, searchQuery?, searchType?, orderBy, limit?, page? }. The tableName is one of: traces, observations, events, scores, sessions, dataset_run_items, dataset_items, audit_logs. |
| format | "CSV" \| "JSON" \| "JSONL" | Yes | The desired output file format. |
| ctx.session | Session | Yes (implicit) | The authenticated user session, provided by tRPC middleware. Used for RBAC checks and to record the userId on the export record. |
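The input contract above can be approximated with plain TypeScript types and a hand-written check. This is a sketch under stated assumptions: the authoritative definitions are CreateBatchExportSchema and BatchExportQuerySchema in the Langfuse source, the field types for `filter`, `searchType`, and `orderBy` are guesses based on the table and the usage example, and `validateCreateInput` is a hypothetical helper.

```typescript
// Illustrative only: the real validation is performed by CreateBatchExportSchema.
const TABLE_NAMES = [
  "traces",
  "observations",
  "events",
  "scores",
  "sessions",
  "dataset_run_items",
  "dataset_items",
  "audit_logs",
] as const;
type TableName = (typeof TABLE_NAMES)[number];

// Format values that appear on this page.
const FORMATS = ["CSV", "JSON", "JSONL"] as const;
type ExportFormat = (typeof FORMATS)[number];

interface BatchExportQuery {
  tableName: TableName;
  filter: unknown[]; // filter conditions; element shape not modeled here
  searchQuery?: string;
  searchType?: unknown; // shape not specified on this page
  orderBy: { column: string; order: string };
  limit?: number;
  page?: number;
}

interface CreateBatchExportInput {
  projectId: string;
  name: string;
  query: BatchExportQuery;
  format: ExportFormat;
}

// Hypothetical helper: returns a list of problems, empty when the input looks valid.
function validateCreateInput(input: {
  projectId?: unknown;
  name?: unknown;
  query?: unknown;
  format?: unknown;
}): string[] {
  const problems: string[] = [];
  if (typeof input.projectId !== "string" || input.projectId === "") {
    problems.push("projectId must be a non-empty string");
  }
  if (typeof input.name !== "string" || input.name === "") {
    problems.push("name must be a non-empty string");
  }
  if (!FORMATS.some((f) => f === input.format)) {
    problems.push("format must be one of " + FORMATS.join(", "));
  }
  const query = input.query;
  if (typeof query !== "object" || query === null) {
    problems.push("query must be an object");
  } else {
    const tableName = (query as { tableName?: unknown }).tableName;
    if (!TABLE_NAMES.some((t) => t === tableName)) {
      problems.push("query.tableName is not a supported table");
    }
  }
  return problems;
}
```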
Outputs
| Name | Type | Description |
|---|---|---|
| (void) | void | The mutation does not return a value. On success, a batchExport record is created in the database with status QUEUED and a BullMQ job is enqueued. On failure, a TRPCError is thrown with code INTERNAL_SERVER_ERROR. |
Usage Examples
Creating a CSV export of traces
// Inside a React component in the Next.js frontend (tRPC React Query hooks)
const createExport = api.batchExport.create.useMutation();

// Call from an event handler, e.g. the export button's onClick
const handleExport = async () => {
  await createExport.mutateAsync({
    projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
    name: "Traces Export 2026-02-14",
    format: "CSV",
    query: {
      tableName: "traces",
      // Only traces recorded after 2026-01-01
      filter: [
        {
          column: "Timestamp",
          operator: ">",
          value: new Date("2026-01-01"),
          type: "datetime",
        },
      ],
      orderBy: { column: "timestamp", order: "DESC" },
    },
  });
};
Internal queue message structure
// The message enqueued to BatchExportQueue:
{
  id: "export-record-uuid", // deduplication key
  name: "batch-export-job",
  timestamp: new Date(),
  payload: {
    batchExportId: "export-record-uuid",
    projectId: "7a88fb47-b4e2-43b8-a06c-a5ce950dc53a",
  },
}
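The effect of using the export record's ID as the deduplication key can be illustrated with a small in-memory model. BullMQ is documented to ignore a job whose custom job ID already exists in the queue; the class below imitates only that behaviour. `InMemoryDedupQueue` and `buildMessage` are hypothetical names, not part of BullMQ or Langfuse.

```typescript
// Shape of the message shown above.
interface BatchExportJobMessage {
  id: string; // deduplication key, equal to the export record ID
  name: string;
  timestamp: Date;
  payload: { batchExportId: string; projectId: string };
}

// Hypothetical stand-in for a queue that drops messages with a known ID.
class InMemoryDedupQueue {
  private readonly jobs = new Map<string, BatchExportJobMessage>();

  // Returns true if the message was added, false if its ID was already present.
  add(message: BatchExportJobMessage): boolean {
    if (this.jobs.has(message.id)) {
      return false;
    }
    this.jobs.set(message.id, message);
    return true;
  }

  get size(): number {
    return this.jobs.size;
  }
}

// Builds a message whose ID and payload both reference the same export record.
function buildMessage(
  batchExportId: string,
  projectId: string,
): BatchExportJobMessage {
  return {
    id: batchExportId,
    name: "batch-export-job",
    timestamp: new Date(),
    payload: { batchExportId, projectId },
  };
}
```

Enqueuing the same export twice therefore leaves a single job in the queue, so a retried request does not produce a second export run for the same record.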