Workflow:Langfuse Langfuse Otel ingestion pipeline

From Leeroopedia
Knowledge Sources
Domains: LLM_Ops, OpenTelemetry, Data_Ingestion, Multi_Framework
Last Updated: 2026-02-14 05:00 GMT

Overview

End-to-end process for ingesting OpenTelemetry spans from 20+ AI frameworks into Langfuse, converting them into native trace and observation data through framework-aware detection, attribute extraction, and ChatML normalization.

Description

This workflow describes how Langfuse receives, converts, and stores OpenTelemetry (OTel) trace data from diverse AI frameworks. The OTel endpoint accepts standard ExportTraceServiceRequest payloads in both Protobuf and JSON formats with gzip compression support. The OtelIngestionProcessor converts OTel resource spans into Langfuse's native event format by detecting the originating framework, mapping span types to observation categories (GENERATION, SPAN, AGENT, TOOL, etc.), extracting input/output data in framework-specific formats, and normalizing messages through the ChatML adapter system. The converted events are then processed through the standard ingestion pipeline for storage in ClickHouse and PostgreSQL.

Usage

This workflow applies whenever LLM applications instrumented with OpenTelemetry-compatible frameworks send trace data to Langfuse. Covered frameworks include LangChain, LlamaIndex, OpenAI Agents, Pydantic AI, Vercel AI SDK, Google ADK, LangGraph, Microsoft Agent Framework, Semantic Kernel, and many others that emit OTel spans.

Execution Steps

Step 1: OTel Endpoint Reception

The OTel API endpoint receives ExportTraceServiceRequest payloads via HTTP POST. The endpoint supports both Protobuf (application/x-protobuf) and JSON (application/json) content types, with automatic gzip decompression. After authentication via project API keys, the project is marked as an OTel user in Redis (24-hour TTL) for conditional UI features.

Key considerations:

  • Standard OTel export protocol compatibility ensures any OTel-instrumented framework works
  • Project OTel usage tracking enables framework-specific UI customization
  • Ingestion suspension is checked for organizations that have exceeded usage thresholds
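
The body handling in this step can be sketched as follows. This is a minimal illustration, not the Langfuse implementation: the function name decode_otlp_body is hypothetical, only the JSON branch is decoded, and the Protobuf branch returns the raw bytes because decoding them needs the OTLP schema.

```python
import gzip
import json


def decode_otlp_body(content_type: str, content_encoding: str, body: bytes):
    """Return ("json", dict) or ("protobuf", bytes) for an OTLP trace export body."""
    # Undo gzip compression first, if the client declared it.
    if content_encoding.lower() == "gzip":
        body = gzip.decompress(body)

    # Ignore parameters such as "; charset=utf-8".
    media_type = content_type.split(";")[0].strip().lower()
    if media_type == "application/json":
        return "json", json.loads(body)
    if media_type == "application/x-protobuf":
        # Decoding requires the OTLP protobuf schema; left to the caller here.
        return "protobuf", body
    raise ValueError(f"unsupported content type: {content_type}")


# Usage: a gzip-compressed JSON export with no spans.
payload = {"resourceSpans": []}
kind, decoded = decode_otlp_body(
    "application/json", "gzip", gzip.compress(json.dumps(payload).encode())
)
```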

Step 2: S3 Upload and Queue Dispatch

The raw resourceSpans payload is uploaded to S3-compatible storage with a path structure of otel/{projectId}/{date}/{uuid}.json. A job is then dispatched to the sharded OTel ingestion queue in Redis, containing only the S3 file reference. This decouples the API response from the processing work.

Key considerations:

  • Sharding is controlled by LANGFUSE_OTEL_INGESTION_QUEUE_SHARD_COUNT
  • S3 offloading keeps queue messages small and prevents Redis memory pressure
  • The queue supports 6 retry attempts with exponential backoff (5-second initial delay)
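
A rough sketch of the three mechanics in this step: building the S3 key, picking a shard, and computing retry delays. Several details are assumptions rather than facts from the source: the {date} segment is rendered as an ISO calendar date, the shard is chosen by hashing the project ID, the delay doubles on every retry, and "6 retry attempts" is read as six delays. All function names are hypothetical.

```python
import uuid
import zlib
from datetime import datetime, timezone


def s3_key(project_id: str, now: datetime, file_id: str) -> str:
    # Assumption: {date} is an ISO calendar date such as 2026-02-14.
    return f"otel/{project_id}/{now.date().isoformat()}/{file_id}.json"


def shard_for(project_id: str, shard_count: int) -> int:
    # Assumption: a stable hash of the project ID selects the shard.
    return zlib.crc32(project_id.encode()) % shard_count


def retry_delays(retries: int = 6, initial_seconds: float = 5.0) -> list:
    # Assumption: exponential backoff doubles the delay on each retry.
    return [initial_seconds * 2**i for i in range(retries)]


# Usage: the queue job carries only the file reference, never the payload.
key = s3_key("proj-1", datetime.now(timezone.utc), str(uuid.uuid4()))
job = {"fileKey": key, "shard": shard_for("proj-1", 4)}
```

Under these assumptions the retry delays are 5, 10, 20, 40, 80, and 160 seconds.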

Step 3: Framework Detection and Type Mapping

The worker downloads the resourceSpans from S3 and processes each span through the ObservationTypeMapper. This priority-based registry system detects the originating framework by examining span attributes and maps each span to a Langfuse observation type (SPAN, GENERATION, EVENT, EMBEDDING, AGENT, TOOL, CHAIN, RETRIEVER, GUARDRAIL, EVALUATOR).

Key considerations:

  • Detection priority ensures specific matchers run before generic ones (lower number = higher priority)
  • Direct Langfuse SDK mapping (langfuse.observation.type attribute) takes highest priority
  • OpenInference, GenAI semantic conventions, Vercel AI SDK, and model-based detection are all supported
  • The default fallback type is SPAN when no specific pattern matches
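
The priority-based registry can be pictured with a small sketch. Only the langfuse.observation.type attribute, the "lower number = higher priority" rule, and the SPAN fallback come from the description above. The Mapper type, the priority values, the upper-casing of the attribute value, and the model-based heuristic are illustrative assumptions.

```python
from typing import Callable, NamedTuple, Optional


class Mapper(NamedTuple):
    name: str
    priority: int  # lower number = checked earlier
    match: Callable[[dict], Optional[str]]  # observation type, or None if no match


def direct_langfuse(attrs: dict) -> Optional[str]:
    # Direct SDK mapping: the span states its own observation type.
    value = attrs.get("langfuse.observation.type")
    return value.upper() if isinstance(value, str) else None


def model_based(attrs: dict) -> Optional[str]:
    # Hypothetical heuristic: a span that names a model is a generation.
    return "GENERATION" if "gen_ai.request.model" in attrs else None


# Registration order does not matter; sorting by priority does.
REGISTRY = sorted(
    [Mapper("model_based", 50, model_based), Mapper("direct", 0, direct_langfuse)],
    key=lambda m: m.priority,
)


def map_observation_type(attrs: dict) -> str:
    for mapper in REGISTRY:
        result = mapper.match(attrs)
        if result is not None:
            return result  # first match wins
    return "SPAN"  # default fallback
```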

Step 4: Input and Output Extraction

For each span, the processor extracts input and output data using framework-specific attribute keys. More than 20 framework formats are supported, including Langfuse native attributes, Vercel AI SDK, Google Vertex AI, OpenTelemetry GenAI conventions, OpenInference, Pydantic AI, TraceLoop, MLflow, and SmolAgents. Extraction follows a priority cascade: Langfuse-specific attributes are tried first, followed by progressively more generic conventions.

Key considerations:

  • Each framework stores input/output in different attribute keys and formats
  • OTel events (span events) are checked for gen_ai system/user/assistant messages
  • Model parameters, usage details, and cost details are extracted in parallel
  • Tags, user ID, session ID, and metadata are extracted from trace-level attributes
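
The priority cascade for input extraction might look like the sketch below. The key list is a short illustrative subset in an assumed order, not the processor's actual list, and the JSON-decoding fallback is likewise an assumption about how string attributes are handled.

```python
import json
from typing import Any, Optional

# Illustrative keys, most specific first (assumed order, not exhaustive).
INPUT_KEYS = [
    "langfuse.observation.input",  # Langfuse native
    "ai.prompt",                   # Vercel AI SDK style
    "gen_ai.prompt",               # GenAI convention style
    "input.value",                 # OpenInference style
]


def extract_input(attrs: dict) -> Optional[Any]:
    for key in INPUT_KEYS:
        if key not in attrs:
            continue
        raw = attrs[key]
        # Attribute values are often JSON-encoded strings; keep raw text otherwise.
        if isinstance(raw, str):
            try:
                return json.loads(raw)
            except json.JSONDecodeError:
                return raw
        return raw
    return None  # no known key present
```

When a span carries both a Langfuse-specific key and a generic one, the Langfuse-specific value is returned and the generic one is ignored.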

Step 5: Trace and Observation Event Generation

The processor generates Langfuse-native ingestion events from the extracted data. Root spans (those without a parent span ID) or spans with trace-level updates generate trace-create events. Every span generates an observation-create event. A Redis-based deduplication mechanism (10-minute TTL) ensures that redundant shallow trace records are filtered when a full trace record has already been seen.

Key considerations:

  • Trace IDs and span IDs are converted from binary or hex OTel format to Langfuse format
  • Shallow traces (containing only ID and timestamp) are suppressed if a full trace exists
  • The Redis SET operation with the NX option ensures that only the first occurrence of a trace creates the cache entry
  • Environment, release, and version attributes are propagated from OTel resource attributes
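
The ID conversion and the shallow-trace filter can be approximated as below. An in-memory class stands in for Redis so the sketch runs on its own; its set_if_absent method mimics SET with NX and a 600-second expiry. The exact rule for when a shallow trace is dropped is an assumption drawn from the description above, and all names are hypothetical.

```python
def to_hex_id(value) -> str:
    # Protobuf payloads carry raw bytes; hex strings are normalized to lowercase.
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).hex()
    return value.lower()


class SeenTraces:
    """In-memory stand-in for Redis SET key value NX EX ttl."""

    def __init__(self, ttl_seconds: int = 600):
        self.ttl = ttl_seconds
        self._expiry = {}  # trace ID -> expiry timestamp

    def has(self, trace_id: str, now: float) -> bool:
        expires = self._expiry.get(trace_id)
        return expires is not None and expires > now

    def set_if_absent(self, trace_id: str, now: float) -> bool:
        if self.has(trace_id, now):
            return False  # NX: an existing entry is left untouched
        self._expiry[trace_id] = now + self.ttl
        return True


def should_emit_trace(seen: SeenTraces, trace_id: str, is_shallow: bool, now: float) -> bool:
    if is_shallow:
        # Assumed rule: drop a shallow record once a full one was seen.
        return not seen.has(trace_id, now)
    seen.set_if_absent(trace_id, now)
    return True
```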

Step 6: ChatML Normalization

Extracted input and output data passes through the ChatML adapter system for normalization into a unified message format. Nine adapters handle different provider formats, among them LangGraph, Vercel AI SDK, OpenAI, Gemini, Microsoft Agent, Pydantic AI, Semantic Kernel, and a generic fallback. Each adapter implements a detect function for format identification and a preprocess function for normalization, including tool event extraction.

Key considerations:

  • Adapter detection follows a priority order with specific adapters checked before generic ones
  • The LangGraph adapter runs before OpenAI due to format overlap
  • Tool calls and tool results are extracted into a unified ToolEvent representation
  • The generic adapter serves as a universal fallback when no specific format is detected
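
The detect/preprocess contract can be illustrated with two adapters. The detection check, the output message shape, and the "unknown" role used by the fallback are assumptions made for this sketch. Only the two-function interface, the ordered lookup, and the always-matching generic fallback come from the description above.

```python
from typing import Any, Callable, NamedTuple


class Adapter(NamedTuple):
    name: str
    detect: Callable[[Any], bool]      # does this adapter recognize the data?
    preprocess: Callable[[Any], list]  # normalize to a list of messages


def is_openai_like(data: Any) -> bool:
    # Hypothetical check: a dict holding a "messages" list of role-tagged dicts.
    return (
        isinstance(data, dict)
        and isinstance(data.get("messages"), list)
        and all(isinstance(m, dict) and "role" in m for m in data["messages"])
    )


def openai_preprocess(data: Any) -> list:
    return [{"role": m["role"], "content": m.get("content")} for m in data["messages"]]


def generic_preprocess(data: Any) -> list:
    # Fallback: wrap anything as a single message (sketch choice).
    return [{"role": "unknown", "content": data}]


# Specific adapters first, the generic fallback last.
ADAPTERS = [
    Adapter("openai", is_openai_like, openai_preprocess),
    Adapter("generic", lambda _data: True, generic_preprocess),
]


def normalize(data: Any) -> list:
    for adapter in ADAPTERS:
        if adapter.detect(data):
            return adapter.preprocess(data)
    raise AssertionError("unreachable: the generic adapter accepts everything")
```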

Step 7: Ingestion and Post-Processing

The converted events are split into traces and observations. Observations are written to ClickHouse via the IngestionService merge-and-write pipeline. Traces are processed through the standard processEventBatch function for dual-database storage. If observation-level evaluation configurations exist for the project, observation evaluations are scheduled. For newer SDK versions, events are also written directly to the events staging table.

Key considerations:

  • Enterprise ingestion masking is applied before processing if enabled
  • Observation-level evals upload observation snapshots to S3 for consistent evaluation
  • SDK version gating controls whether direct events table writes are enabled
  • All standard post-ingestion side effects (webhooks, evaluations) apply
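
The split into traces and observations is simple to sketch. The event type strings follow the trace-create and observation-create names used in Step 5. The event dictionary shape and the function name are assumptions, and the downstream writers are not modeled.

```python
def split_events(events: list) -> tuple:
    """Partition converted events into (traces, observations)."""
    traces = [e for e in events if e["type"] == "trace-create"]
    observations = [e for e in events if e["type"] != "trace-create"]
    return traces, observations


# Usage: one trace event and two observation events from the same trace.
events = [
    {"type": "trace-create", "id": "t1"},
    {"type": "observation-create", "id": "o1", "traceId": "t1"},
    {"type": "observation-create", "id": "o2", "traceId": "t1"},
]
traces, observations = split_events(events)
```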
