Workflow: Langfuse OTel ingestion pipeline
| Knowledge Sources | |
|---|---|
| Domains | LLM_Ops, OpenTelemetry, Data_Ingestion, Multi_Framework |
| Last Updated | 2026-02-14 05:00 GMT |
Overview
End-to-end process for ingesting OpenTelemetry spans from 20+ AI frameworks into Langfuse, converting them into native trace and observation data through framework-aware detection, attribute extraction, and ChatML normalization.
Description
This workflow describes how Langfuse receives, converts, and stores OpenTelemetry (OTel) trace data from diverse AI frameworks. The OTel endpoint accepts standard ExportTraceServiceRequest payloads in both Protobuf and JSON formats with gzip compression support. The OtelIngestionProcessor converts OTel resource spans into Langfuse's native event format by detecting the originating framework, mapping span types to observation categories (GENERATION, SPAN, AGENT, TOOL, etc.), extracting input/output data in framework-specific formats, and normalizing messages through the ChatML adapter system. The converted events are then processed through the standard ingestion pipeline for storage in ClickHouse and PostgreSQL.
Usage
Execute this workflow when LLM applications instrumented with OpenTelemetry-compatible frameworks send trace data to Langfuse. This covers frameworks including LangChain, LlamaIndex, OpenAI Agents, Pydantic AI, Vercel AI SDK, Google ADK, LangGraph, Microsoft Agent Framework, Semantic Kernel, and many others that emit OTel spans.
Execution Steps
Step 1: OTel Endpoint Reception
The OTel API endpoint receives ExportTraceServiceRequest payloads via HTTP POST. The endpoint supports both Protobuf (application/x-protobuf) and JSON (application/json) content types, with automatic gzip decompression. After authentication via project API keys, the project is marked as an OTel user in Redis (24-hour TTL) for conditional UI features.
Key considerations:
- Standard OTel export protocol compatibility ensures any OTel-instrumented framework works
- Project OTel usage tracking enables framework-specific UI customization
- Ingestion suspension is checked for organizations that have exceeded usage thresholds
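The content-type and gzip handling described above can be sketched roughly as follows. This is a minimal illustration, not the Langfuse endpoint code: the function name `decode_otel_body` is hypothetical, and Protobuf decoding is left out because it needs the generated OTLP message classes.

```python
import gzip
import json


def decode_otel_body(headers: dict, body: bytes) -> dict:
    """Decode a JSON-encoded export request body, honoring optional gzip."""
    # HTTP header names are case-insensitive, so normalize them first.
    normalized = {k.lower(): v for k, v in headers.items()}

    if normalized.get("content-encoding", "").lower() == "gzip":
        body = gzip.decompress(body)

    # Drop parameters such as "; charset=utf-8" before comparing.
    content_type = normalized.get("content-type", "").split(";")[0].strip().lower()

    if content_type == "application/json":
        return json.loads(body)
    if content_type == "application/x-protobuf":
        # Assumption: a real handler would decode ExportTraceServiceRequest here.
        raise NotImplementedError("protobuf decoding is outside this sketch")
    raise ValueError(f"unsupported content type: {content_type!r}")


# Usage: a gzip-compressed JSON payload round-trips to the original dict.
payload = {"resourceSpans": []}
raw = gzip.compress(json.dumps(payload).encode("utf-8"))
decoded = decode_otel_body(
    {"Content-Type": "application/json", "Content-Encoding": "gzip"}, raw
)
```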
Step 2: S3 Upload and Queue Dispatch
The raw resourceSpans payload is uploaded to S3-compatible storage with a path structure of otel/{projectId}/{date}/{uuid}.json. A job is then dispatched to the sharded OTel ingestion queue in Redis, containing only the S3 file reference. This decouples the API response from the processing work.
Key considerations:
- Sharding is controlled by LANGFUSE_OTEL_INGESTION_QUEUE_SHARD_COUNT
- S3 offloading keeps queue messages small and prevents Redis memory pressure
- The queue supports 6 retry attempts with exponential backoff (5-second initial delay)
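As a rough sketch of the path structure and retry schedule above: the helper names below are hypothetical, the `YYYY-MM-DD` date format is an assumption (the source only says `{date}`), and the doubling factor is an assumed form of exponential backoff starting from the stated 5-second delay.

```python
import uuid
from datetime import datetime, timezone
from typing import List, Optional


def build_otel_s3_key(
    project_id: str,
    now: Optional[datetime] = None,
    file_id: Optional[str] = None,
) -> str:
    """Build a key following otel/{projectId}/{date}/{uuid}.json."""
    now = now or datetime.now(timezone.utc)
    file_id = file_id or str(uuid.uuid4())
    date_part = now.strftime("%Y-%m-%d")  # assumed date format
    return f"otel/{project_id}/{date_part}/{file_id}.json"


def backoff_delays(
    attempts: int = 6, initial_delay_s: float = 5.0, factor: float = 2.0
) -> List[float]:
    """Delay before each retry, assuming the delay doubles on every attempt."""
    return [initial_delay_s * factor**i for i in range(attempts)]


# The queue job would carry only the file reference, never the span payload.
job = {"fileKey": build_otel_s3_key("proj-1")}  # "fileKey" is a hypothetical field name
```

Keeping the job body down to a single key is what keeps queue messages small regardless of how many spans a request contained.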
Step 3: Framework Detection and Type Mapping
The worker downloads the resourceSpans from S3 and processes each span through the ObservationTypeMapper. This priority-based registry system detects the originating framework by examining span attributes and maps each span to a Langfuse observation type (SPAN, GENERATION, EVENT, EMBEDDING, AGENT, TOOL, CHAIN, RETRIEVER, GUARDRAIL, EVALUATOR).
Key considerations:
- Detection priority ensures specific matchers run before generic ones (lower number = higher priority)
- Direct Langfuse SDK mapping (langfuse.observation.type attribute) takes highest priority
- OpenInference, GenAI semantic conventions, Vercel AI SDK, and model-based detection are all supported
- The default fallback type is SPAN when no specific pattern matches
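A priority-based registry of this kind might look like the sketch below. It is illustrative only: the `TypeMapper` class, the three example mappers, their priorities, and the OpenInference kind mapping are assumptions, not the actual ObservationTypeMapper rules.

```python
from dataclasses import dataclass
from typing import Any, Callable, Mapping

Attributes = Mapping[str, Any]


@dataclass(frozen=True)
class TypeMapper:
    name: str
    priority: int  # lower number = higher priority
    detect: Callable[[Attributes], bool]
    resolve: Callable[[Attributes], str]


# Assumed mapping from OpenInference span kinds to observation types.
OPENINFERENCE_KINDS = {"LLM": "GENERATION", "TOOL": "TOOL", "AGENT": "AGENT"}

MAPPERS = sorted(
    [
        TypeMapper(
            "model_based", 30,
            lambda a: "gen_ai.request.model" in a,
            lambda a: "GENERATION",
        ),
        TypeMapper(
            "langfuse_direct", 1,
            lambda a: "langfuse.observation.type" in a,
            lambda a: str(a["langfuse.observation.type"]).upper(),
        ),
        TypeMapper(
            "openinference", 10,
            lambda a: a.get("openinference.span.kind") in OPENINFERENCE_KINDS,
            lambda a: OPENINFERENCE_KINDS[a["openinference.span.kind"]],
        ),
    ],
    key=lambda m: m.priority,
)


def map_observation_type(attributes: Attributes) -> str:
    """Return the type from the first matching mapper, else fall back to SPAN."""
    for mapper in MAPPERS:
        if mapper.detect(attributes):
            return mapper.resolve(attributes)
    return "SPAN"
```

Sorting once by priority means registration order does not matter; a span carrying both a direct Langfuse type and a model attribute resolves through the direct mapper.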
Step 4: Input and Output Extraction
For each span, the processor extracts input and output data using framework-specific attribute keys. Over 20 framework formats are supported, including Langfuse native attributes, Vercel AI SDK, Google Vertex AI, OpenTelemetry GenAI conventions, OpenInference, Pydantic AI, TraceLoop, MLflow, and SmolAgents. Extraction follows a priority cascade: Langfuse-specific attributes are tried first, then progressively more generic conventions.
Key considerations:
- Each framework stores input/output in different attribute keys and formats
- OTel events (span events) are checked for gen_ai system/user/assistant messages
- Model parameters, usage details, and cost details are extracted in parallel
- Tags, user ID, session ID, and metadata are extracted from trace-level attributes
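The priority cascade can be sketched as an ordered list of attribute keys tried in turn. The key list below is a short illustrative subset and its exact order is an assumption; the helper names are hypothetical.

```python
import json
from typing import Any, Mapping, Optional, Sequence

# Assumed order: Langfuse-specific first, then more generic conventions.
INPUT_KEYS = ["langfuse.observation.input", "input.value", "gen_ai.prompt"]


def parse_maybe_json(value: Any) -> Any:
    """Attribute values often arrive as JSON strings; decode when possible."""
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value  # plain text, keep as-is
    return value


def extract_first(attributes: Mapping[str, Any], keys: Sequence[str]) -> Optional[Any]:
    """Return the value of the first key present, or None if none match."""
    for key in keys:
        value = attributes.get(key)
        if value is not None:
            return parse_maybe_json(value)
    return None


# Usage: the OpenInference-style key wins over the more generic one here.
span_attributes = {"gen_ai.prompt": "hi", "input.value": '{"q": 1}'}
extracted = extract_first(span_attributes, INPUT_KEYS)
```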
Step 5: Trace and Observation Event Generation
The processor generates Langfuse-native ingestion events from the extracted data. Root spans (those without a parent span ID) or spans with trace-level updates generate trace-create events. Every span generates an observation-create event. A Redis-based deduplication mechanism (10-minute TTL) ensures that redundant shallow trace records are filtered when a full trace record has already been seen.
Key considerations:
- Trace IDs and span IDs are converted from binary or hex OTel format to Langfuse format
- Shallow traces (containing only ID and timestamp) are suppressed if a full trace exists
- The NX Redis operation ensures only the first seen trace creates the cache entry
- Environment, release, and version attributes are propagated from OTel resource attributes
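The ID conversion and the set-if-absent deduplication can be illustrated with an in-memory stand-in for Redis. Everything here is a sketch under assumptions: the cache class, the key format, and the exact suppression rule (a shallow trace is dropped whenever any record for the same trace was already seen) are hypothetical simplifications of the behavior described above.

```python
import time
from typing import Dict, Optional, Union

DEDUP_TTL_S = 600  # 10-minute TTL, as stated above


def normalize_otel_id(value: Union[bytes, str]) -> str:
    """Convert a binary or hex OTel ID into a lowercase hex string."""
    return value.hex() if isinstance(value, bytes) else value.lower()


class InMemoryNxCache:
    """Stand-in for a Redis SET with NX and an expiry."""

    def __init__(self) -> None:
        self._expiry: Dict[str, float] = {}

    def set_if_absent(self, key: str, ttl_s: float, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        expires_at = self._expiry.get(key)
        if expires_at is not None and expires_at > now:
            return False  # key still live: NX fails
        self._expiry[key] = now + ttl_s
        return True


def should_emit_trace(
    cache: InMemoryNxCache,
    project_id: str,
    trace_id: str,
    is_shallow: bool,
    now: Optional[float] = None,
) -> bool:
    """Full traces always pass; shallow ones pass only when first seen."""
    key = f"otel:trace:{project_id}:{trace_id}"  # hypothetical key format
    first_seen = cache.set_if_absent(key, DEDUP_TTL_S, now)
    return first_seen or not is_shallow
```

Because the write is conditional, concurrent workers handling spans of the same trace cannot both believe they saw it first.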
Step 6: ChatML Normalization
Extracted input and output data passes through the ChatML adapter system for normalization into a unified message format. Adapters handle different provider formats, including LangGraph, Vercel AI SDK, OpenAI, Gemini, Microsoft Agent, Pydantic AI, Semantic Kernel, and a generic fallback. Each adapter implements a detect function for format identification and a preprocess function for normalization, including tool event extraction.
Key considerations:
- Adapter detection follows a priority order with specific adapters checked before generic ones
- The LangGraph adapter runs before OpenAI due to format overlap
- Tool calls and tool results are extracted into a unified ToolEvent representation
- The generic adapter serves as a universal fallback when no specific format is detected
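The detect/preprocess contract and the ordered fallback can be sketched as below. The two adapters are simplified, hypothetical examples: the detection rule for the OpenAI-like shape and the way the generic adapter wraps unknown data are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List, NamedTuple, Tuple

Message = Dict[str, Any]


class Adapter(NamedTuple):
    name: str
    detect: Callable[[Any], bool]
    preprocess: Callable[[Any], List[Message]]


def _is_chat_payload(data: Any) -> bool:
    # Assumed rule: a dict carrying a "messages" list of role-tagged dicts.
    return (
        isinstance(data, dict)
        and isinstance(data.get("messages"), list)
        and all(isinstance(m, dict) and "role" in m for m in data["messages"])
    )


# Order matters: specific adapters come first, the generic fallback last.
ADAPTERS = [
    Adapter("openai_like", _is_chat_payload, lambda d: list(d["messages"])),
    Adapter("generic", lambda d: True, lambda d: [{"content": d}]),
]


def normalize(data: Any) -> Tuple[str, List[Message]]:
    """Run the first adapter whose detect() accepts the data."""
    for adapter in ADAPTERS:
        if adapter.detect(data):
            return adapter.name, adapter.preprocess(data)
    raise RuntimeError("unreachable: the generic adapter accepts everything")
```

Since the generic adapter's detect always returns true, it must stay at the end of the list; placing it earlier would shadow every adapter after it.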
Step 7: Ingestion and Post-Processing
The converted events are split into traces and observations. Observations are written to ClickHouse via the IngestionService merge-and-write pipeline. Traces are processed through the standard processEventBatch function for dual-database storage. If observation-level evaluation configurations exist for the project, observation evaluations are scheduled. For newer SDK versions, events are also written directly to the events staging table.
Key considerations:
- Enterprise ingestion masking is applied before processing if enabled
- Observation-level evals upload observation snapshots to S3 for consistent evaluation
- SDK version gating controls whether direct events table writes are enabled
- All standard post-ingestion side effects (webhooks, evaluations) apply
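The initial split into traces and observations amounts to partitioning the event list by type. A minimal sketch, assuming each event is a dict with a `type` field such as `trace-create` or `observation-create` (the dict shape is an assumption):

```python
from typing import Any, Dict, List, Tuple

Event = Dict[str, Any]


def split_events(events: List[Event]) -> Tuple[List[Event], List[Event]]:
    """Partition events into (traces, observations), preserving order."""
    traces = [e for e in events if e["type"] == "trace-create"]
    observations = [e for e in events if e["type"] != "trace-create"]
    return traces, observations


events = [
    {"type": "trace-create", "id": "t1"},
    {"type": "observation-create", "id": "o1"},
    {"type": "observation-create", "id": "o2"},
]
traces, observations = split_events(events)
```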