

Implementation:Langfuse Langfuse ProcessToIngestionEvents

From Leeroopedia
Domains: Observability, Ingestion, OpenTelemetry, Deduplication
Last Updated: 2026-02-14 00:00 GMT

Overview

Concrete tool, provided by Langfuse, for converting OTel ResourceSpans into deduplicated arrays of Langfuse IngestionEventType events.

Description

The processToIngestionEvents() method is the primary async entry point on OtelIngestionProcessor for converting raw OTel data into Langfuse ingestion events. It orchestrates:

  1. Lazy Redis initialization: On the first call, determines which trace IDs have been seen recently by issuing Redis SET NX operations with a 600-second TTL; a SET NX that finds an existing key marks that trace as already seen. This enables cross-batch deduplication without requiring the caller to manage external state.
  2. Span processing: Iterates through the ResourceSpan -> ScopeSpan -> Span hierarchy, calling the private processSpan() method, which produces one or two IngestionEventType records per span (a trace-create event conditionally, an observation event always).
  3. Trace event creation: The private createTraceEvent() method builds a full trace (all properties) when the span is a root span or carries trace updates, and a shallow trace (id, timestamp, and environment only) when it merely ensures that the trace exists.
  4. Observation event creation: The private createObservationEvent() method builds the observation, with its type determined by ObservationTypeMapperRegistry, input/output extracted by extractInputAndOutput(), and metadata, usage, cost, and model information extracted from span attributes.
  5. Post-filtering: filterRedundantShallowTraces() performs an O(n) pass that removes shallow trace events superseded by a full trace event for the same trace ID within the batch.
  6. Metrics recording: Increments the langfuse.ingestion.otel.trace_create_event counter, categorized by reason (shallow, rootSpanClosed, traceUpdated).
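The span-processing, trace-event, and metrics steps above can be sketched as a nested traversal that emits zero or one trace-create event plus exactly one observation event per span. This is a minimal illustration under simplified, hypothetical types; the real ResourceSpan and IngestionEventType definitions are richer, and the list of trace-level attribute keys (TRACE_ATTRIBUTE_KEYS) is an assumption rather than the processor's actual rule.

```typescript
// Simplified, hypothetical stand-ins for the real OTel and Langfuse types.
type Attribute = { key: string; value: { stringValue?: string } };
type Span = {
  traceId: string;
  spanId: string;
  parentSpanId?: string | null;
  name: string;
  attributes: Attribute[];
};
type ResourceSpan = { scopeSpans: { spans: Span[] }[] };

type TraceReason = "shallow" | "rootSpanClosed" | "traceUpdated";
type SketchEvent =
  | { kind: "trace-create"; reason: TraceReason; traceId: string }
  | { kind: "observation"; spanId: string; traceId: string };

// Assumed set of trace-level attribute keys; the real processor uses its own rules.
const TRACE_ATTRIBUTE_KEYS = ["langfuse.trace.name", "langfuse.user.id", "langfuse.session.id"];

function hasTraceUpdates(span: Span): boolean {
  return span.attributes.some((a) => TRACE_ATTRIBUTE_KEYS.indexOf(a.key) !== -1);
}

// Emits 0-1 trace-create events plus exactly one observation event per span.
function processSpan(span: Span, seenTraces: Set<string>): SketchEvent[] {
  const events: SketchEvent[] = [];
  if (!span.parentSpanId) {
    events.push({ kind: "trace-create", reason: "rootSpanClosed", traceId: span.traceId });
  } else if (hasTraceUpdates(span)) {
    events.push({ kind: "trace-create", reason: "traceUpdated", traceId: span.traceId });
  } else if (!seenTraces.has(span.traceId)) {
    // Shallow trace: only guarantees that the trace exists.
    events.push({ kind: "trace-create", reason: "shallow", traceId: span.traceId });
  }
  events.push({ kind: "observation", spanId: span.spanId, traceId: span.traceId });
  return events;
}

// Walks ResourceSpan -> ScopeSpan -> Span and concatenates the per-span events.
function processResourceSpans(resourceSpans: ResourceSpan[], seenTraces: Set<string>): SketchEvent[] {
  const out: SketchEvent[] = [];
  for (const resourceSpan of resourceSpans) {
    for (const scopeSpan of resourceSpan.scopeSpans) {
      for (const span of scopeSpan.spans) {
        out.push(...processSpan(span, seenTraces));
      }
    }
  }
  return out;
}

// Tallies trace-create events by reason, mirroring the metric categories.
function countTraceReasons(events: SketchEvent[]): Record<TraceReason, number> {
  const counts: Record<TraceReason, number> = { shallow: 0, rootSpanClosed: 0, traceUpdated: 0 };
  for (const e of events) {
    if (e.kind === "trace-create") counts[e.reason] += 1;
  }
  return counts;
}
```

Passing a fresh Set models a trace that has not been seen before; a Set that already contains the trace ID models the cross-batch case, where a plain child span yields only its observation event.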

The method handles errors defensively: a ForbiddenError (indicating an authentication or authorization problem) is re-thrown, while any other error is logged and the method returns an empty array to avoid breaking the ingestion pipeline.
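The error policy described above can be sketched as a small wrapper around the conversion step. ForbiddenError here is a hypothetical local class standing in for the processor's own error type, and convertOrEmpty is an illustrative name, not part of the Langfuse API.

```typescript
// Hypothetical error class standing in for the processor's ForbiddenError.
class ForbiddenError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ForbiddenError";
    // Keeps instanceof working even when compiled to ES5.
    Object.setPrototypeOf(this, ForbiddenError.prototype);
  }
}

// Wraps a conversion step: auth errors propagate, everything else degrades to [].
async function convertOrEmpty<T>(
  convert: () => Promise<T[]>,
  logError: (message: string, err: unknown) => void = (m, e) => console.error(m, e),
): Promise<T[]> {
  try {
    return await convert();
  } catch (err) {
    if (err instanceof ForbiddenError) {
      throw err; // authorization problems must reach the caller
    }
    logError("Failed to process OTel spans", err);
    return []; // any other failure yields "no events" instead of breaking the pipeline
  }
}
```

Re-throwing only the authorization error lets the caller reject the request, while transient or data-specific failures stay contained to the batch that caused them.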

Usage

This method is called by the worker-side otelIngestionQueueProcessor after downloading and parsing resourceSpans from S3.

const processor = new OtelIngestionProcessor({ projectId, publicKey });
const events: IngestionEventType[] = await processor.processToIngestionEvents(parsedSpans);

Code Reference

Source Location

  • Repository: langfuse
  • File: packages/shared/src/server/otel/OtelIngestionProcessor.ts
  • Lines: 451-522 (processToIngestionEvents), 640-993 (processResourceSpan, processSpan, createTraceEvent, createObservationEvent)

Signature

async processToIngestionEvents(
  resourceSpans: ResourceSpan[],
): Promise<IngestionEventType[]>

Import

import {
  OtelIngestionProcessor,
  type IngestionEventType,
  type ResourceSpan,
} from "@langfuse/shared/src/server";

I/O Contract

Inputs

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| resourceSpans | ResourceSpan[] | Yes | Array of parsed OTel ResourceSpan objects, typically downloaded from S3. Each ResourceSpan contains resource attributes and an array of ScopeSpans, each containing scope metadata and an array of Spans. |
| this.projectId | string | Yes (constructor) | Project ID used for Redis key scoping and event tagging |
| this.publicKey | string | No (constructor) | Not used directly in this method, but available on the instance |

Outputs

| Name | Type | Description |
| --- | --- | --- |
| (return value) | IngestionEventType[] | Deduplicated array of Langfuse ingestion events. Each event has the shape { id: string, type: string, timestamp: string, body: object }. Types include "trace-create", "span-create", "generation-create", "tool-create", "agent-create", and others. |
| Side effect: Redis keys | void | SET NX operations on trace deduplication keys, with a 600 s TTL |
| Side effect: metrics | void | Counter increments for trace-create event categories (shallow, rootSpanClosed, traceUpdated) |
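A caller that wants to sanity-check the output contract could use a structural type guard like the one below. IngestionEventLike, isIngestionEventLike, and countByType are hypothetical helpers that only follow the { id, type, timestamp, body } shape listed above; the real IngestionEventType is a stricter schema, so treat this as an approximation.

```typescript
// Structural view of one output event, following the documented shape only.
interface IngestionEventLike {
  id: string;
  type: string;
  timestamp: string;
  body: Record<string, unknown>;
}

function isIngestionEventLike(value: unknown): value is IngestionEventLike {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.id === "string" &&
    typeof v.type === "string" &&
    typeof v.timestamp === "string" &&
    typeof v.body === "object" &&
    v.body !== null
  );
}

// Counts events per type, e.g. to compare trace-create vs observation volume.
function countByType(events: IngestionEventLike[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const e of events) {
    counts[e.type] = (counts[e.type] ?? 0) + 1;
  }
  return counts;
}
```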

Usage Examples

Basic Processing

const processor = new OtelIngestionProcessor({
  projectId: "proj_abc123",
  publicKey: "pk-lf-xxx",
});

const resourceSpans: ResourceSpan[] = [
  {
    resource: {
      attributes: [
        { key: "service.name", value: { stringValue: "my-ai-service" } },
        { key: "telemetry.sdk.language", value: { stringValue: "python" } },
      ],
    },
    scopeSpans: [
      {
        scope: { name: "langfuse-sdk", version: "3.5.0" },
        spans: [
          {
            traceId: "abc123",
            spanId: "span001",
            parentSpanId: null,  // Root span
            name: "chat-completion",
            startTimeUnixNano: "1700000000000000000",
            endTimeUnixNano: "1700000001000000000",
            attributes: [
              { key: "langfuse.observation.type", value: { stringValue: "generation" } },
              { key: "langfuse.observation.model.name", value: { stringValue: "gpt-4o" } },
            ],
            events: [],
            status: { code: 0 },
          },
        ],
      },
    ],
  },
];

const events = await processor.processToIngestionEvents(resourceSpans);
// events = [
//   { id: "<uuid>", type: "trace-create", timestamp: "...", body: { id: "abc123", name: "chat-completion", ... } },
//   { id: "<uuid>", type: "generation-create", timestamp: "...", body: { id: "span001", traceId: "abc123", model: "gpt-4o", ... } },
// ]
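Two conversions implied by this example can be sketched in isolation: turning the string startTimeUnixNano into an ISO-8601 timestamp, and mapping the langfuse.observation.type attribute to an event type. The mapping table and the "span-create" fallback are assumptions for illustration; the real logic lives in ObservationTypeMapperRegistry and handles more cases.

```typescript
type Attribute = { key: string; value: { stringValue?: string } };

// Converts a decimal nanosecond string (e.g. "1700000000000000000") to ISO-8601.
// Dropping the last 6 digits truncates to milliseconds, which fit safely in a JS number.
function unixNanoToIso(unixNano: string): string {
  const millis = unixNano.length > 6 ? Number(unixNano.slice(0, -6)) : 0;
  return new Date(millis).toISOString();
}

// Hypothetical mapping from the declared observation type to an event type.
const EVENT_TYPE_BY_OBSERVATION: Record<string, string> = {
  span: "span-create",
  generation: "generation-create",
  tool: "tool-create",
  agent: "agent-create",
};

function observationEventType(attributes: Attribute[]): string {
  const declared = attributes.find((a) => a.key === "langfuse.observation.type")?.value.stringValue;
  if (declared !== undefined && Object.prototype.hasOwnProperty.call(EVENT_TYPE_BY_OBSERVATION, declared)) {
    return EVENT_TYPE_BY_OBSERVATION[declared];
  }
  return "span-create"; // assumed fallback for unknown or missing types
}
```

For the span in this example, unixNanoToIso("1700000000000000000") gives "2023-11-14T22:13:20.000Z", and the "generation" attribute maps to "generation-create".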

Deduplication Across Batches

// First batch: trace "abc123" is new -> Redis SET NX returns "OK" -> not in seenTraces
const events1 = await processor.processToIngestionEvents(batch1);
// Produces trace-create + observation events for trace "abc123"

// Second batch (same processor instance): trace "abc123" is in seenTraces
// Only root spans or spans with trace updates will produce trace-create events
// Non-root spans without trace attributes will skip trace creation for "abc123"
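The cross-batch check relies on Redis SET with the NX and EX options: the write succeeds ("OK") only when the key is absent, so a refused write means the trace was already recorded within the last 600 seconds. The sketch below models that behavior with an in-memory class instead of a real Redis client; FakeRedis, setNxEx, markAndCollectSeen, and the key format are all hypothetical.

```typescript
// In-memory model of Redis "SET key value NX EX ttl" (hypothetical; not a Redis client).
class FakeRedis {
  private expiries = new Map<string, number>(); // key -> expiry time in ms
  private now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  // Returns "OK" if the key was absent or expired (and is now set), otherwise null.
  setNxEx(key: string, ttlSeconds: number): "OK" | null {
    const current = this.now();
    const expiry = this.expiries.get(key);
    if (expiry !== undefined && expiry > current) return null;
    this.expiries.set(key, current + ttlSeconds * 1000);
    return "OK";
  }
}

const SEEN_TRACE_TTL_SECONDS = 600;

// Hypothetical key layout, scoped by project ID.
function seenTraceKey(projectId: string, traceId: string): string {
  return `${projectId}:otel:trace:${traceId}`;
}

// Marks each trace ID and returns the subset that an earlier batch had already marked.
function markAndCollectSeen(redis: FakeRedis, projectId: string, traceIds: string[]): Set<string> {
  const seen = new Set<string>();
  for (const traceId of Array.from(new Set(traceIds))) {
    if (redis.setNxEx(seenTraceKey(projectId, traceId), SEEN_TRACE_TTL_SECONDS) === null) {
      seen.add(traceId);
    }
  }
  return seen;
}
```

Because marking and checking happen in one atomic command, two workers processing batches for the same trace cannot both conclude that the trace is new.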
const events2 = await processor.processToIngestionEvents(batch2);

Shallow vs Full Trace Filtering

// Given a batch with:
// - Span A: root span for trace "t1" (produces full trace-create)
// - Span B: child span for trace "t1", no trace attrs (produces shallow trace-create)
// - Span C: child span for trace "t1", has userId (produces full trace-create)

// After filterRedundantShallowTraces():
// - Span A's full trace-create: KEPT
// - Span B's shallow trace-create: REMOVED (superseded by A and C)
// - Span C's full trace-create: KEPT
// - All observation events: KEPT (filtering only affects trace-create events)
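The filtering step can be approximated as follows. This sketch assumes a trace-create body is shallow when it holds only id, timestamp, and environment (matching the shallow-trace description in the overview); the real filterRedundantShallowTraces() may identify shallow events differently. It makes two linear passes, which is still O(n).

```typescript
type EventLike = { type: string; body: Record<string, unknown> };

// Keys a shallow trace-create body may contain (assumed from the description above).
const SHALLOW_TRACE_KEYS = new Set(["id", "timestamp", "environment"]);

function isShallowTrace(event: EventLike): boolean {
  return (
    event.type === "trace-create" &&
    Object.keys(event.body).every((key) => SHALLOW_TRACE_KEYS.has(key))
  );
}

function filterRedundantShallowTraces<T extends EventLike>(events: T[]): T[] {
  // Pass 1: record every trace ID that has at least one full trace-create in the batch.
  const fullTraceIds = new Set<string>();
  for (const event of events) {
    if (event.type === "trace-create" && !isShallowTrace(event)) {
      fullTraceIds.add(String(event.body.id));
    }
  }
  // Pass 2: drop shallow trace-creates that a full one supersedes; keep all other events.
  return events.filter(
    (event) => !(isShallowTrace(event) && fullTraceIds.has(String(event.body.id))),
  );
}
```

A shallow trace-create for a trace with no full sibling in the batch survives, since it is the only event guaranteeing that the trace exists.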

