Implementation:Langfuse Langfuse PublishToOtelIngestionQueue

From Leeroopedia
Knowledge Sources
Domains: Ingestion, Queue Architecture, Blob Storage
Last Updated: 2026-02-14 00:00 GMT

Overview

Concrete tool, provided by Langfuse, for uploading parsed OTel resourceSpans to S3 blob storage and enqueuing a BullMQ job that processes them.

Description

The publishToOtelIngestionQueue method is an async instance method on the OtelIngestionProcessor class. It orchestrates the two-step handoff from the web server to the worker:

  1. S3 Upload: Serializes the incoming ResourceSpan[] array as JSON and uploads it to the configured S3 bucket using getS3EventStorageClient().uploadJson(). The file key is constructed from a configurable prefix, the project ID, a time-based directory path (down to the minute), and a random UUID for uniqueness.
  2. Queue Dispatch: Obtains a singleton instance of OtelIngestionQueue and adds a job named QueueJobs.OtelIngestionJob. The job payload carries the S3 file key, the public API key, and the full authentication scope (projectId, accessLevel, orgId). When propagated headers were supplied, the payload also includes them for enterprise ingestion masking.
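
The file key layout described in step 1 can be sketched as a small pure function. This is a minimal illustration, not the Langfuse implementation: the helper name buildOtelFileKey, its parameters, and the use of UTC for the time path are assumptions; only the overall pattern (prefix, project ID, minute-level time path, UUID) comes from this page.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical helper: name, parameters, and UTC handling are assumptions
// based on the key pattern documented on this page.
function buildOtelFileKey(
  prefix: string,
  projectId: string,
  now: Date = new Date(),
  id: string = randomUUID(),
): string {
  const pad = (n: number): string => String(n).padStart(2, "0");
  // Time-based directory path, down to the minute.
  const timePath = [
    now.getUTCFullYear(),
    pad(now.getUTCMonth() + 1),
    pad(now.getUTCDate()),
    pad(now.getUTCHours()),
    pad(now.getUTCMinutes()),
  ].join("/");
  // The random UUID keeps keys unique within the same minute.
  return `${prefix}otel/${projectId}/${timePath}/${id}.json`;
}

// Fixed timestamp and ID so the result is deterministic for illustration.
const exampleKey = buildOtelFileKey(
  "",
  "proj_abc123",
  new Date(Date.UTC(2026, 1, 14, 10, 30)),
  "00000000-0000-4000-8000-000000000000",
);
```

Grouping objects by project and minute keeps listings for a given project and time window narrow, while the UUID avoids collisions between requests that arrive in the same minute.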

The method uses constructor-injected configuration (projectId, publicKey, orgId, propagatedHeaders) that is set when the OtelIngestionProcessor is instantiated in the HTTP handler.

Usage

This method is called at the end of the OTel traces HTTP handler after the request body has been parsed and validated. It is the bridge between the synchronous HTTP layer and the asynchronous worker layer.

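// auth, propagatedHeaders, and resourceSpans come from the enclosing HTTP handler.
const processor = new OtelIngestionProcessor({
  projectId: auth.scope.projectId,
  publicKey: auth.scope.publicKey,
  orgId: auth.scope.orgId,
  propagatedHeaders,
});

// Uploads the spans to S3, then enqueues the processing job.
return processor.publishToOtelIngestionQueue(resourceSpans);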

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/otel/OtelIngestionProcessor.ts
  • Lines: 141-173

Signature

async publishToOtelIngestionQueue(resourceSpans: ResourceSpan[]): Promise<any>

Import

import { OtelIngestionProcessor } from "@langfuse/shared/src/server";
// ResourceSpan type is also exported from the same module
import type { ResourceSpan } from "@langfuse/shared/src/server";

I/O Contract

Inputs

Name | Type | Required | Description
resourceSpans | ResourceSpan[] | Yes | Array of parsed OpenTelemetry ResourceSpan objects extracted from the HTTP request body
this.projectId | string | Yes (constructor) | Project identifier used to scope the S3 key and queue job
this.publicKey | string | No (constructor) | Public API key included in the queue job payload for downstream auth
this.orgId | string | No (constructor) | Organization ID included in the queue job auth scope
this.propagatedHeaders | Record<string, string> | No (constructor) | HTTP headers propagated for enterprise ingestion masking

Outputs

Name | Type | Description
Promise<any> | Promise | Resolves when the BullMQ job has been successfully enqueued. The resolved value is the BullMQ Job object.
Side effect: S3 object | void | JSON file uploaded at key {prefix}otel/{projectId}/{yyyy/mm/dd/hh/mm}/{uuid}.json
Side effect: BullMQ job | void | OtelIngestionJob added to OtelIngestionQueue with payload referencing the S3 file key
Rejection | string | Rejects with "Failed to instantiate otel ingestion queue" if the queue singleton is unavailable
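
The contract above (upload, then enqueue, with a rejection when the queue is unavailable) can be exercised with in-memory stand-ins. This is a rough sketch under stated assumptions: the BlobStore and JobQueue interfaces, the OtelJobData shape, the publishSketch function, the "OtelIngestionJob" name string, and the use of an Error object for the rejection are all hypothetical; only the payload fields and the rejection message are taken from this page.

```typescript
// Hypothetical payload shape; field names are illustrative.
type OtelJobData = {
  fileKey: string;
  publicKey?: string;
  authScope: { projectId: string; accessLevel: string; orgId?: string };
  propagatedHeaders?: Record<string, string>;
};
type SketchJob = { name: string; data: OtelJobData };

// Hypothetical stand-ins for the S3 client and the BullMQ queue.
interface BlobStore {
  uploadJson(key: string, body: unknown): Promise<void>;
}
interface JobQueue {
  add(name: string, data: OtelJobData): Promise<SketchJob>;
}

async function publishSketch(
  store: BlobStore,
  queue: JobQueue | null,
  spans: unknown[],
  data: OtelJobData,
): Promise<SketchJob> {
  // Step 1: persist the payload so the job only carries a reference to it.
  await store.uploadJson(data.fileKey, spans);
  // Step 2: enqueue; an unavailable queue surfaces as a rejection.
  if (queue === null) {
    throw new Error("Failed to instantiate otel ingestion queue");
  }
  // The job name string is a placeholder for QueueJobs.OtelIngestionJob.
  return queue.add("OtelIngestionJob", data);
}

// In-memory fakes make the sketch runnable without S3 or Redis.
const uploadedObjects = new Map<string, string>();
const memoryStore: BlobStore = {
  async uploadJson(key, body) {
    uploadedObjects.set(key, JSON.stringify(body));
  },
};
const enqueuedJobs: SketchJob[] = [];
const memoryQueue: JobQueue = {
  async add(name, data) {
    const job: SketchJob = { name, data };
    enqueuedJobs.push(job);
    return job;
  },
};
```

In this sketch the job carries only the file key rather than the spans themselves, which keeps queue messages small regardless of request size. With this ordering, a failed enqueue leaves an uploaded object that no job references; whether the real method behaves the same way is not stated on this page.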

Usage Examples

Basic Usage in HTTP Handler

const processor = new OtelIngestionProcessor({
  projectId: "proj_abc123",
  publicKey: "pk-lf-xxx",
  orgId: "org_def456",
});

// resourceSpans comes from parsing the HTTP request body
await processor.publishToOtelIngestionQueue(resourceSpans);
// At this point (assuming an empty key prefix and a request at 10:30 on 2026-02-14):
// 1. A JSON file exists in S3 at otel/proj_abc123/2026/02/14/10/30/{uuid}.json
// 2. A BullMQ job is enqueued that references that file key

With Propagated Headers for Ingestion Masking

const propagatedHeaders: Record<string, string> = {};
for (const headerName of env.LANGFUSE_INGESTION_MASKING_PROPAGATED_HEADERS) {
  const value = req.headers[headerName];
  if (typeof value === "string") {
    propagatedHeaders[headerName] = value;
  }
}

const processor = new OtelIngestionProcessor({
  projectId: auth.scope.projectId,
  publicKey: auth.scope.publicKey,
  orgId: auth.scope.orgId,
  propagatedHeaders:
    Object.keys(propagatedHeaders).length > 0 ? propagatedHeaders : undefined,
});

return processor.publishToOtelIngestionQueue(resourceSpans);

Related Pages

Implements Principle
