Implementation:Langfuse Langfuse PublishToOtelIngestionQueue
| Knowledge Sources | |
|---|---|
| Domains | Ingestion, Queue Architecture, Blob Storage |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool for uploading parsed OTel resourceSpans to S3 blob storage and enqueuing a BullMQ processing job provided by Langfuse.
Description
The publishToOtelIngestionQueue method is an async instance method on the OtelIngestionProcessor class. It orchestrates the two-step handoff from the web server to the worker:
- S3 Upload: Serializes the incoming
ResourceSpan[]array as JSON and uploads it to the configured S3 bucket usinggetS3EventStorageClient().uploadJson(). The file key is constructed from a configurable prefix, the project ID, a time-based directory path (down to the minute), and a random UUID for uniqueness.
- Queue Dispatch: Obtains a singleton instance of
OtelIngestionQueueand adds a job namedQueueJobs.OtelIngestionJob. The job payload carries the S3 file key, the public API key, and the full authentication scope (projectId, accessLevel, orgId). Optionally includes propagated headers for enterprise ingestion masking.
The method uses constructor-injected configuration (projectId, publicKey, orgId, propagatedHeaders) that is set when the OtelIngestionProcessor is instantiated in the HTTP handler.
Usage
This method is called at the end of the OTel traces HTTP handler after the request body has been parsed and validated. It is the bridge between the synchronous HTTP layer and the asynchronous worker layer.
const processor = new OtelIngestionProcessor({
projectId: auth.scope.projectId,
publicKey: auth.scope.publicKey,
orgId: auth.scope.orgId,
propagatedHeaders: propagatedHeaders,
});
return processor.publishToOtelIngestionQueue(resourceSpans);
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/otel/OtelIngestionProcessor.ts
- Lines: 141-173
Signature
async publishToOtelIngestionQueue(resourceSpans: ResourceSpan[]): Promise<any>
Import
import { OtelIngestionProcessor } from "@langfuse/shared/src/server";
// ResourceSpan type is also exported from the same module
import type { ResourceSpan } from "@langfuse/shared/src/server";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| resourceSpans | ResourceSpan[] | Yes | Array of parsed OpenTelemetry ResourceSpan objects extracted from the HTTP request body |
| this.projectId | string | Yes (constructor) | Project identifier used to scope the S3 key and queue job |
| this.publicKey | string | No (constructor) | Public API key included in the queue job payload for downstream auth |
| this.orgId | string | No (constructor) | Organization ID included in the queue job auth scope |
| this.propagatedHeaders | Record<string, string> | No (constructor) | HTTP headers propagated for enterprise ingestion masking |
Outputs
| Name | Type | Description |
|---|---|---|
| Promise<any> | Promise | Resolves when the BullMQ job has been successfully enqueued. The resolved value is the BullMQ Job object. |
| Side effect: S3 object | void | JSON file uploaded at key {prefix}otel/{projectId}/{yyyy/mm/dd/hh/mm}/{uuid}.json
|
| Side effect: BullMQ job | void | OtelIngestionJob added to OtelIngestionQueue with payload referencing the S3 file key |
| Rejection | string | Rejects with "Failed to instantiate otel ingestion queue" if the queue singleton is unavailable |
Usage Examples
Basic Usage in HTTP Handler
const processor = new OtelIngestionProcessor({
projectId: "proj_abc123",
publicKey: "pk-lf-xxx",
orgId: "org_def456",
});
// resourceSpans comes from parsing the HTTP request body
await processor.publishToOtelIngestionQueue(resourceSpans);
// At this point:
// 1. JSON file exists in S3 at otel/proj_abc123/2026/02/14/10/30/{uuid}.json
// 2. BullMQ job is enqueued referencing that file key
With Propagated Headers for Ingestion Masking
const propagatedHeaders: Record<string, string> = {};
for (const headerName of env.LANGFUSE_INGESTION_MASKING_PROPAGATED_HEADERS) {
const value = req.headers[headerName];
if (typeof value === "string") {
propagatedHeaders[headerName] = value;
}
}
const processor = new OtelIngestionProcessor({
projectId: auth.scope.projectId,
publicKey: auth.scope.publicKey,
orgId: auth.scope.orgId,
propagatedHeaders:
Object.keys(propagatedHeaders).length > 0 ? propagatedHeaders : undefined,
});
return processor.publishToOtelIngestionQueue(resourceSpans);