Implementation: Langfuse OtelIngestionQueueProcessor
| Knowledge Sources | |
|---|---|
| Domains | Ingestion, Queue Architecture, ClickHouse, Worker Processing |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for processing queued OTel ingestion jobs in the Langfuse worker, converting S3-stored ResourceSpans into ClickHouse records.
Description
The otelIngestionQueueProcessor is a BullMQ Processor function that handles OtelIngestionQueue jobs. Each job references an S3 file containing OTel ResourceSpans that were uploaded by the web server. The processor:
- Downloads from S3: Retrieves the JSON file using the file key from the job payload via getS3EventStorageClient().download().
- Applies ingestion masking: If enterprise ingestion masking is enabled, applies masking with fail-closed semantics (drops the event on masking failure).
- Generates ingestion events: Instantiates OtelIngestionProcessor and calls processToIngestionEvents() to convert ResourceSpans to a deduplicated IngestionEventType[].
- Splits events: Separates trace events from observation events using getClickhouseEntityType().
- Validates observations: Parses each observation through createIngestionEventSchema() for type transformations.
- Writes concurrently: Processes traces via processEventBatch() and observations via IngestionService.mergeAndWrite() in parallel.
- Post-processes: Generates enriched event records via processToEvent() for observation evaluations and optional direct events-table writes.
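The split step above can be sketched as follows. This is a hedged illustration: the event shape and the getClickhouseEntityType() mapping are simplified stand-ins for the real helpers in @langfuse/shared, not their actual implementations.

```typescript
// Simplified stand-in for the shared getClickhouseEntityType() helper:
// maps an ingestion event type string to the ClickHouse entity it targets.
type IngestionEvent = { type: string; body: Record<string, unknown> };

function getClickhouseEntityType(
  eventType: string,
): "trace" | "observation" | "score" {
  if (eventType.startsWith("trace")) return "trace";
  if (eventType.startsWith("score")) return "score";
  return "observation"; // span-create, generation-create, event-create, ...
}

// Partition a deduplicated event batch into traces vs. observations,
// the way the processor does before writing concurrently.
function splitEvents(events: IngestionEvent[]) {
  const traces: IngestionEvent[] = [];
  const observations: IngestionEvent[] = [];
  for (const e of events) {
    if (getClickhouseEntityType(e.type) === "trace") traces.push(e);
    else observations.push(e);
  }
  return { traces, observations };
}

const { traces, observations } = splitEvents([
  { type: "trace-create", body: { id: "t1" } },
  { type: "span-create", body: { id: "s1", traceId: "t1" } },
  { type: "generation-create", body: { id: "g1", traceId: "t1" } },
]);
console.log(traces.length, observations.length); // 1 2
```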
The processor also includes two helper functions:
- getSdkInfoFromResourceSpans(): Extracts the scope name, version, and SDK language from the first ResourceSpan.
- checkSdkVersionRequirements(): Validates whether the SDK meets the version requirements for direct event writes (Python >= 3.9.0, JS >= 4.4.0, and the scope must be a Langfuse SDK).
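A minimal sketch of what getSdkInfoFromResourceSpans() does, assuming the standard OTLP JSON layout (resource attributes plus scopeSpans) and the telemetry.sdk.language resource attribute; the types here are illustrative, not the shared ResourceSpan type.

```typescript
// Illustrative subset of the OTLP JSON ResourceSpans structure.
type AnyValue = { stringValue?: string };
type KeyValue = { key: string; value?: AnyValue };
type ResourceSpan = {
  resource?: { attributes?: KeyValue[] };
  scopeSpans?: { scope?: { name?: string; version?: string } }[];
};

// Reads scope name/version from the first instrumentation scope and the
// SDK language from the telemetry.sdk.language resource attribute.
function getSdkInfoFromResourceSpans(resourceSpans: ResourceSpan[]) {
  const first = resourceSpans[0];
  const scope = first?.scopeSpans?.[0]?.scope;
  const language = first?.resource?.attributes?.find(
    (a) => a.key === "telemetry.sdk.language",
  )?.value?.stringValue;
  return {
    scopeName: scope?.name,
    scopeVersion: scope?.version,
    telemetrySdkLanguage: language,
  };
}

const sdkInfo = getSdkInfoFromResourceSpans([
  {
    resource: {
      attributes: [
        { key: "telemetry.sdk.language", value: { stringValue: "python" } },
      ],
    },
    scopeSpans: [{ scope: { name: "langfuse-sdk", version: "3.10.0" } }],
  },
]);
console.log(sdkInfo); // { scopeName: "langfuse-sdk", scopeVersion: "3.10.0", telemetrySdkLanguage: "python" }
```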
Usage
This processor is registered with the BullMQ worker for the QueueName.OtelIngestionQueue queue. It is invoked automatically when jobs are added to the queue by the web server.
Code Reference
Source Location
- Repository: langfuse
- File: worker/src/queues/otelIngestionQueue.ts
- Lines: 123-408
Signature
export const otelIngestionQueueProcessor: Processor = async (
job: Job<TQueueJobTypes[QueueName.OtelIngestionQueue]>,
): Promise<void>
Import
import { otelIngestionQueueProcessor } from "../queues/otelIngestionQueue";
// Dependencies used internally:
import {
OtelIngestionProcessor,
processEventBatch,
getS3EventStorageClient,
clickhouseClient,
redis,
type IngestionEventType,
type ResourceSpan,
createIngestionEventSchema,
getClickhouseEntityType,
QueueName,
type TQueueJobTypes,
compareVersions,
} from "@langfuse/shared/src/server";
import { IngestionService } from "../services/IngestionService";
import { ClickhouseWriter } from "../services/ClickhouseWriter";
import { prisma } from "@langfuse/shared/src/db";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job.data.payload.data.fileKey | string | Yes | S3 object key pointing to the JSON file containing the ResourceSpan[] array |
| job.data.payload.data.publicKey | string | No | Public API key for the project, passed to OtelIngestionProcessor |
| job.data.payload.authCheck.scope.projectId | string | Yes | Project identifier for scoping all operations |
| job.data.payload.authCheck.scope.orgId | string | No | Organization ID for enterprise features |
| job.data.payload.propagatedHeaders | Record<string, string> | No | HTTP headers propagated from the original request for ingestion masking |
| job.data.id | string | Yes | Unique job identifier for tracing |
Outputs
| Name | Type | Description |
|---|---|---|
| Return value | Promise<void> | Resolves when all processing is complete. The function writes data to ClickHouse as a side effect. |
| Side effect: ClickHouse traces | void | Trace events written via processEventBatch() |
| Side effect: ClickHouse observations | void | Observation events written via IngestionService.mergeAndWrite() |
| Side effect: ClickHouse events table | void | (Conditional) Event records written directly for qualifying SDK versions |
| Side effect: eval jobs | void | (Conditional) Observation eval jobs scheduled via scheduleObservationEvals() |
| Side effect: metrics | void | Distribution and increment metrics for ingestion stats |
| Throws | Error | Re-throws on infrastructure failures (Redis/Prisma unavailable) to trigger BullMQ retry. Logs and swallows ForbiddenError. |
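The throw/swallow behavior in the last row can be sketched as a wrapper. This is a hedged illustration of the policy only: ForbiddenError here is a local stand-in class, not the real export from @langfuse/shared, and withIngestionErrorPolicy is a hypothetical helper name.

```typescript
// Stand-in for the ForbiddenError class exported by @langfuse/shared.
class ForbiddenError extends Error {}

// Sketch of the processor's error policy: auth failures are logged and
// swallowed (job completes, no retry), while infrastructure errors
// propagate so BullMQ re-queues the job.
async function withIngestionErrorPolicy(
  run: () => Promise<void>,
): Promise<void> {
  try {
    await run();
  } catch (err) {
    if (err instanceof ForbiddenError) {
      console.warn("Dropping OTel ingestion job:", err.message);
      return; // job is marked completed; no retry
    }
    throw err; // e.g. Redis/Prisma unavailable -> BullMQ retry
  }
}
```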
Usage Examples
Queue Registration in Worker
import { Worker } from "bullmq";
import { QueueName } from "@langfuse/shared/src/server";
import { otelIngestionQueueProcessor } from "./queues/otelIngestionQueue";
const worker = new Worker(
QueueName.OtelIngestionQueue,
otelIngestionQueueProcessor,
{
connection: redisConnection,
concurrency: 10,
},
);
Job Payload Structure
// This is the shape of the job that the web server enqueues:
const jobData = {
id: "550e8400-e29b-41d4-a716-446655440000",
timestamp: new Date(),
name: "OtelIngestionJob",
payload: {
data: {
fileKey: "otel/proj_abc123/2026/02/14/10/30/uuid-here.json",
publicKey: "pk-lf-xxx",
},
authCheck: {
validKey: true,
scope: {
projectId: "proj_abc123",
accessLevel: "project",
orgId: "org_def456",
},
},
propagatedHeaders: {
"x-custom-header": "value",
},
},
};
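The payload shape can be captured as a TypeScript interface. This is a hedged reconstruction from the I/O contract table above, not the actual TQueueJobTypes definition from @langfuse/shared/src/server.

```typescript
// Illustrative reconstruction only -- the authoritative type lives in
// TQueueJobTypes under @langfuse/shared/src/server.
interface OtelIngestionJobPayload {
  data: {
    fileKey: string; // S3 object key of the ResourceSpan[] JSON file
    publicKey?: string; // project public API key
  };
  authCheck: {
    validKey: boolean;
    scope: {
      projectId: string;
      accessLevel: string;
      orgId?: string;
    };
  };
  propagatedHeaders?: Record<string, string>;
}

interface OtelIngestionJob {
  id: string;
  timestamp: Date;
  name: string;
  payload: OtelIngestionJobPayload;
}

// The example payload from above type-checks against this shape:
const job: OtelIngestionJob = {
  id: "550e8400-e29b-41d4-a716-446655440000",
  timestamp: new Date(),
  name: "OtelIngestionJob",
  payload: {
    data: {
      fileKey: "otel/proj_abc123/2026/02/14/10/30/uuid-here.json",
      publicKey: "pk-lf-xxx",
    },
    authCheck: {
      validKey: true,
      scope: {
        projectId: "proj_abc123",
        accessLevel: "project",
        orgId: "org_def456",
      },
    },
    propagatedHeaders: { "x-custom-header": "value" },
  },
};
console.log(job.payload.authCheck.scope.projectId);
```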
SDK Version Check Logic
// Internal helper: determines if SDK version qualifies for direct event writes
function checkSdkVersionRequirements(
sdkInfo: SdkInfo,
isSdkExperimentBatch: boolean,
): boolean {
// Must be a Langfuse SDK (scope name contains "langfuse")
// Python SDK >= 3.9.0
// JS SDK >= 4.4.0
// AND must be in "sdk-experiment" environment
}
// Example: Python SDK 3.10.0 in experiment env -> true
checkSdkVersionRequirements(
{ scopeName: "langfuse-sdk", scopeVersion: "3.10.0", telemetrySdkLanguage: "python" },
true, // has experiment environment
);
// Returns: true
// Example: Python SDK 3.8.0 -> false (version too low)
checkSdkVersionRequirements(
{ scopeName: "langfuse-sdk", scopeVersion: "3.8.0", telemetrySdkLanguage: "python" },
true,
);
// Returns: false
// Example: Non-Langfuse SDK -> false
checkSdkVersionRequirements(
{ scopeName: "openai-sdk", scopeVersion: "5.0.0", telemetrySdkLanguage: "python" },
true,
);
// Returns: false
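A runnable sketch of the rules above. The version comparison is inlined here instead of using the shared compareVersions import, the SdkInfo shape is assumed from the examples, and non-Python SDKs are treated as JS for the 4.4.0 floor; the real helper may differ in detail.

```typescript
type SdkInfo = {
  scopeName: string;
  scopeVersion: string;
  telemetrySdkLanguage: string;
};

// Minimal semver-style comparison: negative if a < b, zero if equal,
// positive if a > b. Pre-release tags are ignored in this sketch.
function compareVersions(a: string, b: string): number {
  const pa = a.split(".").map((n) => parseInt(n, 10) || 0);
  const pb = b.split(".").map((n) => parseInt(n, 10) || 0);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff;
  }
  return 0;
}

function checkSdkVersionRequirements(
  sdkInfo: SdkInfo,
  isSdkExperimentBatch: boolean,
): boolean {
  // Must be in the "sdk-experiment" environment
  if (!isSdkExperimentBatch) return false;
  // Must be a Langfuse SDK (scope name contains "langfuse")
  if (!sdkInfo.scopeName.toLowerCase().includes("langfuse")) return false;
  // Python SDK >= 3.9.0; other (JS) SDKs >= 4.4.0
  if (sdkInfo.telemetrySdkLanguage === "python") {
    return compareVersions(sdkInfo.scopeVersion, "3.9.0") >= 0;
  }
  return compareVersions(sdkInfo.scopeVersion, "4.4.0") >= 0;
}

// Reproduces the three examples above:
console.log(
  checkSdkVersionRequirements(
    { scopeName: "langfuse-sdk", scopeVersion: "3.10.0", telemetrySdkLanguage: "python" },
    true,
  ), // true
  checkSdkVersionRequirements(
    { scopeName: "langfuse-sdk", scopeVersion: "3.8.0", telemetrySdkLanguage: "python" },
    true,
  ), // false (version too low)
  checkSdkVersionRequirements(
    { scopeName: "openai-sdk", scopeVersion: "5.0.0", telemetrySdkLanguage: "python" },
    true,
  ), // false (not a Langfuse scope)
);
```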
Processing Flow Summary
// Simplified view of what the processor does:
async function otelIngestionQueueProcessor(job) {
const { fileKey, publicKey } = job.data.payload.data;
const { projectId } = job.data.payload.authCheck.scope;
// Phase 1: Download from S3
const raw = await s3Client.download(fileKey);
let parsedSpans = JSON.parse(raw);
// Phase 1b: Apply masking (EE)
if (maskingEnabled) {
parsedSpans = await applyMasking(parsedSpans);
}
// Phase 2: Generate events
const processor = new OtelIngestionProcessor({ projectId, publicKey });
const events = await processor.processToIngestionEvents(parsedSpans);
// Phase 3: Split and write concurrently
const traces = events.filter(e => !isObservation(e));
const observations = events.filter(e => isObservation(e));
  await Promise.all([
    ...observations.map(o => ingestionService.mergeAndWrite(/*...*/)),
    processEventBatch(traces, auth, { source: "otel" }),
  ]);
// Phase 4: Post-processing (evals + direct writes)
const enrichedEvents = processor.processToEvent(parsedSpans);
for (const event of enrichedEvents) {
const record = await ingestionService.createEventRecord(event);
if (hasEvalConfigs) await scheduleObservationEvals(record);
if (shouldWriteToEventsTable) ingestionService.writeEventRecord(record);
}
}