Implementation:Langfuse Langfuse Events Repository

From Leeroopedia
Knowledge Sources
Domains Events, Tracing, ClickHouse, Analytics
Last Updated 2026-02-14 00:00 GMT

Overview

Repository for querying the ClickHouse events table, providing observation listings, trace aggregation, session queries, and analytics data export with support for both V1 (complete) and V2 (partial field group) API patterns.

Description

This module provides the data access layer for the unified ClickHouse events table, which contains denormalized observation/span records with trace context (user_id, session_id, trace_name) and experiment metadata. It is the primary query source for the new events-based architecture that replaces separate observations/traces queries.

Key function groups:

Observation queries:

  • getObservationsCountFromEventsTable -- Returns total count matching filter criteria.
  • getObservationsWithModelDataFromEventsTable -- Fetches observations enriched with model pricing data (V1 API, complete observations).
  • getObservationsFromEventsTableV2 -- Fetches observations with selective field groups (V2 API, partial observations for performance).
  • getObservationByIdFromEventsTable -- Single observation lookup by ID with model enrichment.
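One way to picture the V2 field-group selection is as a mapping from group name to the columns that group needs. The sketch below is a minimal illustration, not the repository's implementation: the group names "core", "model", "usage", and "io" come from the I/O contract on this page, while the column lists, the type `FieldGroupSketch`, and the helper `selectColumnsForGroups` are hypothetical.

```typescript
// Hypothetical group names and column lists; the real definitions live in the repository.
type FieldGroupSketch = "core" | "model" | "usage" | "io";

const FIELD_GROUP_COLUMNS_SKETCH: Record<FieldGroupSketch, string[]> = {
  core: ["id", "trace_id", "type", "start_time"],
  model: ["provided_model_name", "model_parameters"],
  usage: ["usage_details", "cost_details"],
  io: ["input", "output"],
};

// Build a de-duplicated column list for the requested groups.
// Assumption: the "core" columns are always selected so rows stay identifiable.
function selectColumnsForGroups(requested: FieldGroupSketch[]): string[] {
  const columns: string[] = FIELD_GROUP_COLUMNS_SKETCH.core.slice();
  for (const group of requested) {
    for (const column of FIELD_GROUP_COLUMNS_SKETCH[group]) {
      if (columns.indexOf(column) === -1) {
        columns.push(column);
      }
    }
  }
  return columns;
}
```

Selecting fewer groups means fewer (and typically smaller) columns are read, which is the performance motivation for partial observations.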

Trace queries (aggregated from events):

  • getTracesFromEventsTable -- Aggregates events into trace-level records using CTEs, joining scores and computing trace metrics.
  • getTracesCountFromEventsTable -- Count of distinct traces matching filters.
  • getTraceByIdFromEventsTable -- Single trace lookup with observation and score sub-queries.
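The trace functions derive trace-level records from observation-level events. The sketch below shows an in-memory equivalent of that grouping, assuming a simplified row shape; the interfaces `EventRowSketch` and `TraceAggregateSketch`, their fields, and the function `aggregateEventsIntoTraces` are hypothetical, and the real module performs the aggregation in ClickHouse with CTEs rather than in application code.

```typescript
// Hypothetical, simplified event row; the real events table has many more columns.
interface EventRowSketch {
  traceId: string;
  startTime: number; // epoch milliseconds
  endTime: number; // epoch milliseconds
  totalCost: number;
}

interface TraceAggregateSketch {
  traceId: string;
  observationCount: number;
  latencyMs: number;
  totalCost: number;
}

// Group events by trace ID and compute per-trace metrics.
function aggregateEventsIntoTraces(rows: EventRowSketch[]): TraceAggregateSketch[] {
  const accumulators = new Map<
    string,
    { minStart: number; maxEnd: number; count: number; cost: number }
  >();

  for (const row of rows) {
    const acc = accumulators.get(row.traceId);
    if (acc === undefined) {
      accumulators.set(row.traceId, {
        minStart: row.startTime,
        maxEnd: row.endTime,
        count: 1,
        cost: row.totalCost,
      });
    } else {
      acc.minStart = Math.min(acc.minStart, row.startTime);
      acc.maxEnd = Math.max(acc.maxEnd, row.endTime);
      acc.count += 1;
      acc.cost += row.totalCost;
    }
  }

  // Latency here is the span from the earliest start to the latest end in the trace.
  const traces: TraceAggregateSketch[] = [];
  accumulators.forEach((acc, traceId) => {
    traces.push({
      traceId,
      observationCount: acc.count,
      latencyMs: acc.maxEnd - acc.minStart,
      totalCost: acc.cost,
    });
  });
  return traces;
}
```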

Session queries:

  • hasAnySessionFromEventsTable -- Checks if any session exists for a project.

Analytics integration:

  • getAnalyticsObservationEventsFromEventsTable -- Async generator streaming observation events for PostHog/Mixpanel integration, including session ID extraction from metadata.
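Because the analytics function is an async generator, a consumer can process events as they arrive instead of loading the full export into memory. The sketch below shows one plausible consumption pattern, batching events before forwarding them. The stand-in generator `fakeAnalyticsEvents`, the interface `AnalyticsEventSketch`, and the helper `forwardInBatches` are all hypothetical and only illustrate the pattern.

```typescript
// Hypothetical, reduced event shape for illustration.
interface AnalyticsEventSketch {
  observationId: string;
  sessionId: string | null;
}

// Stand-in for the real generator, which streams rows from ClickHouse.
async function* fakeAnalyticsEvents(count: number): AsyncGenerator<AnalyticsEventSketch> {
  for (let i = 0; i < count; i++) {
    yield { observationId: `obs_${i}`, sessionId: i % 2 === 0 ? `session_${i}` : null };
  }
}

// Drain an async iterable, handing items to `send` in fixed-size batches.
// Returns the total number of items forwarded.
async function forwardInBatches<T>(
  source: AsyncIterable<T>,
  batchSize: number,
  send: (batch: T[]) => Promise<void>,
): Promise<number> {
  let batch: T[] = [];
  let total = 0;
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= batchSize) {
      await send(batch);
      total += batch.length;
      batch = [];
    }
  }
  // Flush the final, possibly smaller, batch.
  if (batch.length > 0) {
    await send(batch);
    total += batch.length;
  }
  return total;
}
```

Awaiting `send` inside the loop applies natural backpressure: the next event is not pulled from the generator until the current batch has been handed off.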

The module uses the EventsQueryBuilder and CTEQueryBuilder for constructing complex parameterized queries with the events-specific column definitions. It handles two sets of UI column mappings: eventsTableUiColumnDefinitions for observation-level filters and TRACES_FROM_EVENTS_UI_COLUMN_DEFINITIONS for trace-level filters remapped from the observations table.
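To illustrate what constructing a parameterized query from UI column definitions can look like, the sketch below maps filter conditions to a WHERE clause using ClickHouse's `{name: Type}` query parameter syntax. It is not the API of EventsQueryBuilder or CTEQueryBuilder: the type `StringFilterSketch`, the mapping `UI_COLUMN_TO_SQL_SKETCH`, the `e.` table alias, and the function `buildWhereClause` are assumptions made for this example.

```typescript
// Hypothetical filter shape, reduced to string equality checks.
type StringFilterSketch = { column: string; operator: "=" | "<>"; value: string };

// Hypothetical mapping from UI column IDs to SQL column expressions.
const UI_COLUMN_TO_SQL_SKETCH: Record<string, string> = {
  type: "e.type",
  name: "e.name",
};

// Translate filters into a WHERE clause plus a parameter bag.
// Values never appear in the SQL text; only named placeholders do.
function buildWhereClause(
  projectId: string,
  filters: StringFilterSketch[],
): { sql: string; params: Record<string, string> } {
  const conditions: string[] = ["e.project_id = {projectId: String}"];
  const params: Record<string, string> = { projectId };

  filters.forEach((filter, index) => {
    const sqlColumn = UI_COLUMN_TO_SQL_SKETCH[filter.column];
    if (sqlColumn === undefined) {
      // Reject columns that are not in the mapping instead of interpolating them.
      throw new Error(`Unknown filter column: ${filter.column}`);
    }
    const paramName = `filter${index}`;
    conditions.push(`${sqlColumn} ${filter.operator} {${paramName}: String}`);
    params[paramName] = filter.value;
  });

  return { sql: `WHERE ${conditions.join(" AND ")}`, params };
}
```

Keeping a separate mapping per query type mirrors the idea of having one set of column definitions for observation-level filters and another for trace-level filters.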

Internal helpers include enrichObservationsWithModelData (fetches model pricing from PostgreSQL and merges it) and enrichObservationsWithTraceFields (adds trace-level fields like tags).
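The merge step of the model enrichment can be sketched as a lookup by model ID. The example below assumes the pricing rows have already been fetched; the interfaces `ObservationSketch` and `ModelPriceSketch`, their field names, and the function `mergeModelPrices` are hypothetical and do not reflect the actual signature of enrichObservationsWithModelData.

```typescript
// Hypothetical, reduced shapes for illustration.
interface ObservationSketch {
  id: string;
  internalModelId: string | null;
}

interface ModelPriceSketch {
  modelId: string;
  inputPrice: number;
  outputPrice: number;
}

type EnrichedObservationSketch = ObservationSketch & {
  inputPrice: number | null;
  outputPrice: number | null;
};

// Attach pricing to each observation; observations without a matching model get nulls.
function mergeModelPrices(
  observations: ObservationSketch[],
  prices: ModelPriceSketch[],
): EnrichedObservationSketch[] {
  // Index prices once so each observation lookup is constant time.
  const priceByModelId = new Map<string, ModelPriceSketch>();
  for (const price of prices) {
    priceByModelId.set(price.modelId, price);
  }

  return observations.map((observation) => {
    const price =
      observation.internalModelId !== null
        ? priceByModelId.get(observation.internalModelId)
        : undefined;
    return {
      ...observation,
      inputPrice: price ? price.inputPrice : null,
      outputPrice: price ? price.outputPrice : null,
    };
  });
}
```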

Usage

Use this repository when querying events-based data from ClickHouse. For new endpoints, prefer getObservationsFromEventsTableV2 with field groups, which returns partial observations for performance. Background workers use the analytics streaming function, getAnalyticsObservationEventsFromEventsTable, to export data to third-party analytics platforms.

Code Reference

Source Location

Signature

// Type-level signatures only; function bodies are omitted.
export declare const getObservationsCountFromEventsTable: (
  opts: ObservationTableQuery,
) => Promise<number>;

export declare const getObservationsWithModelDataFromEventsTable: (
  opts: ObservationTableQuery & { renderingProps?: RenderingProps },
) => Promise<Array<EventsObservation & ObservationPriceFields>>;

export declare const getObservationsFromEventsTableV2: (
  opts: ObservationTableQuery & { requestedFields: ObservationFieldGroup[] },
) => Promise<Array<EventsObservationPublic>>;

export declare const getObservationByIdFromEventsTable: (
  opts: { projectId: string; observationId: string; startTime?: Date; traceId?: string },
) => Promise<(EventsObservation & ObservationPriceFields) | null>;

export declare const getTracesFromEventsTable: (
  opts: { projectId: string; filter: FilterState; orderBy?: OrderByState; limit?: number; offset?: number; /* ... */ },
) => Promise<Array<Trace>>;

export declare const getTracesCountFromEventsTable: (
  opts: { projectId: string; filter: FilterState; /* ... */ },
) => Promise<number>;

export declare const getTraceByIdFromEventsTable: (
  opts: { projectId: string; traceId: string; timestamp?: Date },
) => Promise<Trace | null>;

// Async generator: yields events one at a time instead of returning an array.
export declare const getAnalyticsObservationEventsFromEventsTable: (
  opts: { projectId: string; minTimestamp: Date; maxTimestamp: Date },
) => AsyncGenerator<AnalyticsObservationEvent>;

export declare const hasAnySessionFromEventsTable: (
  projectId: string,
) => Promise<boolean>;

Import

import {
  getObservationsCountFromEventsTable,
  getObservationsWithModelDataFromEventsTable,
  getObservationsFromEventsTableV2,
  getObservationByIdFromEventsTable,
  getTracesFromEventsTable,
  getTracesCountFromEventsTable,
  getTraceByIdFromEventsTable,
  getAnalyticsObservationEventsFromEventsTable,
  hasAnySessionFromEventsTable,
} from "@langfuse/shared/src/server/repositories/events";

I/O Contract

Inputs

Name | Type | Required | Description
projectId | string | Yes | Project scope for all queries
filter | FilterState | Yes | Array of filter conditions (datetime, string, number filters)
orderBy | OrderByState | No | Sorting specification
limit | number | No | Page size for pagination
offset | number | No | Page offset for pagination
observationId | string | Yes (for single lookup) | Observation ID to retrieve
traceId | string | No | Trace ID for scoped queries
startTime | Date | No | Time hint for optimized ClickHouse partition pruning
requestedFields | ObservationFieldGroup[] | Yes (for V2) | Field groups to include: "core", "model", "usage", "io", etc.
renderingProps | RenderingProps | No | Options for input/output rendering (JSON parse, truncation)
minTimestamp / maxTimestamp | Date | Yes (for analytics) | Time range for analytics export

Outputs

Name | Type | Description
EventsObservation & ObservationPriceFields | Object | Complete observation with model pricing (V1 API)
EventsObservationPublic | Object | Partial observation with requested field groups (V2 API)
Trace | Object | Aggregated trace with observations, scores, cost, latency
AnalyticsObservationEvent | Object | Flattened event for PostHog/Mixpanel export
count | number | Total matching records for pagination
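The count output is typically combined with limit and offset to drive pagination. The sketch below shows that arithmetic under the assumption that offset is a row offset (not a page index); the function `describePage` and its return shape are hypothetical and not part of the repository.

```typescript
// Derive pagination metadata from a total count and the limit/offset used for the page query.
// Assumption: `offset` counts rows to skip, so page N starts at offset N * limit.
function describePage(
  totalCount: number,
  limit: number,
  offset: number,
): { totalPages: number; currentPage: number; hasNextPage: boolean } {
  if (limit <= 0) {
    throw new Error("limit must be a positive number");
  }
  return {
    totalPages: Math.ceil(totalCount / limit),
    currentPage: Math.floor(offset / limit), // zero-based page index
    hasNextPage: offset + limit < totalCount,
  };
}
```

In practice the count and page queries would be issued with the same filter so that the metadata matches the rows returned.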

Usage Examples

import {
  getObservationsCountFromEventsTable,
  getObservationsWithModelDataFromEventsTable,
  getTracesFromEventsTable,
} from "@langfuse/shared/src/server/repositories/events";

// Count observations matching filters
const count = await getObservationsCountFromEventsTable({
  projectId: "proj_123",
  filter: [{ column: "type", type: "string", operator: "=", value: "GENERATION" }],
});

// Fetch observations with model pricing
const observations = await getObservationsWithModelDataFromEventsTable({
  projectId: "proj_123",
  filter: [],
  limit: 50,
  offset: 0,
});

// Get traces aggregated from events
const traces = await getTracesFromEventsTable({
  projectId: "proj_123",
  filter: [],
  orderBy: { column: "timestamp", order: "DESC" },
  limit: 20,
});
