Implementation:Langfuse Langfuse Events Repository
| Knowledge Sources | |
|---|---|
| Domains | Events, Tracing, ClickHouse, Analytics |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
This repository queries the ClickHouse events table. It provides observation listings, trace aggregation, session queries, and analytics data export, and it supports both the V1 (complete observation) and V2 (partial, field-group based) API patterns.
Description
This module provides the data access layer for the unified ClickHouse events table, which contains denormalized observation/span records with trace context (user_id, session_id, trace_name) and experiment metadata. It is the primary query source for the new events-based architecture that replaces separate observations/traces queries.
Key function groups:
Observation queries:
- getObservationsCountFromEventsTable -- Returns the total count of observations matching the filter criteria.
- getObservationsWithModelDataFromEventsTable -- Fetches observations enriched with model pricing data (V1 API, complete observations).
- getObservationsFromEventsTableV2 -- Fetches observations with selective field groups (V2 API, partial observations for performance).
- getObservationByIdFromEventsTable -- Single observation lookup by ID with model enrichment.
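To illustrate why selective field groups can reduce work in the V2 path, here is a minimal sketch of resolving requested groups to a column list. The group names come from this page; the `FieldGroup` type, the `COLUMNS_BY_GROUP` mapping, the column names, and the rule that "core" is always included are assumptions for illustration and may not match the Langfuse source.

```typescript
// Hypothetical field groups and column names, for illustration only.
type FieldGroup = "core" | "model" | "usage" | "io";

const COLUMNS_BY_GROUP: Record<FieldGroup, string[]> = {
  core: ["id", "trace_id", "type", "start_time"],
  model: ["provided_model_name"],
  usage: ["usage_details", "cost_details"],
  io: ["input", "output"],
};

// Resolve the requested groups to a column list, skipping repeated groups.
// Assumption: the "core" columns are always selected first.
function columnsFor(requested: FieldGroup[]): string[] {
  const columns: string[] = [];
  const seen = new Set<FieldGroup>();
  for (const group of ["core" as FieldGroup, ...requested]) {
    if (seen.has(group)) continue;
    seen.add(group);
    columns.push(...COLUMNS_BY_GROUP[group]);
  }
  return columns;
}

// Only the core and io columns are selected; model and usage are left out.
const selectedColumns = columnsFor(["io", "core", "io"]);
```

A caller that only needs identifiers and payloads would request `["io"]` and avoid reading the usage and model columns at all.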
Trace queries (aggregated from events):
- getTracesFromEventsTable -- Aggregates events into trace-level records using CTEs, joining scores and computing trace metrics.
- getTracesCountFromEventsTable -- Returns the count of distinct traces matching the filters.
- getTraceByIdFromEventsTable -- Single trace lookup with observation and score sub-queries.
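The repository performs trace aggregation inside ClickHouse with CTEs. The in-memory sketch below only illustrates the shape of that aggregation: events are grouped by trace ID, then counted, summed, and spanned. The `EventRow` and `TraceSummary` types and their field names are hypothetical, and latency is assumed here to be the span from the earliest start to the latest end.

```typescript
// Hypothetical row shape; the real events table has many more columns.
interface EventRow {
  traceId: string;
  traceName: string;
  startTimeMs: number;
  endTimeMs: number;
  totalCost: number;
}

interface TraceSummary {
  traceId: string;
  name: string;
  observationCount: number;
  totalCost: number;
  latencyMs: number;
}

// Group event rows by trace and compute per-trace metrics.
function aggregateTraces(rows: EventRow[]): TraceSummary[] {
  const byTrace = new Map<
    string,
    { name: string; count: number; cost: number; minStart: number; maxEnd: number }
  >();
  for (const row of rows) {
    const agg = byTrace.get(row.traceId);
    if (agg === undefined) {
      byTrace.set(row.traceId, {
        name: row.traceName,
        count: 1,
        cost: row.totalCost,
        minStart: row.startTimeMs,
        maxEnd: row.endTimeMs,
      });
    } else {
      agg.count += 1;
      agg.cost += row.totalCost;
      agg.minStart = Math.min(agg.minStart, row.startTimeMs);
      agg.maxEnd = Math.max(agg.maxEnd, row.endTimeMs);
    }
  }
  const traces: TraceSummary[] = [];
  // Map iteration preserves insertion order, so traces appear in first-seen order.
  byTrace.forEach((agg, traceId) => {
    traces.push({
      traceId,
      name: agg.name,
      observationCount: agg.count,
      totalCost: agg.cost,
      latencyMs: agg.maxEnd - agg.minStart,
    });
  });
  return traces;
}
```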
Session queries:
- hasAnySessionFromEventsTable -- Checks whether any session exists for a project.
Analytics integration:
- getAnalyticsObservationEventsFromEventsTable -- Async generator that streams observation events for the PostHog/Mixpanel integrations, including session ID extraction from metadata.
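Because the analytics function is an async generator, a consumer can process events incrementally instead of loading the whole time range into memory. The sketch below shows one way a worker might group a stream into fixed-size batches before forwarding them. `inBatches`, `fakeEvents`, and `collectBatchSizes` are hypothetical helpers; `fakeEvents` merely stands in for the real generator.

```typescript
// Group any async stream into arrays of at most `size` items.
async function* inBatches<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  // Flush the final, possibly smaller, batch.
  if (batch.length > 0) yield batch;
}

// Stand-in for the real analytics generator (hypothetical event shape).
async function* fakeEvents(count: number): AsyncGenerator<{ id: string }> {
  for (let i = 0; i < count; i++) {
    yield { id: `obs_${i}` };
  }
}

// Example consumer: records the size of each batch it receives.
async function collectBatchSizes(count: number, size: number): Promise<number[]> {
  const sizes: number[] = [];
  for await (const batch of inBatches(fakeEvents(count), size)) {
    sizes.push(batch.length);
  }
  return sizes;
}
```

In a real worker, the loop body would send each batch to the analytics provider rather than recording its size.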
The module uses the EventsQueryBuilder and CTEQueryBuilder for constructing complex parameterized queries with the events-specific column definitions. It handles two sets of UI column mappings: eventsTableUiColumnDefinitions for observation-level filters and TRACES_FROM_EVENTS_UI_COLUMN_DEFINITIONS for trace-level filters remapped from the observations table.
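The following sketch shows the general idea of mapping UI filter columns to ClickHouse expressions and emitting a parameterized WHERE clause. It is not the EventsQueryBuilder API: `buildWhere`, `UI_TO_CLICKHOUSE`, the `e.` table alias, and the column expressions are assumptions for illustration. The `{name: Type}` placeholders follow ClickHouse's query parameter syntax, so values travel separately from the SQL text.

```typescript
type StringFilter = {
  column: string;
  type: "string";
  operator: "=" | "contains";
  value: string;
};

// Hypothetical mapping from UI column IDs to ClickHouse expressions.
const UI_TO_CLICKHOUSE = new Map<string, string>([
  ["type", "e.type"],
  ["name", "e.name"],
  ["userId", "e.user_id"],
]);

function buildWhere(
  projectId: string,
  filters: StringFilter[],
): { where: string; params: Record<string, string> } {
  // Every query is scoped to a project first.
  const clauses: string[] = ["e.project_id = {projectId: String}"];
  const params: Record<string, string> = { projectId };
  filters.forEach((filter, index) => {
    const column = UI_TO_CLICKHOUSE.get(filter.column);
    // Reject unmapped columns so user input never reaches the SQL text.
    if (column === undefined) {
      throw new Error(`Unknown filter column: ${filter.column}`);
    }
    const key = `filter${index}`;
    params[key] = filter.value;
    clauses.push(
      filter.operator === "="
        ? `${column} = {${key}: String}`
        : `position(${column}, {${key}: String}) > 0`,
    );
  });
  return { where: clauses.join(" AND "), params };
}
```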
Internal helpers include enrichObservationsWithModelData (fetches model pricing from PostgreSQL and merges it) and enrichObservationsWithTraceFields (adds trace-level fields like tags).
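The enrichment step can be pictured as a lookup-and-merge. This is a minimal sketch under assumed shapes, not the actual enrichObservationsWithModelData implementation: the `Obs`, `ModelPrice`, and `EnrichedObs` types and their field names are hypothetical, and the price list would come from PostgreSQL in the real module.

```typescript
// Hypothetical shapes for illustration.
interface Obs {
  id: string;
  internalModelId: string | null;
}

interface ModelPrice {
  modelId: string;
  inputPrice: number | null;
  outputPrice: number | null;
}

type EnrichedObs = Obs & {
  inputPrice: number | null;
  outputPrice: number | null;
};

// Merge pricing into each observation; unmatched observations get null prices.
function enrichWithPrices(observations: Obs[], prices: ModelPrice[]): EnrichedObs[] {
  // Index prices once so each observation is an O(1) lookup.
  const priceByModel = new Map<string, ModelPrice>();
  for (const price of prices) {
    priceByModel.set(price.modelId, price);
  }
  return observations.map((obs) => {
    const price =
      obs.internalModelId !== null ? priceByModel.get(obs.internalModelId) : undefined;
    return {
      ...obs,
      inputPrice: price?.inputPrice ?? null,
      outputPrice: price?.outputPrice ?? null,
    };
  });
}
```

Indexing the prices up front keeps the merge linear in the number of observations, which matters when a page of results references only a handful of distinct models.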
Usage
Use this repository when querying events-based data from ClickHouse. Prefer the V2 API function with field groups (getObservationsFromEventsTableV2) for new endpoints. The analytics streaming function (getAnalyticsObservationEventsFromEventsTable) is used by background workers to export data to third-party analytics platforms.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/repositories/events.ts
- Lines: 1-2599
Signature
export const getObservationsCountFromEventsTable = async (
  opts: ObservationTableQuery,
): Promise<number>;

export const getObservationsWithModelDataFromEventsTable = async (
  opts: ObservationTableQuery & { renderingProps?: RenderingProps },
): Promise<Array<EventsObservation & ObservationPriceFields>>;

export const getObservationsFromEventsTableV2 = async (
  opts: ObservationTableQuery & { requestedFields: ObservationFieldGroup[] },
): Promise<Array<EventsObservationPublic>>;

export const getObservationByIdFromEventsTable = async (
  opts: { projectId: string; observationId: string; startTime?: Date; traceId?: string },
): Promise<(EventsObservation & ObservationPriceFields) | null>;

export const getTracesFromEventsTable = async (
  opts: { projectId: string; filter: FilterState; orderBy?: OrderByState; limit?: number; offset?: number; ... },
): Promise<Array<Trace>>;

export const getTracesCountFromEventsTable = async (
  opts: { projectId: string; filter: FilterState; ... },
): Promise<number>;

export const getTraceByIdFromEventsTable = async (
  opts: { projectId: string; traceId: string; timestamp?: Date },
): Promise<Trace | null>;

export const getAnalyticsObservationEventsFromEventsTable = async function* (
  opts: { projectId: string; minTimestamp: Date; maxTimestamp: Date },
): AsyncGenerator<AnalyticsObservationEvent>;

export const hasAnySessionFromEventsTable = async (
  projectId: string,
): Promise<boolean>;
Import
import {
  getObservationsCountFromEventsTable,
  getObservationsWithModelDataFromEventsTable,
  getObservationsFromEventsTableV2,
  getObservationByIdFromEventsTable,
  getTracesFromEventsTable,
  getTracesCountFromEventsTable,
  getTraceByIdFromEventsTable,
  hasAnySessionFromEventsTable,
} from "@langfuse/shared/src/server/repositories/events";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | Project scope for all queries |
| filter | FilterState | Yes | Array of filter conditions (datetime, string, number filters) |
| orderBy | OrderByState | No | Sorting specification |
| limit | number | No | Page size for pagination |
| offset | number | No | Page offset for pagination |
| observationId | string | Yes (for single lookup) | Observation ID to retrieve |
| traceId | string | No | Trace ID for scoped queries |
| startTime | Date | No | Time hint for optimized ClickHouse partition pruning |
| requestedFields | ObservationFieldGroup[] | Yes (for V2) | Field groups to include: "core", "model", "usage", "io", etc. |
| renderingProps | RenderingProps | No | Options for input/output rendering (JSON parse, truncation) |
| minTimestamp / maxTimestamp | Date | Yes (for analytics) | Time range for analytics export |
Outputs
| Name | Type | Description |
|---|---|---|
| EventsObservation & ObservationPriceFields | Object | Complete observation with model pricing (V1 API) |
| EventsObservationPublic | Object | Partial observation with requested field groups (V2 API) |
| Trace | Object | Aggregated trace with observations, scores, cost, latency |
| AnalyticsObservationEvent | Object | Flattened event for PostHog/Mixpanel export |
| count | number | Total matching records for pagination |
Usage Examples
import {
  getObservationsCountFromEventsTable,
  getObservationsWithModelDataFromEventsTable,
  getTracesFromEventsTable,
} from "@langfuse/shared/src/server/repositories/events";

// Count observations matching filters
const count = await getObservationsCountFromEventsTable({
  projectId: "proj_123",
  filter: [{ column: "type", type: "string", operator: "=", value: "GENERATION" }],
});

// Fetch observations with model pricing
const observations = await getObservationsWithModelDataFromEventsTable({
  projectId: "proj_123",
  filter: [],
  limit: 50,
  offset: 0,
});

// Get traces aggregated from events
const traces = await getTracesFromEventsTable({
  projectId: "proj_123",
  filter: [],
  orderBy: { column: "timestamp", order: "DESC" },
  limit: 20,
});