Workflow:Langfuse Langfuse Trace ingestion pipeline

From Leeroopedia
Knowledge Sources
Domains LLM_Ops, Data_Ingestion, Observability
Last Updated 2026-02-14 05:00 GMT

Overview

End-to-end process for ingesting LLM application trace data from SDK clients into Langfuse's dual database system (PostgreSQL and ClickHouse) via authenticated REST API endpoints and asynchronous queue processing.

Description

This workflow describes how Langfuse receives, validates, and stores observability data from LLM applications. Client SDKs (Python, JavaScript/TypeScript) submit batches of events representing traces, observations (spans, generations, events), and scores through authenticated REST API endpoints. The system validates each event against Zod schemas, optionally uploads large payloads to S3-compatible blob storage, and enqueues the events into sharded BullMQ queues for asynchronous processing by the worker service. The worker then merges events into ClickHouse for analytics and PostgreSQL for relational queries, applying model matching, cost calculation, and tool extraction along the way.

Usage

Execute this workflow when an LLM application instrumented with a Langfuse SDK sends trace data to the Langfuse server. This is the primary data ingestion path for all non-OpenTelemetry trace data, handling trace creation, observation creation/updates, score submissions, and dataset run item linkage.

Execution Steps

Step 1: API Authentication and Rate Limiting

The client SDK sends an HTTP POST request to the public ingestion endpoint with a batch of events. The server authenticates the request using Basic Auth (public key + secret key), resolves the project scope, and applies rate limiting based on the organization's plan. The authentication result includes the project ID, organization ID, and subscription plan details.

Key considerations:

  • API keys are cached in Redis to minimize database lookups
  • Rate limits are enforced per project and per resource type
  • Both project-scoped and organization-scoped API keys are supported
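
From the client's side, Step 1 comes down to a single authenticated POST. The sketch below only builds the request and makes no network call. The envelope field names (`batch`, `id`, `type`, `timestamp`, `body`) and the example key values are assumptions for illustration and are not taken from this page. `parse_basic_auth` is a hypothetical helper showing how a server could recover the key pair from the header.

```python
import base64
import json
import uuid
from datetime import datetime, timezone


def basic_auth_header(public_key: str, secret_key: str) -> str:
    """Encode the key pair as an HTTP Basic Auth header value."""
    token = base64.b64encode(f"{public_key}:{secret_key}".encode("utf-8"))
    return "Basic " + token.decode("ascii")


def parse_basic_auth(header: str) -> tuple[str, str]:
    """Server-side counterpart: recover (public_key, secret_key)."""
    scheme, _, encoded = header.partition(" ")
    if scheme != "Basic" or not encoded:
        raise ValueError("expected a Basic Auth header")
    decoded = base64.b64decode(encoded).decode("utf-8")
    public_key, sep, secret_key = decoded.partition(":")
    if not sep:
        raise ValueError("malformed credentials")
    return public_key, secret_key


def build_ingestion_request(public_key: str, secret_key: str, events: list[dict]):
    """Assemble headers and JSON body for one batch (assumed envelope shape)."""
    headers = {
        "Authorization": basic_auth_header(public_key, secret_key),
        "Content-Type": "application/json",
    }
    body = json.dumps({"batch": events})
    return headers, body


# Hypothetical event and placeholder keys, for illustration only.
event = {
    "id": str(uuid.uuid4()),
    "type": "trace-create",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "body": {"id": "trace-123", "name": "demo"},
}
headers, body = build_ingestion_request("pk-lf-example", "sk-lf-example", [event])
```

The resulting `headers` and `body` can be handed to any HTTP client. Redis key caching and plan-based rate limiting happen on the server and are not modelled here.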

Step 2: Event Validation and Schema Parsing

Each event in the batch is validated against the Zod schema for its event type (trace-create, observation-create, observation-update, score-create, etc.). Invalid events are collected as errors with descriptive messages. Valid events are normalized, which includes converting usage data from the legacy OpenAI token format to the unified usage model.

Key considerations:

  • 20+ event types are supported including trace, span, generation, agent, tool, chain, retriever, evaluator, and embedding
  • IDs must be shorter than 800 characters, because they become part of S3 object keys, which are limited in length
  • Usage data supports both legacy (promptTokens/completionTokens) and modern (input/output/total) formats
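
The validation and normalization rules above can be sketched in a few lines. This is a language-neutral illustration in Python, not the server's Zod schemas: `KNOWN_EVENT_TYPES` is a deliberately small subset, the error shape is hypothetical, and the `totalTokens` field name is an assumption about the legacy format.

```python
MAX_ID_LENGTH = 800  # from the text: IDs must stay under 800 characters

# Illustrative subset only; the page lists 20+ event types.
KNOWN_EVENT_TYPES = {
    "trace-create",
    "observation-create",
    "observation-update",
    "score-create",
}


def normalize_usage(usage: dict) -> dict:
    """Map legacy promptTokens/completionTokens onto input/output/total."""
    if "promptTokens" in usage or "completionTokens" in usage:
        inp = usage.get("promptTokens", 0)
        out = usage.get("completionTokens", 0)
        # totalTokens is an assumed legacy field; fall back to the sum.
        total = usage.get("totalTokens", inp + out)
        return {"input": inp, "output": out, "total": total}
    return usage  # already in the modern format


def validate_batch(events: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split a batch into (valid, errors); one bad event never aborts the batch."""
    valid, errors = [], []
    for event in events:
        event_id = event.get("id")
        event_type = event.get("type")
        body = event.get("body")
        if not isinstance(event_type, str) or event_type not in KNOWN_EVENT_TYPES:
            errors.append({"id": event_id, "error": f"unknown event type {event_type!r}"})
        elif not isinstance(event_id, str) or not 0 < len(event_id) < MAX_ID_LENGTH:
            errors.append({"id": event_id, "error": "id must be 1 to 799 characters"})
        elif not isinstance(body, dict):
            errors.append({"id": event_id, "error": "body must be an object"})
        else:
            normalized = dict(body)
            if isinstance(body.get("usage"), dict):
                normalized["usage"] = normalize_usage(body["usage"])
            valid.append({**event, "body": normalized})
    return valid, errors
```

Collecting errors per event, rather than rejecting the whole batch, matches the description above: valid events continue through the pipeline while invalid ones are reported back with a message.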

Step 3: Trace Sampling Check

For projects with sampling configured, a deterministic sampling check is performed using the trace ID. This uses a hash-based algorithm to consistently include or exclude traces based on the configured sampling rate, ensuring that the same trace ID always produces the same sampling decision.

Key considerations:

  • Sampling is project-level and optional
  • The algorithm is deterministic so retried events produce the same decision
  • Sampling applies to the entire trace and all its children
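
A minimal sketch of a deterministic, hash-based sampling decision follows. The page does not say which hash function Langfuse uses; SHA-256 and the function name `is_trace_sampled` are assumptions chosen for illustration.

```python
import hashlib


def is_trace_sampled(trace_id: str, sampling_rate: float) -> bool:
    """Return True if the trace should be kept at the given rate (0.0 to 1.0)."""
    if not 0.0 <= sampling_rate <= 1.0:
        raise ValueError("sampling_rate must be between 0.0 and 1.0")
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an integer in [0, 2**64).
    bucket = int.from_bytes(digest[:8], "big")
    # Compare against the scaled threshold; the same ID always lands in
    # the same bucket, so the decision is stable across retries.
    return bucket < sampling_rate * 2**64
```

Because the decision depends only on the trace ID and the rate, every event that carries the same trace ID (the trace itself and all of its observations and scores) receives the same outcome without any shared state.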

Step 4: S3 Payload Upload

If S3 event upload is configured, the validated event payloads are uploaded to S3-compatible blob storage. This offloads large payloads from the Redis queue, keeping queue messages small and improving throughput. The S3 path includes the project ID, date, and entity type for organized storage.

Key considerations:

  • S3 upload is optional and controlled by environment configuration
  • S3 SlowDown errors are tracked per project via Redis to enable backpressure
  • Server-side encryption (SSE/KMS) is supported
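
To make the path structure concrete, here is a hedged sketch of an object key builder. The text only states that the path includes the project ID, date, and entity type; the ordering of the segments, the `prefix`, `entity_id`, and `event_id` parameters, and the `.json` suffix are all hypothetical. The 1024-byte check reflects S3's documented limit on object key length.

```python
from datetime import datetime, timezone

S3_MAX_KEY_BYTES = 1024  # S3 limit on the UTF-8 length of an object key


def build_event_key(
    prefix: str,
    project_id: str,
    entity_type: str,
    entity_id: str,
    event_id: str,
    when: datetime,
) -> str:
    """Build a storage key from project, UTC date, and entity type (assumed layout)."""
    if when.tzinfo is None:
        raise ValueError("when must be timezone-aware")
    day = when.astimezone(timezone.utc).strftime("%Y-%m-%d")
    key = f"{prefix}{project_id}/{day}/{entity_type}/{entity_id}/{event_id}.json"
    if len(key.encode("utf-8")) > S3_MAX_KEY_BYTES:
        raise ValueError("object key exceeds the S3 key length limit")
    return key
```

With a key like this, the queue message only needs to carry the identifiers required to rebuild the key, while the full payload stays in blob storage until the worker fetches it.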

Step 5: Queue Dispatch

Validated events are dispatched to sharded BullMQ ingestion queues in Redis. The sharding uses consistent hashing on the project ID to distribute load across multiple queue instances. Events are enqueued with a configurable delay to handle out-of-order processing near date boundaries (UTC midnight).

Key considerations:

  • Queue sharding is controlled by LANGFUSE_INGESTION_QUEUE_SHARD_COUNT
  • A default delay of 5 seconds prevents duplicate processing of concurrent events
  • Between 23:45 and 00:15 UTC, the full configured delay is applied to handle events that straddle the date boundary
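
The shard selection and delay rules can be sketched as follows. Two caveats: the shard function uses plain hash-modulo, which is a simpler stand-in for the consistent hashing the text mentions, and the behaviour outside the midnight window (the smaller of 5 seconds and the configured delay) is an assumption. All function names are hypothetical.

```python
import hashlib
from datetime import datetime, timezone

DEFAULT_DELAY_MS = 5_000  # the 5-second default named in the text


def shard_for_project(project_id: str, shard_count: int) -> int:
    """Pick a queue shard for a project (hash-modulo stand-in, see lead-in)."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    digest = hashlib.sha256(project_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


def in_midnight_window(now: datetime) -> bool:
    """True from 23:45 through 00:15 UTC."""
    utc = now.astimezone(timezone.utc)
    return (utc.hour == 23 and utc.minute >= 45) or (utc.hour == 0 and utc.minute <= 15)


def enqueue_delay_ms(now: datetime, configured_delay_ms: int) -> int:
    """Full configured delay near UTC midnight, otherwise at most the default."""
    if in_midnight_window(now):
        return configured_delay_ms
    # Assumption: outside the window the shorter of the two delays applies.
    return min(DEFAULT_DELAY_MS, configured_delay_ms)
```

Hashing on the project ID keeps all events for one project on the same shard. In a real deployment the shard count would come from `LANGFUSE_INGESTION_QUEUE_SHARD_COUNT`.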

Step 6: Worker Event Processing

The worker service consumes events from the ingestion queue. For each event, it downloads the payload from S3 (if applicable), determines the entity type (trace or observation), and routes to the appropriate processing pipeline. Observations are merged and written to ClickHouse via the IngestionService, while traces are written to both PostgreSQL and ClickHouse.

Key considerations:

  • The worker applies model matching to associate observations with configured model definitions
  • Cost calculation is performed based on matched model pricing tiers
  • Tool definitions and tool calls are extracted from observation input/output data
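
Model matching and cost calculation can be illustrated with a small sketch. It assumes that a model definition carries a regular expression and a flat price per usage unit; the `ModelDefinition` class, the model name, and the prices are all hypothetical, and pricing tiers are not modelled.

```python
import re
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass
class ModelDefinition:
    name: str
    match_pattern: str  # regex tested against the observation's model string
    prices: dict        # price per unit, keyed by usage type


def match_model(model_name: str, definitions: list) -> Optional[ModelDefinition]:
    """Return the first definition whose pattern matches the whole model name."""
    for definition in definitions:
        if re.fullmatch(definition.match_pattern, model_name):
            return definition
    return None


def calculate_cost(usage: dict, definition: ModelDefinition) -> dict:
    """Multiply each priced usage type by its unit price and add a total."""
    costs = {
        usage_type: definition.prices[usage_type] * units
        for usage_type, units in usage.items()
        if usage_type in definition.prices
    }
    costs["total"] = sum(costs.values(), Decimal(0))
    return costs


# Made-up definition and prices, for illustration only.
definitions = [
    ModelDefinition(
        name="example-model",
        match_pattern=r"example-model(-v\d+)?",
        prices={"input": Decimal("0.000001"), "output": Decimal("0.000002")},
    )
]
usage = {"input": 1000, "output": 500, "total": 1500}
matched = match_model("example-model-v2", definitions)
cost = calculate_cost(usage, matched) if matched else {}
```

`Decimal` is used instead of floats so that very small per-token prices add up without rounding drift. Usage types without a configured price (here, `total`) are skipped rather than treated as free.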

Step 7: Post-Ingestion Side Effects

After successful ingestion, several asynchronous side effects are triggered. Trace upsert events are dispatched to the TraceUpsertQueue, which triggers evaluation job creation for any matching eval configurations. Entity change events are dispatched for webhook delivery. Score events trigger score-specific processing pipelines.

Key considerations:

  • Evaluation triggering is filtered to exclude internal Langfuse traces (environment starting with "langfuse-")
  • Webhook deliveries are signed with HMAC-SHA256 so that receivers can verify their authenticity
  • Dataset run item creation events trigger dataset run item upsert processing
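
Two of these considerations are easy to show in code: HMAC-SHA256 signing with receiver-side verification, and the environment filter for evaluation triggering. The function names, the hex encoding of the signature, and the example secret are assumptions; the page does not specify how the signature is transmitted.

```python
import hashlib
import hmac
import json
from typing import Optional


def sign_payload(secret: str, payload: bytes) -> str:
    """Compute a hex HMAC-SHA256 signature over the raw request body."""
    return hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()


def verify_signature(secret: str, payload: bytes, signature: str) -> bool:
    """Receiver side: recompute the signature and compare in constant time."""
    expected = sign_payload(secret, payload)
    return hmac.compare_digest(expected, signature)


def should_trigger_evals(environment: Optional[str]) -> bool:
    """Skip traces whose environment starts with "langfuse-"."""
    return not (environment or "").startswith("langfuse-")


# Hypothetical payload and secret, for illustration only.
payload = json.dumps({"event": "example"}).encode("utf-8")
signature = sign_payload("whsec-example", payload)
```

The signature must be computed over the exact bytes that are sent, since re-serializing the JSON on the receiving side can change whitespace or key order and invalidate it.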
