Implementation:Langfuse Langfuse ProcessEventBatch

From Leeroopedia
Knowledge Sources
Domains Event Processing, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

Concrete tool, provided by Langfuse, for orchestrating the full ingestion pipeline, from raw event validation through S3 upload to queue dispatch.

Description

processEventBatch is the central function that drives the Langfuse trace ingestion pipeline. It accepts a batch of raw events, an authentication context, and optional processing options, then:

  1. Validates each event against the ingestionEvent Zod discriminated union, collecting per-event validation errors.
  2. Authorizes each validated event against the caller's access scope ("project" or "scores" level).
  3. Filters out SDK log events after logging them.
  4. Sorts the batch: creation events first (timestamp ascending), then update events (timestamp ascending).
  5. Groups events by {entityType}-{eventBodyId} to batch S3 uploads per entity.
  6. Uploads each group to S3 as a JSON file at the path {prefix}/{projectId}/{entityType}/{eventBodyId}/{eventId}.json.
  7. Checks sampling for each group using isTraceIdInSample. Unsampled groups are dropped, and each drop is recorded as a metric.
  8. Dispatches a BullMQ job to the appropriate IngestionQueue shard for each surviving group. The job payload contains the S3 file key, auth context, event type, and flags.
  9. Returns a structured result with per-event success/error statuses.
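Steps 4 to 6 above can be sketched as follows. This is a minimal illustration, not the Langfuse source: the SketchEvent shape, the helper names (sortBatch, groupByEntity, buildS3Key), and the rules used to detect update events and derive the entity type are all assumptions made for the example. The key layout follows the path pattern given in step 6.

```typescript
// Hypothetical minimal event shape; the real ingestionEvent union has many more fields.
type SketchEvent = {
  id: string; // event ID
  type: string; // e.g. "trace-create", "span-update"
  timestamp: string; // ISO 8601
  body: { id: string }; // ID of the entity the event refers to
};

// Assumption: update events are recognizable by a "-update" suffix.
const isUpdate = (e: SketchEvent): boolean => e.type.endsWith("-update");

// Assumption: the entity type is the part of the event type before the first "-".
const entityTypeOf = (e: SketchEvent): string => e.type.split("-")[0] ?? e.type;

// Step 4: creation events first, then update events, each by timestamp ascending.
function sortBatch(events: SketchEvent[]): SketchEvent[] {
  return [...events].sort((a, b) => {
    const rank = Number(isUpdate(a)) - Number(isUpdate(b));
    if (rank !== 0) return rank;
    return Date.parse(a.timestamp) - Date.parse(b.timestamp);
  });
}

// Step 5: group by "{entityType}-{eventBodyId}", preserving the sorted order.
function groupByEntity(events: SketchEvent[]): Map<string, SketchEvent[]> {
  const groups = new Map<string, SketchEvent[]>();
  for (const e of events) {
    const key = `${entityTypeOf(e)}-${e.body.id}`;
    const group = groups.get(key) ?? [];
    group.push(e);
    groups.set(key, group);
  }
  return groups;
}

// Step 6: build the object key "{prefix}/{projectId}/{entityType}/{eventBodyId}/{eventId}.json".
function buildS3Key(
  prefix: string,
  projectId: string,
  entityType: string,
  eventBodyId: string,
  eventId: string,
): string {
  return `${prefix}/${projectId}/${entityType}/${eventBodyId}/${eventId}.json`;
}
```

Sorting before grouping means each group already holds its creation events ahead of its updates, so a consumer reading the uploaded file can apply them in order.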

Error handling is designed for partial success: validation and authorization failures are collected per event and returned alongside the successes. The function throws only for systemic failures (missing project ID, S3 upload failure, Redis unavailable).
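The partial-success contract can be illustrated with a small sketch. It is not the actual implementation: parseEvent is a hypothetical stand-in for the Zod schema check, isAuthorized encodes the assumption that a "scores"-level key may ingest only score events, and the "unknown" fallback ID and error messages are illustrative.

```typescript
type AccessLevel = "project" | "scores";

type BatchResult = {
  successes: { id: string; status: number }[];
  errors: { id: string; status: number; message?: string; error?: string }[];
};

// Hypothetical stand-in for schema validation: requires string `id` and `type` only.
function parseEvent(raw: unknown): { id: string; type: string } | null {
  if (typeof raw !== "object" || raw === null) return null;
  const candidate = raw as Record<string, unknown>;
  const id = candidate["id"];
  const type = candidate["type"];
  if (typeof id !== "string" || typeof type !== "string") return null;
  return { id, type };
}

// Assumption: "scores" access covers score events only; "project" covers everything.
function isAuthorized(type: string, accessLevel: AccessLevel): boolean {
  return accessLevel === "project" || type.startsWith("score-");
}

function collectResults(
  input: unknown[],
  accessLevel: AccessLevel,
  projectId?: string,
): BatchResult {
  // Systemic failure: abort the whole batch instead of reporting per-event errors.
  if (!projectId) throw new Error("Missing project ID");

  const result: BatchResult = { successes: [], errors: [] };
  for (const raw of input) {
    const event = parseEvent(raw);
    if (event === null) {
      // Per-event validation failure: record it and keep processing the rest.
      result.errors.push({ id: "unknown", status: 400, message: "Invalid request data" });
      continue;
    }
    if (!isAuthorized(event.type, accessLevel)) {
      // Per-event authorization failure: also recorded, not thrown.
      result.errors.push({ id: event.id, status: 401, message: "Access denied" });
      continue;
    }
    result.successes.push({ id: event.id, status: 201 });
  }
  return result;
}
```

Callers therefore need two code paths: inspect errors on a resolved result for per-event problems, and catch a rejection for failures that affect the whole batch.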

The function also records OpenTelemetry metrics (langfuse.ingestion.event counter, langfuse.ingestion.event_distribution histogram, langfuse.ingestion.sampling counter) and sets span attributes for the project, organization, and batch size.

Usage

This function is called from:

  • The REST API ingestion endpoint (/api/public/ingestion)
  • The OTel ingestion endpoint (after converting OTel spans to Langfuse event format)

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/ingestion/processEventBatch.ts
  • Lines: L104-464

Signature

export const processEventBatch = async (
  input: unknown[],
  authCheck: AuthHeaderValidVerificationResultIngestion,
  options?: ProcessEventBatchOptions,
): Promise<{
  successes: { id: string; status: number }[];
  errors: {
    id: string;
    status: number;
    message?: string;
    error?: string;
  }[];
}>

Import

import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";

I/O Contract

Inputs

Name Type Required Description
input unknown[] Yes Array of raw ingestion events from the HTTP request body. Each element should conform to the IngestionEventType shape, but validation is performed internally.
authCheck AuthHeaderValidVerificationResultIngestion Yes Authentication result containing scope.projectId, scope.accessLevel, scope.orgId, and scope.plan.
options.delay number | null No Override for the queue job delay in milliseconds. null (or omitted) uses the default delay logic.
options.source "api" | "otel" No Source identifier for metrics. Defaults to "api". OTel events use a delay of 0.
options.isLangfuseInternal boolean No When true, allows internal environment names (e.g., "langfuse-experiments"). Defaults to false.
options.forwardToEventsTable boolean No Whether to forward events to the staging events table for batch propagation. Falls back to environment flags if undefined.
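How delay and source interact can be sketched as below. This is an assumption-laden illustration: resolveDelay and ASSUMED_DEFAULT_DELAY_MS are hypothetical names, the default value is a placeholder rather than the real Langfuse setting, and the precedence of an explicit delay over the source-based rule is inferred from the table rather than confirmed.

```typescript
// Subset of the options relevant to delay resolution (sketch only).
type DelayOptionsSketch = {
  delay?: number | null;
  source?: "api" | "otel";
};

// Placeholder default; the real default comes from Langfuse configuration.
const ASSUMED_DEFAULT_DELAY_MS = 5000;

function resolveDelay(options: DelayOptionsSketch = {}): number {
  const { delay = null, source = "api" } = options;
  // An explicit numeric delay (including 0) is assumed to win over the defaults.
  if (delay !== null) return delay;
  // With no override, OTel batches are dispatched immediately.
  return source === "otel" ? 0 : ASSUMED_DEFAULT_DELAY_MS;
}
```

Comparing against null rather than using a truthiness check matters here, because a caller-supplied delay of 0 must not fall through to the default.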

Outputs

Name Type Description
successes { id: string; status: number }[] Array of successfully processed event IDs with HTTP status 201.
errors { id: string; status: number; message?: string; error?: string }[] Array of failed events with HTTP status codes: 400 (validation), 401 (auth), 404 (not found), or 500 (internal).

Usage Examples

Processing a Batch from the REST API

import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";

// Called from the /api/public/ingestion endpoint handler
const result = await processEventBatch(
  req.body.batch,  // Array of raw events from SDK
  {
    validKey: true,
    scope: {
      projectId: "proj_abc123",
      accessLevel: "project",
      orgId: "org_xyz",
      plan: "cloud:pro",
      rateLimitOverrides: [],
      apiKeyId: "key_001",
      publicKey: "pk_abc",
    },
  },
  {
    source: "api",
    delay: null,  // Use default delay logic
  },
);

// Return multi-status response to the SDK
res.status(207).json(result);
// result.successes: [{ id: "evt_1", status: 201 }, { id: "evt_3", status: 201 }]
// result.errors: [{ id: "evt_2", status: 400, message: "Invalid request data", error: "..." }]

Processing OTel Events with Zero Delay

import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";

// OTel events have already been batched by the OTel collector, so no delay needed
const result = await processEventBatch(
  convertedOtelEvents,
  authCheck,
  {
    source: "otel",
    delay: null,  // Will resolve to 0 for OTel source
  },
);

Internal Langfuse Events (Prompt Experiments)

import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";

// Internal events can use "langfuse-*" environment names
const result = await processEventBatch(
  experimentTraceEvents,
  internalAuthCheck,
  {
    isLangfuseInternal: true,
    delay: 0,
    source: "api",
  },
);

Handling the Response

import { processEventBatch } from "@langfuse/shared/src/server/ingestion/processEventBatch";

try {
  const { successes, errors } = await processEventBatch(events, authCheck);

  // Log any partial failures
  if (errors.length > 0) {
    console.warn(`${errors.length} events failed validation or auth`, errors);
  }

  console.log(`${successes.length} events dispatched to queue`);
} catch (err) {
  // Systemic failure: S3 upload failed, Redis unavailable, or missing project ID
  console.error("Batch processing failed entirely", err);
  throw err;
}

Related Pages

Implements Principle
