

Principle: Langfuse OTel Ingestion Post Processing

From Leeroopedia
Domains: Ingestion, Queue Architecture, ClickHouse, Worker Processing
Last Updated: 2026-02-14 00:00 GMT

Overview

OTel Ingestion Post Processing is the principle of asynchronously consuming queued OTel ingestion jobs in a worker process, converting raw ResourceSpans into finalized trace and observation records that are written to the analytics database.

Description

After the web server uploads ResourceSpans to S3 and enqueues a job to BullMQ (see OTel S3 Upload and Queue Dispatch), the actual data processing happens in a separate worker process. This decoupled architecture provides several benefits:

  • Independent scaling: Workers can be scaled horizontally based on queue depth without affecting web server capacity.
  • Fault isolation: Processing failures do not impact API availability. Failed jobs are retried via BullMQ's built-in retry mechanism.
  • Enterprise feature integration: The worker applies enterprise features such as ingestion masking before event generation, with fail-closed semantics: if masking fails, the batch is dropped rather than processed unmasked.
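The decoupled worker loop can be sketched as follows. The job payload fields match the diagram in Theoretical Basis; the queue name "otel-ingestion" and the helper names in ProcessorDeps are stand-ins, not Langfuse's actual API.

```typescript
interface OtelIngestionJob {
  fileKey: string;
  publicKey: string;
  projectId: string;
  orgId: string;
}

interface ProcessorDeps {
  download: (fileKey: string) => Promise<string>; // S3 fetch (assumed helper)
  process: (spans: unknown[], job: OtelIngestionJob) => Promise<void>; // Phases 2-4
}

// Keeping I/O injectable makes the core flow testable without S3 or Redis.
function makeOtelProcessor(deps: ProcessorDeps) {
  return async (data: OtelIngestionJob): Promise<void> => {
    const raw = await deps.download(data.fileKey);      // Phase 1: download by fileKey
    const resourceSpans = JSON.parse(raw) as unknown[]; // Phase 1: parse
    await deps.process(resourceSpans, data);            // hand off to later phases
  };
}

// Wiring sketch (BullMQ): a thrown error marks the job failed, and BullMQ's
// attempts/backoff settings retry it -- the fault isolation described above.
//
//   new Worker<OtelIngestionJob>("otel-ingestion",
//     (job) => makeOtelProcessor(realDeps)(job.data),
//     { connection: { host: "localhost", port: 6379 } });
```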

The post-processing pipeline has four major phases:

Phase 1 -- Download and Parse: The worker downloads the JSON file from S3 using the file key from the job payload, parses it into ResourceSpan[], and optionally applies ingestion masking.

Phase 2 -- Event Generation: The OtelIngestionProcessor.processToIngestionEvents() method converts ResourceSpans into IngestionEventType[], performing trace deduplication via Redis and shallow trace filtering.
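The Redis-backed trace deduplication can be sketched as a set-if-absent check: only the first event seen for a trace id within a TTL window produces a trace record. The key shape and the 120-second TTL are assumptions, not Langfuse's actual values.

```typescript
// With ioredis, setIfAbsent could be implemented as:
//   (key, ttl) => redis.set(key, "1", "EX", ttl, "NX").then((r) => r === "OK")
async function isFirstSeen(
  projectId: string,
  traceId: string,
  setIfAbsent: (key: string, ttlSeconds: number) => Promise<boolean>,
): Promise<boolean> {
  const key = `otel:trace-seen:${projectId}:${traceId}`;
  // SET ... NX succeeds only for the first writer; later spans of the same
  // trace skip trace creation and contribute observations only.
  return setIfAbsent(key, 120);
}
```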

Phase 3 -- Dual-Path Write: Events are split into two categories:

  • Trace events (non-observation types) are processed via processEventBatch(), which handles trace creation/updates in both PostgreSQL and ClickHouse.
  • Observation events are validated through the ingestion schema (for type transformations), then written individually via IngestionService.mergeAndWrite(), which handles merge logic for the ClickHouse staging table.

Both paths run concurrently via Promise.all() for maximum throughput.
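The dual-path split can be sketched as below: one batched call for trace events, one individual call per observation, all awaited together. The handler parameters mirror processEventBatch() and mergeAndWrite() but are simplified stand-ins.

```typescript
interface IngestionEvent { type: string; id: string }

async function dualPathWrite(
  events: IngestionEvent[],
  writeTraceBatch: (batch: IngestionEvent[]) => Promise<void>, // cf. processEventBatch
  writeObservation: (e: IngestionEvent) => Promise<void>,      // cf. mergeAndWrite
): Promise<void> {
  const traces = events.filter((e) => e.type !== "observation");
  const observations = events.filter((e) => e.type === "observation");
  // Both paths run concurrently; observations are written one by one so each
  // gets its own merge logic against the staging table.
  await Promise.all([
    writeTraceBatch(traces),
    ...observations.map((e) => writeObservation(e)),
  ]);
}
```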

Phase 4 -- Post-Processing: After the primary writes, a second pass via processToEvent() generates enriched event records for two purposes:

  • Observation-level evaluations: If eval configs exist for the project, observations are converted to eval-compatible format and scheduled for evaluation.
  • Direct events table writes: For qualifying SDK versions (Python >= 3.9.0, JS >= 4.4.0) in experiment environments, events are written directly to the ClickHouse events table, bypassing the staging table.
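The SDK gating check can be sketched as a pure predicate over scope name, SDK version, and environment. The thresholds and the "sdk-experiment" environment come from the text; the simplified numeric version compare (no pre-release handling) and the input shape are assumptions.

```typescript
// Simplified semver-style comparison: true if version >= min.
function atLeast(version: string, min: string): boolean {
  const a = version.split(".").map(Number);
  const b = min.split(".").map(Number);
  for (let i = 0; i < 3; i++) {
    if ((a[i] ?? 0) !== (b[i] ?? 0)) return (a[i] ?? 0) > (b[i] ?? 0);
  }
  return true;
}

function shouldWriteToEventsTable(sdk: {
  scopeName: string;
  language: "python" | "js";
  version: string;
  environment: string;
}): boolean {
  // Scope must identify a Langfuse SDK (case-insensitive check).
  if (!sdk.scopeName.toLowerCase().includes("langfuse")) return false;
  // Only experiment environments get the direct events-table write path.
  if (sdk.environment !== "sdk-experiment") return false;
  // Version floors per SDK: Python >= 3.9.0, JS >= 4.4.0.
  return sdk.language === "python"
    ? atLeast(sdk.version, "3.9.0")
    : atLeast(sdk.version, "4.4.0");
}
```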

Usage

This principle applies to any system where:

  • Raw telemetry must be processed asynchronously from ingestion.
  • Multiple downstream data stores require coordinated writes (ClickHouse for analytics, Redis for deduplication).
  • Processing must support enterprise customization (masking, filtering) without modifying the core pipeline.
  • SDK version-aware feature gating is needed to progressively roll out new write paths.

Theoretical Basis

WORKER RECEIVES BullMQ Job
    Job payload: { fileKey, publicKey, projectId, orgId, propagatedHeaders? }
    |
    v
PHASE 1: DOWNLOAD AND PARSE
    Download JSON from S3 via fileKey
    Parse JSON -> ResourceSpan[]
    |
    v
    IF ingestion masking enabled (EE feature):
        Apply masking via applyIngestionMasking()
        IF masking fails -> DROP event (fail-closed) -> RETURN
        ELSE -> Use masked ResourceSpan[]
    |
    v
PHASE 2: EVENT GENERATION
    OtelIngestionProcessor.processToIngestionEvents(resourceSpans)
    -> Returns IngestionEventType[] (deduplicated)
    |
    v
PHASE 3: DUAL-PATH WRITE (concurrent)
    |
    |--- TRACE PATH:
    |    Filter events where entity type != "observation"
    |    processEventBatch(traces, auth, { delay: 0, source: "otel" })
    |    -> Writes to ClickHouse via existing batch processor
    |
    |--- OBSERVATION PATH:
    |    Filter events where entity type == "observation"
    |    Validate each through createIngestionEventSchema()
    |    For each valid observation:
    |        IngestionService.mergeAndWrite(entityType, projectId, id, timestamp, [event])
    |    -> Writes to ClickHouse staging table
    |
    v
PHASE 4: POST-PROCESSING
    processToEvent(resourceSpans) -> enriched event records[]
    |
    IF no enriched events -> RETURN
    |
    v
    DETERMINE processing needs:
    |
    |--- SDK VERSION CHECK:
    |    Extract sdkInfo from first resourceSpan
    |    Check: scopeName contains "langfuse" (case-insensitive)
    |    Check: Python SDK >= 3.9.0 OR JS SDK >= 4.4.0
    |    Check: environment == "sdk-experiment"
    |    -> shouldWriteToEventsTable = (version check passes AND env flag on)
    |
    |--- EVAL CONFIGS:
    |    fetchObservationEvalConfigs(projectId)
    |    -> hasEvalConfigs = (configs.length > 0)
    |
    IF !hasEvalConfigs AND !shouldWriteToEventsTable -> RETURN
    |
    v
    FOR EACH enriched event (concurrent):
        1. Create event record via IngestionService.createEventRecord()
        2. IF hasEvalConfigs:
               Convert to eval-compatible observation
               scheduleObservationEvals(observation, configs)
        3. IF shouldWriteToEventsTable:
               IngestionService.writeEventRecord(eventRecord)
    |
    v
DONE (job completes successfully)

Key design decisions:

  • Fail-closed masking: When enterprise ingestion masking is enabled and the masking service returns an error, the entire event batch is dropped rather than allowing unmasked data through. This ensures data privacy guarantees.
  • Schema validation for observations: Each observation event is parsed through createIngestionEventSchema() which applies Zod transformations. Invalid observations are logged and skipped rather than failing the entire batch.
  • Concurrent dual-path writes: Trace and observation writes run in parallel via Promise.all(). This maximizes throughput at the cost of increased event loop pressure, which can be tuned if needed.
  • SDK version gating: Direct events table writes are gated behind both an environment variable flag (LANGFUSE_EXPERIMENT_INSERT_INTO_EVENTS_TABLE) and SDK version checks. This allows progressive rollout of the new write path as SDKs adopt the required protocol changes.
  • Independent error handling in Phase 4: Each event record's eval scheduling and events table write are independently try/caught. A failure in one observation does not prevent processing of others.
  • Events table forwarding logic: Non-qualifying observations (older SDK versions, non-Langfuse SDKs) can still be forwarded to the events table via a separate code path (shouldForwardToEventsTable) controlled by environment flags, providing a migration path.
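The independent error handling described above can be sketched as a per-event wrapper: each event's post-processing is awaited inside its own try/catch, so one failure is counted and logged without aborting the rest. The counting return shape is illustrative.

```typescript
async function postProcessAll<E>(
  events: E[],
  handle: (e: E) => Promise<void>, // eval scheduling + events-table write for one event
): Promise<{ ok: number; failed: number }> {
  let ok = 0;
  let failed = 0;
  await Promise.all(
    events.map(async (e) => {
      try {
        await handle(e);
        ok++;
      } catch (err) {
        // One bad observation is logged and skipped; the job still completes.
        console.warn("post-processing failed for one event:", err);
        failed++;
      }
    }),
  );
  return { ok, failed };
}
```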
