Implementation:Langfuse Langfuse ProcessToIngestionEvents
| Knowledge Sources | |
|---|---|
| Domains | Observability, Ingestion, OpenTelemetry, Deduplication |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Concrete tool, provided by Langfuse, for converting OTel ResourceSpans into deduplicated arrays of Langfuse IngestionEventType events.
Description
The processToIngestionEvents() method is the primary async entry point on OtelIngestionProcessor for converting raw OTel data into Langfuse ingestion events. It orchestrates:
- Lazy Redis initialization: On first call, loads the set of recently-seen trace IDs from Redis using SET NX with 600-second TTL. This enables cross-batch deduplication without requiring external state management by the caller.
- Span processing: Iterates through all ResourceSpan -> ScopeSpan -> Span levels, calling the private processSpan() method, which produces 1-2 IngestionEventType records per span (one trace-create event conditionally, one observation event always).
- Trace event creation: The private createTraceEvent() method builds either a shallow trace (id + timestamp + environment only) or a full trace (all properties) based on whether the span is a root span, has trace updates, or is merely ensuring trace existence.
- Observation event creation: The private createObservationEvent() method builds the observation with type determined by ObservationTypeMapperRegistry, input/output extracted by extractInputAndOutput(), and all metadata, usage, cost, and model information extracted from span attributes.
- Post-filtering: filterRedundantShallowTraces() performs an O(n) pass to remove shallow trace events that are superseded by full trace events for the same trace ID within the batch.
- Metrics recording: Emits langfuse.ingestion.otel.trace_create_event increment counters categorized by reason (shallow, rootSpanClosed, traceUpdated).
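The nested traversal in the steps above can be sketched roughly as follows. This is a simplified illustration, not the Langfuse source: the `MiniSpan`, `MiniResourceSpan`, and `MiniEvent` types and the `sketchProcess` function are hypothetical, and the rule for emitting a trace event is reduced to "root span or trace not yet seen" (the real method also considers trace updates).

```typescript
// Hypothetical, trimmed-down shapes; the real OTel and Langfuse types carry many more fields.
type MiniSpan = { traceId: string; spanId: string; parentSpanId?: string | null; name: string };
type MiniScopeSpan = { spans: MiniSpan[] };
type MiniResourceSpan = { scopeSpans: MiniScopeSpan[] };
type MiniEvent = {
  type: "trace-create" | "span-create";
  body: { id: string; traceId?: string };
};

// Walk ResourceSpan -> ScopeSpan -> Span and emit one or two events per span.
function sketchProcess(resourceSpans: MiniResourceSpan[], seenTraces: Set<string>): MiniEvent[] {
  const events: MiniEvent[] = [];
  for (const resourceSpan of resourceSpans) {
    for (const scopeSpan of resourceSpan.scopeSpans) {
      for (const span of scopeSpan.spans) {
        const isRoot = !span.parentSpanId;
        // Assumed simplification: a trace event is emitted for root spans and unseen traces only.
        if (isRoot || !seenTraces.has(span.traceId)) {
          events.push({ type: "trace-create", body: { id: span.traceId } });
          seenTraces.add(span.traceId);
        }
        // The observation event is always emitted.
        events.push({ type: "span-create", body: { id: span.spanId, traceId: span.traceId } });
      }
    }
  }
  return events;
}
```

In this sketch a root span and its child in the same batch yield three events: one trace-create and two observation events.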
The method handles errors gracefully: ForbiddenError is re-thrown (indicating auth issues), while other errors are logged and result in an empty array return to avoid breaking the ingestion pipeline.
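The error-handling policy can be illustrated with a small wrapper. This is a sketch under stated assumptions, not the actual implementation: `ForbiddenError` here is a local stand-in for the Langfuse class of the same name, and `safeProcess` and its `log` parameter are hypothetical.

```typescript
// Local stand-in for the ForbiddenError class referenced above (assumption).
class ForbiddenError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ForbiddenError";
    // Keeps `instanceof` working when compiled to older targets.
    Object.setPrototypeOf(this, ForbiddenError.prototype);
  }
}

// Runs `work`; re-throws ForbiddenError, logs anything else and returns an empty array.
async function safeProcess<T>(
  work: () => Promise<T[]>,
  log: (msg: string, err: unknown) => void = console.error,
): Promise<T[]> {
  try {
    return await work();
  } catch (err) {
    if (err instanceof ForbiddenError) {
      throw err; // auth problems must surface to the caller
    }
    log("Failed to process OTel spans", err);
    return []; // swallow other errors so the pipeline keeps running
  }
}
```

One consequence of this design is that callers cannot distinguish "no events" from "processing failed" by the return value alone; only the log reveals the failure.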
Usage
This method is called by the worker-side otelIngestionQueueProcessor after downloading and parsing resourceSpans from S3.
const processor = new OtelIngestionProcessor({ projectId, publicKey });
const events: IngestionEventType[] = await processor.processToIngestionEvents(parsedSpans);
Code Reference
Source Location
- Repository: langfuse
- File: packages/shared/src/server/otel/OtelIngestionProcessor.ts
- Lines: 451-522 (processToIngestionEvents), 640-993 (processResourceSpan, processSpan, createTraceEvent, createObservationEvent)
Signature
async processToIngestionEvents(
  resourceSpans: ResourceSpan[],
): Promise<IngestionEventType[]>
Import
import {
  OtelIngestionProcessor,
  type IngestionEventType,
  type ResourceSpan,
} from "@langfuse/shared/src/server";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| resourceSpans | ResourceSpan[] | Yes | Array of parsed OTel ResourceSpan objects, typically downloaded from S3. Each ResourceSpan contains resource attributes and an array of ScopeSpans, each containing scope metadata and an array of Spans. |
| this.projectId | string | Yes (constructor) | Project ID for Redis key scoping and event tagging |
| this.publicKey | string | No (constructor) | Not used directly in this method but available on the instance |
Outputs
| Name | Type | Description |
|---|---|---|
| IngestionEventType[] | Array | Deduplicated array of Langfuse ingestion events. Each event has { id: string, type: string, timestamp: string, body: object }. Types include "trace-create", "span-create", "generation-create", "tool-create", "agent-create", etc. |
| Side effect: Redis keys | void | SET NX operations for trace deduplication keys with 600s TTL |
| Side effect: metrics | void | Increment counters for trace event categories (shallow, rootSpanClosed, traceUpdated) |
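For callers that consume the returned array, the event shape from the Outputs table can be modeled roughly as below. `SketchIngestionEvent` and `countByType` are hypothetical names for illustration; the real `IngestionEventType` is a stricter union type.

```typescript
// Loose model of the documented event shape: { id, type, timestamp, body }.
type SketchIngestionEvent = {
  id: string;
  type: string;
  timestamp: string;
  body: Record<string, unknown>;
};

// Tally events per type, e.g. to sanity-check a processed batch.
function countByType(events: SketchIngestionEvent[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const event of events) {
    counts[event.type] = (counts[event.type] ?? 0) + 1;
  }
  return counts;
}
```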
Usage Examples
Basic Processing
const processor = new OtelIngestionProcessor({
  projectId: "proj_abc123",
  publicKey: "pk-lf-xxx",
});
const resourceSpans: ResourceSpan[] = [
  {
    resource: {
      attributes: [
        { key: "service.name", value: { stringValue: "my-ai-service" } },
        { key: "telemetry.sdk.language", value: { stringValue: "python" } },
      ],
    },
    scopeSpans: [
      {
        scope: { name: "langfuse-sdk", version: "3.5.0" },
        spans: [
          {
            traceId: "abc123",
            spanId: "span001",
            parentSpanId: null, // Root span
            name: "chat-completion",
            startTimeUnixNano: "1700000000000000000",
            endTimeUnixNano: "1700000001000000000",
            attributes: [
              { key: "langfuse.observation.type", value: { stringValue: "generation" } },
              { key: "langfuse.observation.model.name", value: { stringValue: "gpt-4o" } },
            ],
            events: [],
            status: { code: 0 },
          },
        ],
      },
    ],
  },
];
const events = await processor.processToIngestionEvents(resourceSpans);
// events = [
// { id: "<uuid>", type: "trace-create", timestamp: "...", body: { id: "abc123", name: "chat-completion", ... } },
// { id: "<uuid>", type: "generation-create", timestamp: "...", body: { id: "span001", traceId: "abc123", model: "gpt-4o", ... } },
// ]
Deduplication Across Batches
// First batch: trace "abc123" is new -> Redis SET NX returns "OK" -> not in seenTraces
const events1 = await processor.processToIngestionEvents(batch1);
// Produces trace-create + observation events for trace "abc123"
// Second batch (same processor instance): trace "abc123" is in seenTraces
// Only root spans or spans with trace updates will produce trace-create events
// Non-root spans without trace attributes will skip trace creation for "abc123"
const events2 = await processor.processToIngestionEvents(batch2);
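The SET NX semantics that drive this behavior can be approximated in memory. The following is a sketch only: `FakeNxStore` stands in for Redis and is not a Redis client, and the key format in `isTraceSeen` is a hypothetical placeholder rather than the key Langfuse actually uses.

```typescript
// In-memory stand-in for Redis `SET key value NX EX ttl` (assumption, not a real client).
class FakeNxStore {
  private expiry = new Map<string, number>();

  // Returns "OK" if the key was newly set, or null if it already exists and has not expired.
  setNx(key: string, ttlSeconds: number, nowMs: number): "OK" | null {
    const expiresAt = this.expiry.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) {
      return null;
    }
    this.expiry.set(key, nowMs + ttlSeconds * 1000);
    return "OK";
  }
}

// A trace counts as "seen" when the NX write is rejected because the key already exists.
function isTraceSeen(store: FakeNxStore, projectId: string, traceId: string, nowMs: number): boolean {
  const key = `sketch:seen_traces:${projectId}:${traceId}`; // hypothetical key format
  return store.setNx(key, 600, nowMs) === null;
}
```

Because the key expires after 600 seconds, a trace whose spans arrive more than ten minutes apart is treated as new again in this model.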
Shallow vs Full Trace Filtering
// Given a batch with:
// - Span A: root span for trace "t1" (produces full trace-create)
// - Span B: child span for trace "t1", no trace attrs (produces shallow trace-create)
// - Span C: child span for trace "t1", has userId (produces full trace-create)
// After filterRedundantShallowTraces():
// - Span A's full trace-create: KEPT
// - Span B's shallow trace-create: REMOVED (superseded by A and C)
// - Span C's full trace-create: KEPT
// - All observation events: KEPT (filtering only affects trace-create events)
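A two-pass filter with the same outcome can be sketched as follows. This is an illustration, not the source of filterRedundantShallowTraces(): the `SketchEvent` type is hypothetical, and the sketch assumes a shallow trace can be recognized by its body containing only `id`, `timestamp`, and `environment`.

```typescript
type SketchEvent = { type: string; body: Record<string, unknown> };

// Assumption: a shallow trace body carries only these keys.
const SHALLOW_KEYS = new Set(["id", "timestamp", "environment"]);

function isShallowTrace(event: SketchEvent): boolean {
  return (
    event.type === "trace-create" &&
    Object.keys(event.body).every((key) => SHALLOW_KEYS.has(key))
  );
}

// Pass 1 collects trace IDs that have a full trace event; pass 2 drops shallow
// events for those IDs. Both passes are linear in the number of events.
function filterShallowSketch(events: SketchEvent[]): SketchEvent[] {
  const fullTraceIds = new Set<unknown>();
  for (const event of events) {
    if (event.type === "trace-create" && !isShallowTrace(event)) {
      fullTraceIds.add(event.body["id"]);
    }
  }
  return events.filter(
    (event) => !(isShallowTrace(event) && fullTraceIds.has(event.body["id"])),
  );
}
```

A shallow event for a trace with no full counterpart in the batch is kept, since it is then the only record ensuring the trace exists.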