Implementation:Langfuse Langfuse ProcessEventBatch
| Knowledge Sources | |
|---|---|
| Domains | Event Processing, Trace Ingestion |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool provided by Langfuse for orchestrating the full ingestion pipeline, from raw event validation through S3 upload to queue dispatch.
Description
processEventBatch is the central function that drives the Langfuse trace ingestion pipeline. It accepts a batch of raw events, an authentication context, and optional processing options, then:
- Validates each event against the ingestionEventZod discriminated union, collecting per-event validation errors.
- Authorizes each validated event against the caller's access scope ("project" or "scores" level).
- Filters out SDK log events after logging them.
- Sorts the batch: creation events first (timestamp ascending), then update events (timestamp ascending).
- Groups events by {entityType}-{eventBodyId} so that S3 uploads are batched per entity.
- Uploads each group to S3 as a JSON file at the path {prefix}/{projectId}/{entityType}/{eventBodyId}/{eventId}.json.
- Checks sampling for each group using isTraceIdInSample, dropping unsampled groups and recording a sampling metric.
- Dispatches a BullMQ job to the appropriate IngestionQueue shard for each surviving group. The job payload contains the S3 file key, auth context, event type, and flags.
- Returns a structured result with per-event success/error statuses.
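The sort and group steps above can be sketched in plain TypeScript. This is a simplified illustration, not the Langfuse source: the SketchEvent shape, the "-create" suffix check used to recognise creation events, and the entityTypeOf helper (which just takes the event type prefix) are hypothetical stand-ins for the real IngestionEventType handling.

```typescript
// Hypothetical, trimmed-down event shape; the real IngestionEventType has more fields.
interface SketchEvent {
  id: string;
  type: string; // e.g. "trace-create", "span-update"
  timestamp: string; // ISO-8601
  body: { id: string };
}

// Assumption: creation events are recognised by a "-create" suffix on the type.
function isCreateEvent(e: SketchEvent): boolean {
  return e.type.endsWith("-create");
}

function byTimestampAsc(a: SketchEvent, b: SketchEvent): number {
  return new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime();
}

// Creation events first, then update events, each half in ascending timestamp order.
function sortBatch(events: SketchEvent[]): SketchEvent[] {
  const creates = events.filter(isCreateEvent).sort(byTimestampAsc);
  const updates = events.filter((e) => !isCreateEvent(e)).sort(byTimestampAsc);
  return [...creates, ...updates];
}

// Hypothetical, naive mapping ("span-update" -> "span"); Langfuse's real mapping may differ.
function entityTypeOf(e: SketchEvent): string {
  return e.type.split("-")[0];
}

// Group events under "{entityType}-{eventBodyId}" so each entity gets one S3 upload.
function groupByEntity(events: SketchEvent[]): Map<string, SketchEvent[]> {
  const groups = new Map<string, SketchEvent[]>();
  for (const e of events) {
    const key = `${entityTypeOf(e)}-${e.body.id}`;
    const existing = groups.get(key);
    if (existing) {
      existing.push(e);
    } else {
      groups.set(key, [e]);
    }
  }
  return groups;
}

// Object key following the documented path pattern.
function s3Key(
  prefix: string,
  projectId: string,
  entityType: string,
  eventBodyId: string,
  eventId: string,
): string {
  return `${prefix}/${projectId}/${entityType}/${eventBodyId}/${eventId}.json`;
}
```

Because grouping happens after sorting, events inside each group keep the create-before-update order; per the description above, each group then becomes one S3 object and one queue job.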
Error handling is designed for partial success: validation and authorization failures are collected and returned alongside successes. The function only throws for systemic failures (missing project ID, S3 upload failure, Redis unavailable).
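The partial-success contract can be illustrated with a small, dependency-free sketch. The validate and authorize callbacks and the eventIdOf helper are hypothetical placeholders for the Zod validation and scope check; the 400 and 401 statuses and the "Invalid request data" message follow this page, while the authorization error text is made up for illustration. S3 upload and queue dispatch are omitted, since in the described design those failures throw instead of being collected.

```typescript
type Success = { id: string; status: number };
type Failure = { id: string; status: number; message?: string; error?: string };

// Hypothetical: best-effort extraction of the event id from an unvalidated payload.
function eventIdOf(raw: unknown): string {
  if (typeof raw === "object" && raw !== null) {
    const id = (raw as { id?: unknown }).id;
    if (typeof id === "string") return id;
  }
  return "unknown";
}

// validate returns an error description, or null when the event is valid.
function partitionBatch(
  input: unknown[],
  validate: (raw: unknown) => string | null,
  authorize: (raw: unknown) => boolean,
): { successes: Success[]; errors: Failure[] } {
  const successes: Success[] = [];
  const errors: Failure[] = [];
  for (const raw of input) {
    const id = eventIdOf(raw);
    const problem = validate(raw);
    if (problem !== null) {
      // Per-event validation failure: record it and keep processing the batch.
      errors.push({ id, status: 400, message: "Invalid request data", error: problem });
      continue;
    }
    if (!authorize(raw)) {
      // Per-event authorization failure (illustrative error text).
      errors.push({ id, status: 401, error: "Access scope does not permit this event" });
      continue;
    }
    // In the described pipeline, successes are returned only after upload and dispatch complete without throwing.
    successes.push({ id, status: 201 });
  }
  return { successes, errors };
}
```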
The function also records OpenTelemetry metrics (langfuse.ingestion.event counter, langfuse.ingestion.event_distribution histogram, langfuse.ingestion.sampling counter) and sets span attributes for the project, organization, and batch size.
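The sampling step and its counter can be sketched as follows. How isTraceIdInSample actually decides is not described on this page; the sketch assumes a deterministic hash of the trace ID compared against a sample rate, uses FNV-1a as a stand-in hash, and replaces the langfuse.ingestion.sampling OpenTelemetry counter with an in-memory Map. isInSampleSketch, filterSampled, and samplingCounter are hypothetical names.

```typescript
// FNV-1a 32-bit hash, used here only as a stand-in deterministic hash.
function fnv1a32(s: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0; // force an unsigned 32-bit result
}

// Assumption: the same trace ID always gets the same decision for a given rate.
function isInSampleSketch(traceId: string, sampleRate: number): boolean {
  if (sampleRate >= 1) return true;
  if (sampleRate <= 0) return false;
  return (fnv1a32(traceId) % 10000) / 10000 < sampleRate;
}

// In-memory stand-in for the langfuse.ingestion.sampling counter.
const samplingCounter = new Map<"sampled" | "dropped", number>();

// Keep only groups whose trace is in the sample, counting every decision.
function filterSampled<T>(
  groups: Map<string, T>,
  traceIdOf: (group: T) => string,
  sampleRate: number,
): Map<string, T> {
  const kept = new Map<string, T>();
  groups.forEach((group, key) => {
    const inSample = isInSampleSketch(traceIdOf(group), sampleRate);
    const outcome = inSample ? "sampled" : "dropped";
    samplingCounter.set(outcome, (samplingCounter.get(outcome) ?? 0) + 1);
    if (inSample) kept.set(key, group);
  });
  return kept;
}
```

Hashing the trace ID rather than sampling each group at random keeps all groups belonging to one trace on the same side of the decision, so a kept trace is not left with missing observations.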
Usage
This function is called from:
- The REST API ingestion endpoint (/api/public/ingestion)
- The OTel ingestion endpoint (after converting OTel spans to Langfuse event format)
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/ingestion/processEventBatch.ts
- Lines: L104-464
Signature
export const processEventBatch = async (
input: unknown[],
authCheck: AuthHeaderValidVerificationResultIngestion,
options?: ProcessEventBatchOptions,
): Promise<{
successes: { id: string; status: number }[];
errors: {
id: string;
status: number;
message?: string;
error?: string;
}[];
}>
Import
import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| input | unknown[] | Yes | Array of raw ingestion events from the HTTP request body. Each element should conform to the IngestionEventType shape; validation is performed internally. |
| authCheck | AuthHeaderValidVerificationResultIngestion | Yes | Authentication result containing scope.projectId, scope.accessLevel, scope.orgId, and scope.plan. |
| options.delay | number \| null | No | Override for the queue job delay in milliseconds. null uses the default delay logic. |
| options.source | "api" \| "otel" | No | Source identifier for metrics. Defaults to "api". OTel events use a delay of 0. |
| options.isLangfuseInternal | boolean | No | When true, allows internal environment names (e.g., "langfuse-experiments"). Defaults to false. |
| options.forwardToEventsTable | boolean | No | Whether to forward events to the staging events table for batch propagation. Falls back to environment flags if undefined. |
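The documented option semantics can be captured in a small sketch. ProcessEventBatchOptionsSketch, resolveDelay, and resolveForwardToEventsTable are hypothetical names; defaultDelayMs and envForwardFlag stand in for the default delay logic and the environment flags, which this page does not spell out. The sketch also assumes that an explicit numeric delay takes precedence over the OTel zero-delay rule.

```typescript
// Hypothetical mirror of the options table above (not the exported Langfuse type).
interface ProcessEventBatchOptionsSketch {
  delay?: number | null;
  source?: "api" | "otel";
  isLangfuseInternal?: boolean;
  forwardToEventsTable?: boolean;
}

// An explicit numeric delay wins; otherwise OTel gets 0 and the API path uses the default logic.
function resolveDelay(
  options: ProcessEventBatchOptionsSketch | undefined,
  defaultDelayMs: number,
): number {
  const delay = options?.delay;
  if (typeof delay === "number") return delay;
  const source = options?.source ?? "api";
  return source === "otel" ? 0 : defaultDelayMs;
}

// An undefined option falls back to the (assumed boolean) environment flag.
function resolveForwardToEventsTable(
  options: ProcessEventBatchOptionsSketch | undefined,
  envForwardFlag: boolean,
): boolean {
  return options?.forwardToEventsTable ?? envForwardFlag;
}
```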
Outputs
| Name | Type | Description |
|---|---|---|
| successes | { id: string; status: number }[] | Array of successfully processed event IDs, each with HTTP status 201. |
| errors | { id: string; status: number; message?: string; error?: string }[] | Array of failed events with HTTP status codes: 400 (validation), 401 (auth), 404 (not found), or 500 (internal). |
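Because successes and errors are returned side by side, a caller can reduce the result to counts before logging or alerting. summarizeBatchResult is a hypothetical helper, and BatchResult simply restates the documented return type.

```typescript
// Restatement of the documented return type of processEventBatch.
type BatchResult = {
  successes: { id: string; status: number }[];
  errors: { id: string; status: number; message?: string; error?: string }[];
};

// Count successes and bucket failures by HTTP status code.
function summarizeBatchResult(result: BatchResult): {
  succeeded: number;
  failedByStatus: Record<number, number>;
} {
  const failedByStatus: Record<number, number> = {};
  for (const e of result.errors) {
    failedByStatus[e.status] = (failedByStatus[e.status] ?? 0) + 1;
  }
  return { succeeded: result.successes.length, failedByStatus };
}
```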
Usage Examples
Processing a Batch from the REST API
import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";
// Called from the /api/public/ingestion endpoint handler
const result = await processEventBatch(
req.body.batch, // Array of raw events from SDK
{
validKey: true,
scope: {
projectId: "proj_abc123",
accessLevel: "project",
orgId: "org_xyz",
plan: "cloud:pro",
rateLimitOverrides: [],
apiKeyId: "key_001",
publicKey: "pk_abc",
},
},
{
source: "api",
delay: null, // Use default delay logic
},
);
// Return multi-status response to the SDK
res.status(207).json(result);
// result.successes: [{ id: "evt_1", status: 201 }, { id: "evt_3", status: 201 }]
// result.errors: [{ id: "evt_2", status: 400, message: "Invalid request data", error: "..." }]
Processing OTel Events with Zero Delay
import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";
// OTel events have already been batched by the OTel collector, so no delay needed
const result = await processEventBatch(
convertedOtelEvents,
authCheck,
{
source: "otel",
delay: null, // Will resolve to 0 for OTel source
},
);
Internal Langfuse Events (Prompt Experiments)
import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";
// Internal events can use "langfuse-*" environment names
const result = await processEventBatch(
experimentTraceEvents,
internalAuthCheck,
{
isLangfuseInternal: true,
delay: 0,
source: "api",
},
);
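The effect of isLangfuseInternal on environment names can be sketched as a simple guard. The rule shown (names starting with "langfuse" are reserved unless the caller is internal) is an assumption based on the "langfuse-experiments" example; the exact check Langfuse applies, and where in validation it runs, may differ. isEnvironmentAllowed is a hypothetical name.

```typescript
// Assumption: any environment name with a "langfuse" prefix is reserved.
const RESERVED_ENV_PREFIX = "langfuse";

function isEnvironmentAllowed(environment: string, isLangfuseInternal = false): boolean {
  if (environment.startsWith(RESERVED_ENV_PREFIX)) {
    // Reserved names are accepted only for internal callers such as prompt experiments.
    return isLangfuseInternal;
  }
  return true;
}
```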
Handling the Response
import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";
try {
const { successes, errors } = await processEventBatch(events, authCheck);
// Log any partial failures
if (errors.length > 0) {
console.warn(`${errors.length} events failed validation or auth`, errors);
}
console.log(`${successes.length} events dispatched to queue`);
} catch (err) {
// Systemic failure: S3 upload failed, Redis unavailable, or missing project ID
console.error("Batch processing failed entirely", err);
throw err;
}
Related Pages
Implements Principle
- Principle:Langfuse_Langfuse_Worker_Event_Processing
- Environment:Langfuse_Langfuse_ClickHouse_Analytics
- Environment:Langfuse_Langfuse_Redis_7_Queue_Cache
- Environment:Langfuse_Langfuse_S3_Compatible_Storage
- Heuristic:Langfuse_Langfuse_Ingestion_Date_Boundary_Delay
- Heuristic:Langfuse_Langfuse_Fail_Open_Resilience_Pattern