Principle: Langfuse OTel Ingestion Post Processing
| Knowledge Sources | |
|---|---|
| Domains | Ingestion, Queue Architecture, ClickHouse, Worker Processing |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
OTel Ingestion Post Processing is the principle of asynchronously consuming queued OTel ingestion jobs in a worker process to convert raw ResourceSpans into finalized trace and observation records written to the analytics database.
Description
After the web server uploads ResourceSpans to S3 and enqueues a job to BullMQ (see OTel S3 Upload and Queue Dispatch), the actual data processing happens in a separate worker process. This decoupled architecture provides several benefits:
- Independent scaling: Workers can be scaled horizontally based on queue depth without affecting web server capacity.
- Fault isolation: Processing failures do not impact API availability. Failed jobs are retried via BullMQ's built-in retry mechanism.
- Enterprise feature integration: The worker applies enterprise features like ingestion masking before processing, with fail-closed semantics (drops events on masking failure).
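These retry and fail-closed semantics can be sketched as a worker-side handler. All names below (handleOtelJob, the injected deps shape) are illustrative stand-ins, not Langfuse's actual internals: an error thrown out of the handler is what lets BullMQ retry the job, while a masking failure is absorbed as a fail-closed drop so the job completes without writing unmasked data.

```typescript
// Hypothetical shape of the queued job payload, mirroring the fields
// described in this article (fileKey, publicKey, projectId, orgId).
interface OtelIngestionJob {
  fileKey: string;
  publicKey: string;
  projectId: string;
  orgId: string;
  propagatedHeaders?: Record<string, string>;
}

// Illustrative processor wrapper: a thrown error propagates to BullMQ
// and triggers its built-in retry, while a masking failure is swallowed
// (fail-closed drop) so unmasked data is never processed.
async function handleOtelJob(
  job: OtelIngestionJob,
  deps: {
    download: (fileKey: string) => Promise<unknown[]>;
    maskSpans: (raw: unknown[]) => Promise<unknown[]>;
    process: (spans: unknown[]) => Promise<void>;
  },
): Promise<"processed" | "dropped"> {
  const raw = await deps.download(job.fileKey);
  let spans: unknown[];
  try {
    spans = await deps.maskSpans(raw);
  } catch {
    return "dropped"; // fail-closed: drop rather than process unmasked data
  }
  await deps.process(spans); // errors here propagate -> BullMQ retries the job
  return "processed";
}
```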
The post-processing pipeline has four major phases:
Phase 1 -- Download and Parse: The worker downloads the JSON file from S3 using the file key from the job payload, parses it into ResourceSpan[], and optionally applies ingestion masking.
Phase 2 -- Event Generation: The OtelIngestionProcessor.processToIngestionEvents() method converts ResourceSpans into IngestionEventType[], performing trace deduplication via Redis and shallow trace filtering.
Phase 3 -- Dual-Path Write: Events are split into two categories:
- Trace events (non-observation types) are processed via processEventBatch(), which handles trace creation/updates in both PostgreSQL and ClickHouse.
- Observation events are validated through the ingestion schema (for type transformations), then written individually via IngestionService.mergeAndWrite(), which handles merge logic for the ClickHouse staging table.
Both paths run concurrently via Promise.all() for maximum throughput.
Phase 4 -- Post-Processing: After the primary writes, a second pass via processToEvent() generates enriched event records for two purposes:
- Observation-level evaluations: If eval configs exist for the project, observations are converted to eval-compatible format and scheduled for evaluation.
- Direct events table writes: For qualifying SDK versions (Python >= 3.9.0, JS >= 4.4.0) in experiment environments, events are written directly to the ClickHouse events table, bypassing the staging table.
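The Phase 3 split can be sketched as follows. The event shape is simplified and processEventBatch / mergeAndWrite are injected stand-ins for the internal functions named in this article; the point is the filter-then-Promise.all structure, not the real signatures.

```typescript
// Simplified ingestion event: real Langfuse events carry much more.
type IngestionEvent = { type: string; body: { id: string } };

// Sketch of the dual-path write: traces go through the batch processor,
// observations are written individually, and both paths run concurrently.
async function dualPathWrite(
  events: IngestionEvent[],
  deps: {
    processEventBatch: (traces: IngestionEvent[]) => Promise<void>;
    mergeAndWrite: (obs: IngestionEvent) => Promise<void>;
  },
): Promise<void> {
  const observations = events.filter((e) => e.type === "observation");
  const traces = events.filter((e) => e.type !== "observation");
  // Both categories are written in parallel for throughput.
  await Promise.all([
    deps.processEventBatch(traces),
    Promise.all(observations.map((o) => deps.mergeAndWrite(o))),
  ]);
}
```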
Usage
This principle applies to any system where:
- Raw telemetry must be processed asynchronously from ingestion.
- Multiple downstream data stores require coordinated writes (ClickHouse for analytics, Redis for deduplication).
- Processing must support enterprise customization (masking, filtering) without modifying the core pipeline.
- SDK version-aware feature gating is needed to progressively roll out new write paths.
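As an illustration of the Redis-backed trace deduplication mentioned above, the sketch below mimics SET-NX-with-TTL semantics using an in-memory Map in place of Redis; the class name and key format are hypothetical.

```typescript
// In-memory stand-in for Redis SET NX + TTL: isFirstSeen returns true
// only the first time a (project, trace) pair is seen within the TTL
// window, so duplicate trace upserts can be skipped.
class TraceDeduper {
  private seen = new Map<string, number>(); // key -> expiry timestamp (ms)

  constructor(private ttlMs: number) {}

  isFirstSeen(projectId: string, traceId: string, now = Date.now()): boolean {
    const key = `${projectId}:${traceId}`;
    const expiry = this.seen.get(key);
    if (expiry !== undefined && expiry > now) return false; // duplicate
    this.seen.set(key, now + this.ttlMs); // record sighting with TTL
    return true;
  }
}
```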
Theoretical Basis
```
WORKER RECEIVES BullMQ Job
  Job payload: { fileKey, publicKey, projectId, orgId, propagatedHeaders? }
    |
    v
PHASE 1: DOWNLOAD AND PARSE
  Download JSON from S3 via fileKey
  Parse JSON -> ResourceSpan[]
    |
    v
  IF ingestion masking enabled (EE feature):
    Apply masking via applyIngestionMasking()
    IF masking fails -> DROP event (fail-closed) -> RETURN
    ELSE -> Use masked ResourceSpan[]
    |
    v
PHASE 2: EVENT GENERATION
  OtelIngestionProcessor.processToIngestionEvents(resourceSpans)
    -> Returns IngestionEventType[] (deduplicated)
    |
    v
PHASE 3: DUAL-PATH WRITE (concurrent)
    |
    |--- TRACE PATH:
    |      Filter events where entity type != "observation"
    |      processEventBatch(traces, auth, { delay: 0, source: "otel" })
    |        -> Writes to ClickHouse via existing batch processor
    |
    |--- OBSERVATION PATH:
    |      Filter events where entity type == "observation"
    |      Validate each through createIngestionEventSchema()
    |      For each valid observation:
    |        IngestionService.mergeAndWrite(entityType, projectId, id, timestamp, [event])
    |          -> Writes to ClickHouse staging table
    |
    v
PHASE 4: POST-PROCESSING
  processToEvent(resourceSpans) -> enriched event records[]
    |
  IF no enriched events -> RETURN
    |
    v
  DETERMINE processing needs:
    |
    |--- SDK VERSION CHECK:
    |      Extract sdkInfo from first resourceSpan
    |      Check: scopeName contains "langfuse" (case-insensitive)
    |      Check: Python SDK >= 3.9.0 OR JS SDK >= 4.4.0
    |      Check: environment == "sdk-experiment"
    |        -> shouldWriteToEventsTable = (version check passes AND env flag on)
    |
    |--- EVAL CONFIGS:
    |      fetchObservationEvalConfigs(projectId)
    |        -> hasEvalConfigs = (configs.length > 0)
    |
  IF !hasEvalConfigs AND !shouldWriteToEventsTable -> RETURN
    |
    v
  FOR EACH enriched event (concurrent):
    1. Create event record via IngestionService.createEventRecord()
    2. IF hasEvalConfigs:
         Convert to eval-compatible observation
         scheduleObservationEvals(observation, configs)
    3. IF shouldWriteToEventsTable:
         IngestionService.writeEventRecord(eventRecord)
    |
    v
DONE (job completes successfully)
```
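The SDK VERSION CHECK step in the diagram can be sketched as a small gate function. The function names and the simplified three-part semver comparison below are illustrative; the real implementation may use a full semver library and a differently shaped sdkInfo.

```typescript
// Minimal three-component version comparison (no prerelease handling).
function versionAtLeast(version: string, min: string): boolean {
  const a = version.split(".").map(Number);
  const b = min.split(".").map(Number);
  for (let i = 0; i < 3; i++) {
    const x = a[i] ?? 0;
    const y = b[i] ?? 0;
    if (x !== y) return x > y;
  }
  return true; // versions are equal
}

// Gate from the diagram: Langfuse SDKs only, Python >= 3.9.0 or
// JS >= 4.4.0, and only in the "sdk-experiment" environment.
function shouldWriteToEventsTable(sdk: {
  scopeName: string;
  language: "python" | "javascript";
  version: string;
  environment: string;
}): boolean {
  if (!sdk.scopeName.toLowerCase().includes("langfuse")) return false;
  if (sdk.environment !== "sdk-experiment") return false;
  const min = sdk.language === "python" ? "3.9.0" : "4.4.0";
  return versionAtLeast(sdk.version, min);
}
```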
Key design decisions:
- Fail-closed masking: When enterprise ingestion masking is enabled and the masking service returns an error, the entire event batch is dropped rather than allowing unmasked data through. This ensures data privacy guarantees.
- Schema validation for observations: Each observation event is parsed through createIngestionEventSchema(), which applies Zod transformations. Invalid observations are logged and skipped rather than failing the entire batch.
- Concurrent dual-path writes: Trace and observation writes run in parallel via Promise.all(). This maximizes throughput at the cost of increased event loop pressure, which can be tuned if needed.
- SDK version gating: Direct events table writes are gated behind both an environment variable flag (LANGFUSE_EXPERIMENT_INSERT_INTO_EVENTS_TABLE) and SDK version checks. This allows progressive rollout of the new write path as SDKs adopt the required protocol changes.
- Independent error handling in Phase 4: Each event record's eval scheduling and events table write are independently try/caught. A failure in one observation does not prevent processing of others.
- Events table forwarding logic: Non-qualifying observations (older SDK versions, non-Langfuse SDKs) can still be forwarded to the events table via a separate code path (shouldForwardToEventsTable) controlled by environment flags, providing a migration path.
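The per-event fault isolation in Phase 4 can be sketched as below: each record's eval scheduling and events-table write are wrapped in their own try/catch, so one failing observation never aborts its siblings. The function name, record shape, and deps are hypothetical, not Langfuse's actual API.

```typescript
// Illustrative Phase 4 loop: failures are collected per record instead
// of propagating, mirroring the independent-error-handling decision.
async function postProcessRecords(
  records: { id: string }[],
  deps: {
    scheduleEvals?: (r: { id: string }) => Promise<void>;
    writeEventRecord?: (r: { id: string }) => Promise<void>;
  },
): Promise<{ ok: string[]; failed: string[] }> {
  const ok: string[] = [];
  const failed: string[] = [];
  await Promise.all(
    records.map(async (r) => {
      try {
        if (deps.scheduleEvals) await deps.scheduleEvals(r);
        if (deps.writeEventRecord) await deps.writeEventRecord(r);
        ok.push(r.id);
      } catch {
        failed.push(r.id); // logged and skipped; siblings are unaffected
      }
    }),
  );
  return { ok, failed };
}
```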