Implementation:Langfuse Langfuse OtelIngestionQueueProcessor

Domains: Ingestion, Queue Architecture, ClickHouse, Worker Processing
Last Updated: 2026-02-14 00:00 GMT

Overview

Concrete tool, provided by Langfuse, for processing queued OpenTelemetry (OTel) ingestion jobs in the Langfuse worker. It converts ResourceSpans stored in S3 into ClickHouse records.

Description

The otelIngestionQueueProcessor is a BullMQ Processor function that handles OtelIngestionQueue jobs. Each job references an S3 file containing OTel ResourceSpans that were uploaded by the web server. The processor:

  1. Downloads from S3: Retrieves the JSON file using the file key from the job payload via getS3EventStorageClient().download().
  2. Applies ingestion masking: If enterprise ingestion masking is enabled, applies masking with fail-closed semantics (the event is dropped if masking fails).
  3. Generates ingestion events: Instantiates OtelIngestionProcessor and calls processToIngestionEvents() to convert ResourceSpans to deduplicated IngestionEventType[].
  4. Splits events: Separates trace events from observation events using getClickhouseEntityType().
  5. Validates observations: Parses each observation through createIngestionEventSchema() for type transformations.
  6. Writes concurrently: Processes traces via processEventBatch() and observations via IngestionService.mergeAndWrite() in parallel.
  7. Post-processes: Generates enriched event records via processToEvent() for observation evaluations and optional direct events table writes.
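Steps 4 to 6 can be sketched in isolation as follows. This is a minimal illustration, not Langfuse's code: `SketchEvent`, `isObservationEvent()`, `splitEvents()` and `writeConcurrently()` are hypothetical, the type-prefix rule stands in for `getClickhouseEntityType()`, and the two writer callbacks stand in for `processEventBatch()` and `IngestionService.mergeAndWrite()`.

```typescript
// Hypothetical, simplified ingestion event carrying only what this sketch needs.
interface SketchEvent {
  id: string;
  type: string; // e.g. "trace-create", "span-create", "generation-update"
}

// Assumption: observation events are recognised by their type prefix. The real
// getClickhouseEntityType() may classify events differently.
function isObservationEvent(event: SketchEvent): boolean {
  return /^(span|generation|event)-/.test(event.type);
}

// Step 4: everything that is not an observation is treated as a trace event.
function splitEvents(events: SketchEvent[]): {
  traces: SketchEvent[];
  observations: SketchEvent[];
} {
  return {
    traces: events.filter((e) => !isObservationEvent(e)),
    observations: events.filter((e) => isObservationEvent(e)),
  };
}

// Step 6: write both groups concurrently. The callbacks stand in for
// processEventBatch() (traces) and IngestionService.mergeAndWrite() (observations).
async function writeConcurrently(
  events: SketchEvent[],
  writeTraces: (traces: SketchEvent[]) => Promise<void>,
  writeObservation: (observation: SketchEvent) => Promise<void>,
): Promise<void> {
  const { traces, observations } = splitEvents(events);
  await Promise.all([
    writeTraces(traces),
    // Spread so each observation write is awaited, not just the array
    ...observations.map((o) => writeObservation(o)),
  ]);
}
```

Spreading the per-observation promises matters: if the array itself were passed as a single element, `Promise.all` would treat it as an already-resolved value and return before those writes finished.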

The processor also includes two helper functions:

  • getSdkInfoFromResourceSpans(): Extracts scope name, version, and SDK language from the first ResourceSpan.
  • checkSdkVersionRequirements(): Validates whether the SDK meets the requirements for direct event writes: it must be a Langfuse SDK scope, the version must be Python >= 3.9.0 or JS >= 4.4.0, and the batch must come from the "sdk-experiment" environment.
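A minimal sketch of the kind of extraction `getSdkInfoFromResourceSpans()` performs, assuming the S3 file holds ResourceSpans in the OTLP/JSON layout (`resource.attributes` as key/value pairs, `scopeSpans[].scope` carrying `name` and `version`). The types, the name `getSdkInfoSketch()`, and the choice of the first scope are assumptions; the real helper may read the payload differently.

```typescript
// Minimal OTLP/JSON shapes (only the fields used here).
interface KeyValue {
  key: string;
  value: { stringValue?: string };
}
interface ResourceSpanJson {
  resource?: { attributes?: KeyValue[] };
  scopeSpans?: { scope?: { name?: string; version?: string } }[];
}

interface SketchSdkInfo {
  scopeName: string | null;
  scopeVersion: string | null;
  telemetrySdkLanguage: string | null;
}

// Reads scope name/version and the telemetry.sdk.language resource attribute
// from the first ResourceSpan; missing fields come back as null.
function getSdkInfoSketch(resourceSpans: ResourceSpanJson[]): SketchSdkInfo {
  const first = resourceSpans[0];
  const scope = first?.scopeSpans?.[0]?.scope;
  const language = first?.resource?.attributes?.find(
    (attr) => attr.key === "telemetry.sdk.language",
  )?.value.stringValue;
  return {
    scopeName: scope?.name ?? null,
    scopeVersion: scope?.version ?? null,
    telemetrySdkLanguage: language ?? null,
  };
}
```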

Usage

This processor is registered with the BullMQ worker for the QueueName.OtelIngestionQueue queue. It is invoked automatically when jobs are added to the queue by the web server.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/queues/otelIngestionQueue.ts
  • Lines: 123-408

Signature

export const otelIngestionQueueProcessor: Processor = async (
  job: Job<TQueueJobTypes[QueueName.OtelIngestionQueue]>,
): Promise<void>

Import

import { otelIngestionQueueProcessor } from "../queues/otelIngestionQueue";

// Dependencies used internally:
import {
  OtelIngestionProcessor,
  processEventBatch,
  getS3EventStorageClient,
  clickhouseClient,
  redis,
  type IngestionEventType,
  type ResourceSpan,
  createIngestionEventSchema,
  getClickhouseEntityType,
  QueueName,
  type TQueueJobTypes,
  compareVersions,
} from "@langfuse/shared/src/server";
import { IngestionService } from "../services/IngestionService";
import { ClickhouseWriter } from "../services/ClickhouseWriter";
import { prisma } from "@langfuse/shared/src/db";

I/O Contract

Inputs

  • job.data.payload.data.fileKey (string, required): S3 object key pointing to the JSON file containing the ResourceSpan[] array.
  • job.data.payload.data.publicKey (string, optional): Public API key for the project, passed to OtelIngestionProcessor.
  • job.data.payload.authCheck.scope.projectId (string, required): Project identifier used to scope all operations.
  • job.data.payload.authCheck.scope.orgId (string, optional): Organization ID, used for enterprise features.
  • job.data.payload.propagatedHeaders (Record<string, string>, optional): HTTP headers propagated from the original request, used for ingestion masking.
  • job.data.id (string, required): Unique job identifier, used for tracing.
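The inputs above can be read and checked up front. A minimal sketch, assuming a hypothetical `OtelJobData` type that mirrors only the fields listed (the real type is `TQueueJobTypes[QueueName.OtelIngestionQueue]`) and a hypothetical `readJobInputs()` helper that throws when a required field is missing.

```typescript
// Hypothetical mirror of the job fields listed in the Inputs list.
interface OtelJobData {
  id: string;
  payload: {
    data: { fileKey: string; publicKey?: string };
    authCheck: { scope: { projectId: string; orgId?: string } };
    propagatedHeaders?: Record<string, string>;
  };
}

interface JobInputs {
  jobId: string;
  fileKey: string;
  projectId: string;
  publicKey?: string;
  orgId?: string;
  headers: Record<string, string>;
}

// Pulls the documented inputs out of job.data, failing fast on missing required ones.
function readJobInputs(data: OtelJobData): JobInputs {
  const { fileKey, publicKey } = data.payload.data;
  const { projectId, orgId } = data.payload.authCheck.scope;
  if (!data.id) throw new Error("job.data.id is required");
  if (!fileKey) throw new Error("job.data.payload.data.fileKey is required");
  if (!projectId) throw new Error("authCheck.scope.projectId is required");
  return {
    jobId: data.id,
    fileKey,
    projectId,
    publicKey,
    orgId,
    // Optional headers default to an empty object
    headers: data.payload.propagatedHeaders ?? {},
  };
}
```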

Outputs

  • Return value (Promise<void>): Resolves when all processing is complete; data is written to ClickHouse as a side effect.
  • Side effect, ClickHouse traces: trace events written via processEventBatch().
  • Side effect, ClickHouse observations: observation events written via IngestionService.mergeAndWrite().
  • Side effect, ClickHouse events table (conditional): event records written directly for qualifying SDK versions.
  • Side effect, eval jobs (conditional): observation eval jobs scheduled via scheduleObservationEvals().
  • Side effect, metrics: distribution and increment metrics for ingestion stats.
  • Throws (Error): re-throws on infrastructure failures (Redis or Prisma unavailable) to trigger a BullMQ retry. ForbiddenError is logged and swallowed.
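The error contract can be sketched as a small classifier called from the processor's `catch` block. `ForbiddenError` here is a locally defined stand-in and `handleProcessorError()` is hypothetical; the point is the split: forbidden errors are logged and swallowed so the job completes, while anything else is re-thrown so BullMQ marks the attempt as failed and can retry it according to the job's retry settings.

```typescript
// Local stand-in for the ForbiddenError the processor swallows.
class ForbiddenError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ForbiddenError";
    // Keep instanceof working even when compiled to ES5
    Object.setPrototypeOf(this, ForbiddenError.prototype);
  }
}

// Swallows (logs) authorization failures; re-throws everything else so that
// BullMQ treats the job as failed and can retry it.
function handleProcessorError(err: unknown, log: (msg: string) => void): void {
  if (err instanceof ForbiddenError) {
    log(`Dropping OTel ingestion job: ${err.message}`);
    return;
  }
  throw err;
}
```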

Usage Examples

Queue Registration in Worker

import { Worker } from "bullmq";
import { QueueName } from "@langfuse/shared/src/server";
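import { otelIngestionQueueProcessor } from "./queues/otelIngestionQueue";

// Placeholder connection options (assumed local Redis); the real worker supplies its own Redis connection
const redisConnection = { host: "localhost", port: 6379 };
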
const worker = new Worker(
  QueueName.OtelIngestionQueue,
  otelIngestionQueueProcessor,
  {
    connection: redisConnection,
    concurrency: 10,
  },
);

Job Payload Structure

// This is the shape of the job that the web server enqueues:
const jobData = {
  id: "550e8400-e29b-41d4-a716-446655440000",
  timestamp: new Date(),
  name: "OtelIngestionJob",
  payload: {
    data: {
      fileKey: "otel/proj_abc123/2026/02/14/10/30/uuid-here.json",
      publicKey: "pk-lf-xxx",
    },
    authCheck: {
      validKey: true,
      scope: {
        projectId: "proj_abc123",
        accessLevel: "project",
        orgId: "org_def456",
      },
    },
    propagatedHeaders: {
      "x-custom-header": "value",
    },
  },
};

SDK Version Check Logic

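// Internal helper: determines if SDK version qualifies for direct event writes.
// Sketch of the rules listed above, not the verbatim Langfuse source (which
// uses compareVersions); the JS language values checked here are an assumption.
type SdkInfo = { scopeName: string; scopeVersion: string; telemetrySdkLanguage: string };

function checkSdkVersionRequirements(
  sdkInfo: SdkInfo,
  isSdkExperimentBatch: boolean,
): boolean {
  // Must be in the "sdk-experiment" environment and be a Langfuse SDK scope
  if (!isSdkExperimentBatch || !sdkInfo.scopeName.includes("langfuse")) return false;
  // Python SDK >= 3.9.0, JS SDK >= 4.4.0
  const lang = sdkInfo.telemetrySdkLanguage;
  const min = lang === "python" ? [3, 9, 0] : ["js", "nodejs"].includes(lang) ? [4, 4, 0] : null;
  if (!min) return false;
  const version = sdkInfo.scopeVersion.split(".").map(Number);
  for (let i = 0; i < 3; i++) {
    const part = version[i] ?? 0;
    if (part !== min[i]) return part > min[i];
  }
  return true; // exactly the minimum version
}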

// Example: Python SDK 3.10.0 in experiment env -> true
checkSdkVersionRequirements(
  { scopeName: "langfuse-sdk", scopeVersion: "3.10.0", telemetrySdkLanguage: "python" },
  true, // has experiment environment
);
// Returns: true

// Example: Python SDK 3.8.0 -> false (version too low)
checkSdkVersionRequirements(
  { scopeName: "langfuse-sdk", scopeVersion: "3.8.0", telemetrySdkLanguage: "python" },
  true,
);
// Returns: false

// Example: Non-Langfuse SDK -> false
checkSdkVersionRequirements(
  { scopeName: "openai-sdk", scopeVersion: "5.0.0", telemetrySdkLanguage: "python" },
  true,
);
// Returns: false

Processing Flow Summary

// Simplified view of what the processor does. s3Client, maskingEnabled,
// applyMasking, isObservation, ingestionService, hasEvalConfigs and
// shouldWriteToEventsTable are placeholders for the real dependencies.
async function otelIngestionQueueProcessor(job) {
  const { fileKey, publicKey } = job.data.payload.data;
  const auth = job.data.payload.authCheck;
  const { projectId } = auth.scope;

  // Phase 1: Download from S3 (getS3EventStorageClient().download() in the source)
  const raw = await s3Client.download(fileKey);
  let parsedSpans = JSON.parse(raw);

  // Phase 1b: Apply masking (EE); fail-closed, so a masking error drops the event
  if (maskingEnabled) {
    parsedSpans = await applyMasking(parsedSpans);
  }

  // Phase 2: Generate deduplicated ingestion events
  const processor = new OtelIngestionProcessor({ projectId, publicKey });
  const events = await processor.processToIngestionEvents(parsedSpans);

  // Phase 3: Split via getClickhouseEntityType(), validate observations with
  // createIngestionEventSchema(), then write traces and observations concurrently
  const traces = events.filter((e) => !isObservation(e));
  const observations = events.filter((e) => isObservation(e));

  await Promise.all([
    // Spread the per-observation promises so Promise.all actually awaits them
    ...observations.map((o) => ingestionService.mergeAndWrite(/*...*/)),
    processEventBatch(traces, auth, { source: "otel" }),
  ]);

  // Phase 4: Post-processing (evals + direct writes)
  const enrichedEvents = processor.processToEvent(parsedSpans);
  for (const event of enrichedEvents) {
    const record = await ingestionService.createEventRecord(event);
    if (hasEvalConfigs) await scheduleObservationEvals(record);
    if (shouldWriteToEventsTable) ingestionService.writeEventRecord(record);
  }
}
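The fail-closed behaviour of Phase 1b can be illustrated in isolation. This is a sketch under assumptions: `applyMaskingFailClosed()` and the `mask` callback are hypothetical, and returning `null` is just one way to signal "drop this event"; the enterprise masking implementation itself is not described on this page.

```typescript
// Hypothetical fail-closed wrapper: if masking throws, the data is dropped
// (null) rather than ingested unmasked.
async function applyMaskingFailClosed<T>(
  spans: T,
  mask: (input: T) => Promise<T>,
  log: (msg: string) => void,
): Promise<T | null> {
  try {
    return await mask(spans);
  } catch (err) {
    log(`Masking failed, dropping event: ${String(err)}`);
    return null;
  }
}

// Possible use inside the processor (names are placeholders):
// const masked = await applyMaskingFailClosed(parsedSpans, mask, console.warn);
// if (masked === null) return; // nothing is written for this job
```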

