Principle:Langfuse Langfuse Event Validation and Schema Parsing

From Leeroopedia
Revision as of 17:43, 16 February 2026 by Admin (talk | contribs) (Auto-imported from principles/Langfuse_Langfuse_Event_Validation_and_Schema_Parsing.md)
Knowledge Sources
Domains: Data Validation, Trace Ingestion
Last Updated: 2026-02-14 00:00 GMT

Overview

Event validation and schema parsing is the practice of defining strict structural contracts for every ingestion event type and enforcing those contracts at the boundary where raw JSON enters the system.

Description

The Langfuse trace ingestion pipeline accepts a wide variety of event types from multiple SDK languages (Python, JavaScript/TypeScript) and from OpenTelemetry-compatible instrumentations. Each event represents a different kind of telemetry: trace creation, span creation/update, generation creation/update, score creation, SDK log messages, and more. Without rigorous schema validation, malformed or incompatible payloads could silently corrupt analytics data, crash downstream processors, or produce misleading dashboards.

The event validation system addresses this by:

  1. Enumerating all event types: A central eventTypes constant maps human-readable event names (e.g., "trace-create", "generation-update") to their string identifiers. This enumeration serves as the single source of truth for what the pipeline can process.
  2. Defining body schemas per event type: Each event type has a corresponding Zod v4 schema that validates the event body. For example, TraceBody validates fields like id, name, sessionId, environment, tags, and metadata. Observation-related schemas (spans, generations, events) build on a shared OptionalObservationBody base and extend it with type-specific fields like endTime, model, usage, and usageDetails.
  3. Handling usage format diversity: Token usage can arrive in multiple formats: the legacy Langfuse format (promptTokens, completionTokens, totalTokens), the modern Langfuse format (input, output, total, unit), or the OpenAI-native format (prompt_tokens, completion_tokens, total_tokens with optional detail breakdowns). Transform pipelines within the schema normalize all formats into a unified representation.
  4. Supporting discriminated unions: The top-level ingestion event schema uses a Zod discriminated union on the type field. This ensures that the parser selects the correct body schema based on the event type, providing precise error messages when validation fails.
  5. Separating public and internal validation: A factory function createIngestionEventSchema produces either a "public" or "internal" variant. The public variant enforces that environment names must not start with "langfuse" (reserved prefix), while the internal variant relaxes this restriction for Langfuse-generated events such as prompt experiment traces.
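
The interplay of points 4 and 5 can be illustrated with a small, dependency-free TypeScript sketch. It does not use Zod and is not the Langfuse implementation: createIngestionEventParser, the two event shapes, and the error strings are hypothetical stand-ins chosen for illustration. It only shows how a discriminator on the type field selects the body check, and how a factory argument can toggle the reserved-prefix rule.

```typescript
// Dependency-free sketch; the real pipeline is described as using Zod schemas.
type Mode = "public" | "internal";

interface TraceCreateEvent {
  type: "trace-create";
  body: { id: string; environment?: string };
}
interface SdkLogEvent {
  type: "sdk-log";
  body: { log: unknown };
}
type IngestionEvent = TraceCreateEvent | SdkLogEvent;

type ParseResult =
  | { ok: true; event: IngestionEvent }
  | { ok: false; error: string };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Hypothetical stand-in for the createIngestionEventSchema factory.
function createIngestionEventParser(mode: Mode) {
  return (raw: unknown): ParseResult => {
    if (!isRecord(raw) || !isRecord(raw.body)) {
      return { ok: false, error: "event must be an object with a body" };
    }
    const body = raw.body;
    // The `type` field is the discriminator: it selects which body check runs.
    switch (raw.type) {
      case "trace-create": {
        const id = body.id;
        if (typeof id !== "string" || id.length === 0) {
          return { ok: false, error: "trace-create: body.id must be a non-empty string" };
        }
        const env = body.environment;
        let environment: string | undefined;
        if (typeof env === "string") {
          environment = env;
        } else if (env !== undefined) {
          return { ok: false, error: "trace-create: body.environment must be a string" };
        }
        // Only the public variant rejects the reserved "langfuse" prefix.
        if (mode === "public" && environment !== undefined && environment.startsWith("langfuse")) {
          return { ok: false, error: "trace-create: environment prefix 'langfuse' is reserved" };
        }
        return { ok: true, event: { type: "trace-create", body: { id, environment } } };
      }
      case "sdk-log": {
        if (!("log" in body)) {
          return { ok: false, error: "sdk-log: body.log is required" };
        }
        return { ok: true, event: { type: "sdk-log", body: { log: body.log } } };
      }
      default:
        return { ok: false, error: `unknown event type: ${String(raw.type)}` };
    }
  };
}
```

Calling createIngestionEventParser("public") and createIngestionEventParser("internal") on the same payload with environment "langfuse-experiment" yields a failure for the former and a success for the latter, which is the behavioral difference the two variants are meant to express.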

Usage

Apply this principle whenever:

  • Adding a new event type to the ingestion pipeline (e.g., a new observation subtype).
  • Changing the structure of an existing event body (fields must be added as optional to preserve backward compatibility).
  • Integrating data from a new SDK or instrumentation library that may use a different field naming convention.
  • Normalizing data formats (e.g., token counts) from heterogeneous sources into a canonical representation.

Theoretical Basis

The validation system is built on the principle of "parse, don't validate": raw data is transformed into well-typed structures at the system boundary, and all downstream code operates on the validated types without re-checking them.
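
As a minimal illustration of this idea, the following TypeScript sketch parses an unknown value into a typed structure once, after which downstream code relies on the type alone. The Trace shape (id, name, tags), parseTrace, and describeTrace are hypothetical names for this example and cover only a few of the fields a real trace body carries.

```typescript
// The well-typed structure that downstream code works with.
interface Trace {
  id: string;
  name: string | null;
  tags: string[];
}

// Boundary function: the only place where `unknown` input is inspected.
function parseTrace(raw: unknown): Trace {
  if (typeof raw !== "object" || raw === null) {
    throw new TypeError("trace must be an object");
  }
  const { id, name, tags } = raw as Record<string, unknown>;

  if (typeof id !== "string" || id.length === 0) {
    throw new TypeError("id must be a non-empty string");
  }

  let parsedName: string | null = null;
  if (typeof name === "string") {
    parsedName = name;
  } else if (name !== undefined && name !== null) {
    throw new TypeError("name must be a string when present");
  }

  let parsedTags: string[] = [];
  if (tags !== undefined) {
    if (!Array.isArray(tags) || !tags.every((t) => typeof t === "string")) {
      throw new TypeError("tags must be an array of strings");
    }
    parsedTags = [...tags];
  }

  return { id, name: parsedName, tags: parsedTags };
}

// Downstream code takes the parsed type and performs no further checks.
function describeTrace(trace: Trace): string {
  return `${trace.id} (${trace.tags.length} tags)`;
}
```

A caller would run parseTrace(JSON.parse(body)) once at the boundary and pass the resulting Trace onward; functions such as describeTrace never see the raw JSON.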

Event Type Taxonomy

const eventTypes = {
  TRACE_CREATE: "trace-create", // Top-level trace
  SCORE_CREATE: "score-create", // Evaluation score
  EVENT_CREATE: "event-create", // Instant observation event
  SPAN_CREATE: "span-create", // Duration-based span
  SPAN_UPDATE: "span-update", // Span update (partial)
  GENERATION_CREATE: "generation-create", // LLM generation
  GENERATION_UPDATE: "generation-update", // Generation update (partial)
  AGENT_CREATE: "agent-create", // Agent observation
  TOOL_CREATE: "tool-create", // Tool call observation
  CHAIN_CREATE: "chain-create", // Chain observation
  RETRIEVER_CREATE: "retriever-create", // Retriever observation
  EVALUATOR_CREATE: "evaluator-create", // Evaluator observation
  EMBEDDING_CREATE: "embedding-create", // Embedding observation
  GUARDRAIL_CREATE: "guardrail-create", // Guardrail observation
  SDK_LOG: "sdk-log", // SDK diagnostic log
  DATASET_RUN_ITEM_CREATE: "dataset-run-item-create", // Dataset run item (internal only)
  OBSERVATION_CREATE: "observation-create", // Legacy observation (deprecated)
  OBSERVATION_UPDATE: "observation-update", // Legacy observation update (deprecated)
} as const;
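
One way a constant like this can serve as a single source of truth is to derive both a union type and a runtime check from it. The sketch below uses an abbreviated copy of the taxonomy; the EventType alias and the isEventType helper are hypothetical names for illustration, not identifiers confirmed by the source.

```typescript
// Abbreviated copy of the taxonomy above; the full constant has more entries.
const eventTypes = {
  TRACE_CREATE: "trace-create",
  SPAN_CREATE: "span-create",
  GENERATION_UPDATE: "generation-update",
  SDK_LOG: "sdk-log",
} as const;

// Union of the string identifiers, derived from the constant itself:
// "trace-create" | "span-create" | "generation-update" | "sdk-log"
type EventType = (typeof eventTypes)[keyof typeof eventTypes];

// Runtime lookup set built from the same constant, so the type and the
// check cannot drift apart.
const knownEventTypes = new Set<string>(Object.values(eventTypes));

// Hypothetical type guard: narrows an unknown value to EventType.
function isEventType(value: unknown): value is EventType {
  return typeof value === "string" && knownEventTypes.has(value);
}
```

Adding a new entry to the constant extends both the EventType union and the set that isEventType consults, without any second list to update.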

Schema Inheritance Hierarchy

OptionalObservationBody
  |-- CreateEventEvent (adds required id)
  |     |-- CreateSpanBody (adds endTime)
  |     |     |-- CreateGenerationBody (adds model, modelParameters, usage,
  |     |           usageDetails, costDetails, promptName, promptVersion)
  |     |
  |     |-- UpdateSpanBody (adds endTime)
  |           |-- UpdateGenerationBody (same extensions as CreateGenerationBody)
  |
  TraceBody (independent schema with id, name, sessionId, userId, environment, tags, etc.)
  ScoreBody (discriminated union on dataType: NUMERIC | CATEGORICAL | BOOLEAN | CORRECTION)
  SdkLogEvent (log field only)
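
The extension chain on the create side of this hierarchy can be sketched without Zod by treating a schema as a map from field name to check and building each level from the previous one. Everything below is illustrative: the extend and invalidFields helpers are hypothetical, the base fields traceId and name are assumed, and treating endTime and model as optional strings is an assumption rather than something the hierarchy states.

```typescript
type FieldCheck = (value: unknown) => boolean;
type Shape = Record<string, FieldCheck>;

const optionalString: FieldCheck = (v) =>
  v === undefined || v === null || typeof v === "string";
const requiredString: FieldCheck = (v) => typeof v === "string" && v.length > 0;

// Returns a new shape; the base shape is left untouched.
function extend(base: Shape, extra: Shape): Shape {
  return { ...base, ...extra };
}

// Lists the fields of `body` that fail their check in `shape`.
function invalidFields(shape: Shape, body: Record<string, unknown>): string[] {
  return Object.entries(shape)
    .filter(([field, check]) => !check(body[field]))
    .map(([field]) => field);
}

// Assumed base fields, for illustration only.
const optionalObservationBody: Shape = {
  traceId: optionalString,
  name: optionalString,
};
// Each level adds fields on top of its parent, mirroring the tree above.
const createEventBody = extend(optionalObservationBody, { id: requiredString });
const createSpanBody = extend(createEventBody, { endTime: optionalString });
const createGenerationBody = extend(createSpanBody, { model: optionalString });
```

With these definitions, invalidFields(createGenerationBody, { name: "llm-call", model: 7 }) reports the missing id inherited from the event level and the mistyped model added at the generation level.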

Usage Normalization Pipeline

Input: Raw usage object from SDK
  |
  +-- If contains promptTokens/completionTokens/totalTokens (legacy):
  |     Transform to { input, output, total, unit: "TOKENS" }
  |
  +-- If contains prompt_tokens/completion_tokens/total_tokens (OpenAI Completion API):
  |     Extract detail breakdowns (prompt_tokens_details, completion_tokens_details)
  |     Compute residual base counts after subtracting detail categories
  |     Transform to { input, output, total, input_*, output_* }
  |
  +-- If contains input_tokens/output_tokens/total_tokens (OpenAI Responses API):
  |     Same breakdown logic as above
  |     Transform to { input, output, total, input_*, output_* }
  |
  +-- Otherwise: Pass through as raw { key: integer } record
  |
Output: Validated Usage or UsageDetails object
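
The branches above can be sketched as a single TypeScript function. This is a simplified illustration rather than the actual transform: normalizeUsage and its helpers are hypothetical names, the output keys input_<detail> and output_<detail> are one reading of the input_* and output_* placeholders, clamping residuals at zero is an assumption, and the pass-through branch here rejects any value that is not a non-negative integer.

```typescript
type NormalizedUsage = Record<string, number | string>;

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Accepts only non-negative integers; anything else is treated as absent.
function toCount(value: unknown): number | undefined {
  return typeof value === "number" && Number.isInteger(value) && value >= 0
    ? value
    : undefined;
}

function setIfPresent(out: NormalizedUsage, key: string, value: unknown): void {
  const n = toCount(value);
  if (n !== undefined) out[key] = n;
}

// Writes each detail category as `${prefix}_${name}` and stores the residual
// (base minus all categories, clamped at zero by assumption) under `prefix`.
function splitWithDetails(
  out: NormalizedUsage,
  prefix: "input" | "output",
  base: unknown,
  details: unknown,
): void {
  let residual = toCount(base);
  if (isRecord(details)) {
    for (const [name, value] of Object.entries(details)) {
      const n = toCount(value);
      if (n === undefined) continue;
      out[`${prefix}_${name}`] = n;
      if (residual !== undefined) residual = Math.max(residual - n, 0);
    }
  }
  if (residual !== undefined) out[prefix] = residual;
}

function normalizeUsage(raw: unknown): NormalizedUsage {
  if (!isRecord(raw)) throw new TypeError("usage must be an object");
  const out: NormalizedUsage = {};

  // Legacy Langfuse format.
  if ("promptTokens" in raw || "completionTokens" in raw || "totalTokens" in raw) {
    setIfPresent(out, "input", raw.promptTokens);
    setIfPresent(out, "output", raw.completionTokens);
    setIfPresent(out, "total", raw.totalTokens);
    out.unit = "TOKENS";
    return out;
  }
  // OpenAI format with prompt_tokens / completion_tokens.
  if ("prompt_tokens" in raw || "completion_tokens" in raw) {
    splitWithDetails(out, "input", raw.prompt_tokens, raw.prompt_tokens_details);
    splitWithDetails(out, "output", raw.completion_tokens, raw.completion_tokens_details);
    setIfPresent(out, "total", raw.total_tokens);
    return out;
  }
  // OpenAI format with input_tokens / output_tokens.
  if ("input_tokens" in raw || "output_tokens" in raw) {
    splitWithDetails(out, "input", raw.input_tokens, raw.input_tokens_details);
    splitWithDetails(out, "output", raw.output_tokens, raw.output_tokens_details);
    setIfPresent(out, "total", raw.total_tokens);
    return out;
  }
  // Otherwise: pass through as a { key: integer } record.
  for (const [key, value] of Object.entries(raw)) {
    const n = toCount(value);
    if (n === undefined) {
      throw new TypeError(`usage.${key} must be a non-negative integer`);
    }
    out[key] = n;
  }
  return out;
}
```

For example, an input of 100 prompt_tokens with prompt_tokens_details of { cached_tokens: 30 } produces input_cached_tokens: 30 and a residual input: 70, so the categories sum back to the original prompt count.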

ID Constraints

All entity IDs are validated with the following constraints:

  • Minimum length: 1 character
  • Maximum length: 800 characters (to fit within AWS S3 object key limits of 1024 bytes with room for path prefixes)
  • Must not contain carriage return characters (\r)
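
These three constraints translate directly into a small check. The sketch below is illustrative: validateId and MAX_ID_LENGTH are hypothetical names, and the length is measured with JavaScript's string length (UTF-16 code units), which is an assumption about how "characters" are counted.

```typescript
const MAX_ID_LENGTH = 800;

// Returns a list of violated constraints; an empty list means the ID is valid.
function validateId(id: unknown): string[] {
  if (typeof id !== "string") return ["id must be a string"];

  const problems: string[] = [];
  if (id.length < 1) {
    problems.push("id must be at least 1 character long");
  }
  // String length counts UTF-16 code units (assumption for this sketch).
  if (id.length > MAX_ID_LENGTH) {
    problems.push(`id must be at most ${MAX_ID_LENGTH} characters long`);
  }
  if (id.includes("\r")) {
    problems.push("id must not contain carriage return characters");
  }
  return problems;
}
```

Returning all violations at once, rather than stopping at the first, lets a caller report every problem with an ID in a single error response.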
