
Principle:Langfuse Worker Event Processing

From Leeroopedia
Knowledge Sources
Domains Event Processing, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

Worker event processing is the orchestration of validating, grouping, sampling, persisting, and dispatching ingestion events within a batch, transforming raw SDK payloads into durable queue jobs ready for downstream ClickHouse insertion.

Description

The processEventBatch function is the central orchestrator of the Langfuse trace ingestion pipeline. It is called by both the REST API ingestion endpoint and the OTel ingestion endpoint. Despite being a single function, it encapsulates the full pipeline from raw input to queue dispatch:

  1. Schema validation: Each event in the batch is parsed against the ingestionEvent discriminated union schema. Events that fail validation are collected into an error list rather than aborting the entire batch. This partial-success model allows well-formed events to proceed even when some events in the same batch are malformed.
  2. Authorization checking: After validation, each event is checked against the caller's access scope. Score creation events can be submitted with "scores" access level, but all other event types require "project" access level. SDK log events are always authorized since they carry diagnostic information only.
  3. SDK log filtering: Events of type sdk-log are logged for diagnostic purposes and then removed from the processing pipeline. They do not produce traces or observations.
  4. Batch sorting: The remaining events are sorted so that creation events come before update events, and within each category, events are sorted by timestamp ascending. This ordering ensures that when multiple events for the same entity arrive in a single batch, they are processed in the correct causal order.
  5. Entity grouping: Events are grouped by a composite key of {entityType}-{eventBodyId}. This grouping is critical for the S3 upload step, where events for the same entity are stored in a single JSON file, reducing the number of S3 operations and enabling the worker to process all updates for an entity atomically.
  6. S3 upload: Each group is serialized and uploaded to blob storage. The upload uses Promise.allSettled to handle partial failures without losing successful uploads.
  7. Sampling check: Before queue dispatch, each event group is checked against the project's sampling configuration. Events that fall outside the sample are dropped with a metric increment for monitoring.
  8. Queue dispatch: For each surviving event group, a BullMQ job is added to the appropriate shard of the ingestion queue. The job contains the S3 file key, auth context, and event metadata.
  9. Result aggregation: Validation errors, authorization errors, and successful event IDs are aggregated into a structured response that the API returns to the caller.
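The sorting and grouping steps (4 and 5 above) can be sketched in TypeScript. The event shape and the key derivation here are simplified illustrations, not Langfuse's actual types; in particular, deriving the entity type by splitting on "-" is a naive stand-in for the real discriminated-union handling.

```typescript
// Hypothetical, simplified event shape for illustration only.
type IngestionEvent = {
  type: string;            // e.g. "trace-create", "observation-update"
  timestamp: string;       // ISO 8601 timestamp
  body: { id: string };
};

// Naive key derivation for illustration; real entity types are richer.
const entityType = (e: IngestionEvent): string => e.type.split("-")[0];
const isCreate = (e: IngestionEvent): boolean => e.type.endsWith("create");

function sortAndGroup(events: IngestionEvent[]): Map<string, IngestionEvent[]> {
  // Step 4: creation events first, then updates; timestamp ascending within
  // each category. Array.prototype.sort is stable, so ties keep input order.
  const sorted = [...events].sort((a, b) => {
    if (isCreate(a) !== isCreate(b)) return isCreate(a) ? -1 : 1;
    return a.timestamp.localeCompare(b.timestamp);
  });
  // Step 5: group by the composite "{entityType}-{eventBodyId}" key.
  const groups = new Map<string, IngestionEvent[]>();
  for (const e of sorted) {
    const key = `${entityType(e)}-${e.body.id}`;
    groups.set(key, [...(groups.get(key) ?? []), e]);
  }
  return groups;
}
```

Because the sort runs before the grouping, each group's array is already in causal order when it is serialized for upload.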

Usage

Apply this principle whenever:

  • Implementing a new ingestion endpoint that needs to accept event batches.
  • Adding a new entity type (beyond trace, observation, score, dataset_run_item) to the pipeline.
  • Modifying the processing order or grouping logic for events.
  • Adding new metrics or observability instrumentation to the ingestion path.

Theoretical Basis

Pipeline Architecture

Input: unknown[]  (raw JSON events from HTTP request body)
  |
  v
[1. VALIDATE] -- Parse each event against ingestionEvent Zod schema
  |                Collect validation errors
  v
[2. AUTHORIZE] -- Check event type against auth scope
  |                Collect authorization errors
  v
[3. FILTER] -- Remove sdk-log events (log and discard)
  |
  v
[4. SORT] -- Creation events first, then updates; each sorted by timestamp
  |
  v
[5. GROUP] -- Group by "{entityType}-{eventBodyId}"
  |
  v
[6. S3 UPLOAD] -- Upload each group as JSON to blob storage
  |                Use Promise.allSettled for partial failure tolerance
  |                Abort all queue dispatch if any upload fails
  v
[7. SAMPLE] -- Check each group against project sampling config
  |              Drop groups outside sample; record metrics
  v
[8. QUEUE DISPATCH] -- Add BullMQ job per group to sharded IngestionQueue
  |                      Include S3 file key, auth context, delay
  v
[9. AGGREGATE] -- Combine validation errors, auth errors, and successes
  |                Return { successes: [...], errors: [...] }
  v
Output: API response to caller
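Step 6's partial-failure handling can be sketched as follows. The uploadToS3 callback and the key layout are illustrative assumptions, not Langfuse's actual blob path scheme; the point is the Promise.allSettled pattern plus the abort-on-any-failure rule from the diagram above.

```typescript
// Sketch: upload each event group, tolerate concurrent partial failures,
// but abort before queue dispatch if any upload failed.
async function uploadGroups(
  groups: Map<string, unknown[]>,
  uploadToS3: (key: string, body: string) => Promise<void>, // hypothetical helper
): Promise<string[]> {
  const entries = [...groups.entries()];
  // allSettled lets every upload run to completion instead of failing fast,
  // so successful uploads are not discarded mid-flight.
  const results = await Promise.allSettled(
    entries.map(([key, events]) =>
      uploadToS3(`events/${key}.json`, JSON.stringify(events)),
    ),
  );
  const failed = results.filter((r) => r.status === "rejected");
  // Any failed upload aborts queue dispatch entirely, so no job ever
  // references a file that was never written.
  if (failed.length > 0) {
    throw new Error(`${failed.length} of ${entries.length} uploads failed`);
  }
  return entries.map(([key]) => `events/${key}.json`);
}
```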

Partial Success Model

The function never throws for individual event failures.
Instead, it returns a structured result:

{
  successes: [
    { id: "evt_1", status: 201 },
    { id: "evt_3", status: 201 }
  ],
  errors: [
    { id: "evt_2", status: 400, message: "Invalid request data", error: "..." },
    { id: "evt_4", status: 401, message: "Authentication error", error: "..." }
  ]
}

The only conditions that cause the entire function to throw:
  - Empty auth scope (missing project ID) -> UnauthorizedError
  - S3 upload failure for any group -> Error (prevents queue dispatch)
  - Redis not initialized -> Error
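The structured result above can be typed and assembled roughly as below. Field names are inferred from the example response; the function name and parameter split are assumptions for illustration, not the real Langfuse signatures.

```typescript
// Hypothetical types mirroring the response shape shown above.
type IngestionSuccess = { id: string; status: number };
type IngestionError = { id: string; status: number; message: string; error?: string };
type BatchResult = { successes: IngestionSuccess[]; errors: IngestionError[] };

// Step 9 sketch: merge per-event outcomes into one response instead of throwing.
function aggregateResults(
  processedIds: string[],
  validationErrors: IngestionError[],
  authErrors: IngestionError[],
): BatchResult {
  return {
    successes: processedIds.map((id) => ({ id, status: 201 })),
    errors: [...validationErrors, ...authErrors],
  };
}
```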

Observability

The function records the following metrics via OpenTelemetry:

  langfuse.ingestion.event            -- Counter: total events received (by source)
  langfuse.ingestion.event_distribution -- Distribution: batch size histogram
  langfuse.ingestion.sampling         -- Counter: sampling decisions (in/out by project)

Span attributes set on the current trace:
  langfuse.ingestion.batch_size  -- Number of events in the batch
  langfuse.project.id            -- Project ID from auth
  langfuse.org.id                -- Organization ID from auth
  langfuse.org.plan              -- Billing plan from auth

Date Boundary Handling

A subtle but important detail is the queue delay logic around UTC date boundaries. ClickHouse tables are typically partitioned by date; if an event arriving just before midnight is processed immediately while a related update for the same entity lands just after midnight, the two can be written to different partitions and later surface as cross-partition duplicates. To prevent this, jobs dispatched between 23:45 and 00:15 UTC carry a configurable delay (default from LANGFUSE_INGESTION_QUEUE_DELAY_MS), holding events arriving near the boundary long enough for related events to arrive and be processed together.
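The boundary check can be sketched as a pure function of the current UTC time. The window and default value here are taken from the description above; the function name is hypothetical and the real implementation reads the delay from the environment variable.

```typescript
// Stand-in default; the real value comes from LANGFUSE_INGESTION_QUEUE_DELAY_MS.
const DEFAULT_DELAY_MS = 15 * 60 * 1000;

// Returns the BullMQ job delay in ms: non-zero only between
// 23:45 and 00:15 UTC, i.e. within 15 minutes of the date boundary.
function queueDelayMs(now: Date, delayMs: number = DEFAULT_DELAY_MS): number {
  const minutesOfDay = now.getUTCHours() * 60 + now.getUTCMinutes();
  const nearBoundary = minutesOfDay >= 23 * 60 + 45 || minutesOfDay < 15;
  return nearBoundary ? delayMs : 0;
}
```

A job added with this delay simply sits in the queue until the window has passed, so events on both sides of midnight are handled by the worker in the same processing pass.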

Related Pages

Implemented By
