Implementation:Langfuse Langfuse ClickHouse Repository
| Knowledge Sources | |
|---|---|
| Domains | ClickHouse, Analytics, Data Access |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Core ClickHouse data access layer providing instrumented query, streaming query, upsert, and command functions with automatic retry logic, resource error handling, and OpenTelemetry tracing.
Description
This module is the foundational ClickHouse repository used by all other ClickHouse-backed repositories in Langfuse. It wraps the raw ClickHouse client with instrumentation, error handling, and retry logic, providing four primary operations:
- queryClickhouse: Executes a SELECT query and returns all results as a typed array. Includes automatic retry with exponential backoff for transient network errors (socket hang up, connection reset, broken pipe). The number of attempts is configurable via LANGFUSE_CLICKHOUSE_QUERY_MAX_ATTEMPTS.
- queryClickhouseStream: An async generator that streams query results row by row, suitable for large result sets and export operations. Detects mid-response ClickHouse exceptions embedded as data rows.
- upsertClickhouse: Inserts records into ClickHouse tables (scores, traces, observations, traces_null) while simultaneously writing event JSON to S3 blob storage and optionally logging file references to the blob_storage_file_log table.
- commandClickhouse: Executes DDL or mutation commands (e.g., DELETE, ALTER) that do not return result sets. Supports abort signals for cancellable operations.
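The retry behaviour described for queryClickhouse can be pictured with a minimal sketch. It is not the module's actual code: the helper names (`isRetryableNetworkError`, `backoffDelayMs`, `withRetry`), the matched message substrings, and the 100 ms base delay are all assumptions chosen for illustration. In the real module the attempt limit would come from LANGFUSE_CLICKHOUSE_QUERY_MAX_ATTEMPTS.

```typescript
// Hypothetical sketch of retry with exponential backoff.
// The substrings below are assumed stand-ins for "socket hang up",
// "connection reset", and "broken pipe"; the real module may match differently.
const RETRYABLE_PATTERNS = ["socket hang up", "ECONNRESET", "EPIPE"];

function isRetryableNetworkError(err: unknown): boolean {
  if (!(err instanceof Error)) return false;
  const message = err.message;
  return RETRYABLE_PATTERNS.some((pattern) => message.includes(pattern));
}

// Delay doubles with each attempt: attempt 1 -> base, 2 -> 2x base, 3 -> 4x base.
function backoffDelayMs(attempt: number, baseMs: number = 100): number {
  return baseMs * Math.pow(2, attempt - 1);
}

async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts: number = 3, // assumed default, not taken from the source
  baseMs: number = 100,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Give up on non-transient errors or once attempts are exhausted.
      if (!isRetryableNetworkError(err) || attempt === maxAttempts) break;
      await new Promise((resolve) =>
        setTimeout(resolve, backoffDelayMs(attempt, baseMs)),
      );
    }
  }
  throw lastError;
}
```

Only errors classified as transient are retried in this sketch; anything else, such as a SQL syntax error, is rethrown on the first failure so that callers see it immediately.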
All operations are wrapped in OpenTelemetry spans with attributes following the database semantic conventions, including query text, query ID, and ClickHouse summary headers for performance tuning.
The module also provides:
- ClickHouseResourceError: A custom error class that wraps memory limit, overcommit, and timeout errors with user-friendly advice messages.
- parseClickhouseUTCDateTimeFormat: Converts ClickHouse datetime strings to JavaScript Date objects.
- clickhouseCompliantRandomCharacters: Generates random alphabetic strings safe for ClickHouse identifiers.
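To make two of these helpers concrete, here is a hedged sketch of how a resource-error wrapper and a UTC datetime parser might look. The names `ResourceErrorSketch` and `parseUtcDateTimeSketch` are hypothetical, the matched message markers are assumptions based on common ClickHouse error text, and the datetime input is assumed to have the form `YYYY-MM-DD HH:MM:SS[.mmm]`. The real implementations may differ.

```typescript
type ResourceErrorType = "MEMORY_LIMIT" | "OVERCOMMIT" | "TIMEOUT";

// Assumed markers. OvercommitTracker is checked first because overcommit
// messages typically also mention MEMORY_LIMIT_EXCEEDED.
const RESOURCE_ERROR_MARKERS: Array<[string, ResourceErrorType]> = [
  ["OvercommitTracker", "OVERCOMMIT"],
  ["MEMORY_LIMIT_EXCEEDED", "MEMORY_LIMIT"],
  ["TIMEOUT_EXCEEDED", "TIMEOUT"],
];

class ResourceErrorSketch extends Error {
  readonly errorType: ResourceErrorType;

  constructor(errorType: ResourceErrorType, original: Error) {
    super(original.message);
    this.name = "ResourceErrorSketch";
    this.errorType = errorType;
  }

  // Returns a wrapped error for resource problems, or the input unchanged.
  static wrapIfResourceError(original: Error): Error {
    for (const [marker, errorType] of RESOURCE_ERROR_MARKERS) {
      if (original.message.includes(marker)) {
        return new ResourceErrorSketch(errorType, original);
      }
    }
    return original;
  }
}

// Treats the ClickHouse string as UTC by converting it to ISO 8601:
// "2024-05-01 12:34:56.789" becomes "2024-05-01T12:34:56.789Z".
function parseUtcDateTimeSketch(dateStr: string): Date {
  return new Date(`${dateStr.replace(" ", "T")}Z`);
}
```

Appending `Z` matters in the parser: without it, JavaScript would interpret the timestamp in the local time zone of the server rather than as UTC.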
Usage
Use queryClickhouse for standard data retrieval in repository functions. Use queryClickhouseStream for data exports or operations processing large volumes of rows. Use upsertClickhouse during trace/observation/score ingestion. Use commandClickhouse for deletion or DDL operations.
Code Reference
Source Location
- Repository: Langfuse
- File: packages/shared/src/server/repositories/clickhouse.ts
- Lines: 1-516
Signature
export class ClickHouseResourceError extends Error {
  static ERROR_ADVICE_MESSAGE: string;
  public readonly errorType: "MEMORY_LIMIT" | "OVERCOMMIT" | "TIMEOUT";
  static wrapIfResourceError(originalError: Error): Error;
}

export async function upsertClickhouse<T extends Record<string, unknown>>(opts: {
  table: "scores" | "traces" | "observations" | "traces_null";
  records: T[];
  eventBodyMapper: (body: T) => Record<string, unknown>;
  tags?: Record<string, string>;
}): Promise<void>;

export async function* queryClickhouseStream<T>(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  preferredClickhouseService?: PreferredClickhouseService;
  clickhouseSettings?: ClickHouseSettings;
}): AsyncGenerator<T>;

export async function queryClickhouse<T>(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  preferredClickhouseService?: PreferredClickhouseService;
  clickhouseSettings?: ClickHouseSettings;
}): Promise<T[]>;

export async function commandClickhouse(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  clickhouseSettings?: ClickHouseSettings;
  abortSignal?: AbortSignal;
}): Promise<void>;

export function parseClickhouseUTCDateTimeFormat(dateStr: string): Date;

export function clickhouseCompliantRandomCharacters(): string;
Import
import {
  queryClickhouse,
  queryClickhouseStream,
  upsertClickhouse,
  commandClickhouse,
  parseClickhouseUTCDateTimeFormat,
  ClickHouseResourceError,
} from "@langfuse/shared/src/server/repositories/clickhouse";
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| query | string | Yes | ClickHouse SQL query string with named parameter placeholders |
| params | Record<string, unknown> | No | Named parameters for the query (type-safe ClickHouse placeholders like {name: Type}) |
| table | "scores" \| "traces" \| "observations" \| "traces_null" | Yes (for upsert) | Target ClickHouse table for insert |
| records | T[] | Yes (for upsert) | Array of records to insert |
| eventBodyMapper | (body: T) => Record<string, unknown> | Yes (for upsert) | Function to map records to S3 event format |
| tags | Record<string, string> | No | Metadata tags attached to query log comments and spans |
| clickhouseConfigs | NodeClickHouseClientConfigOptions | No | Client-level overrides (e.g., request_timeout) |
| clickhouseSettings | ClickHouseSettings | No | Query-level ClickHouse settings |
| abortSignal | AbortSignal | No | Signal to cancel command execution |
Outputs
| Name | Type | Description |
|---|---|---|
| queryClickhouse return | Promise<T[]> | Array of typed results parsed from JSONEachRow format |
| queryClickhouseStream return | AsyncGenerator<T> | Async iterator yielding individual typed rows |
| upsertClickhouse return | Promise<void> | Resolves on successful insert to both ClickHouse and S3 |
| commandClickhouse return | Promise<void> | Resolves on successful command execution |
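Because queryClickhouseStream yields rows one at a time, a failure can surface after some rows have already been delivered. The sketch below illustrates one way a stream wrapper could detect an exception embedded as a data row. It assumes such a row is an object with a single string field named `exception`; that shape, and the names `isExceptionRow` and `guardRows`, are illustrative assumptions rather than the module's confirmed behaviour.

```typescript
// Assumed shape of a mid-response exception row (not confirmed by the source).
type ExceptionRow = { exception: string };

function isExceptionRow(row: unknown): row is ExceptionRow {
  return (
    typeof row === "object" &&
    row !== null &&
    typeof (row as Record<string, unknown>).exception === "string" &&
    Object.keys(row).length === 1
  );
}

// Passes ordinary rows through and throws as soon as an exception row appears.
async function* guardRows<T>(rows: AsyncIterable<T>): AsyncGenerator<T> {
  for await (const row of rows) {
    if (isExceptionRow(row)) {
      throw new Error(`ClickHouse failed mid-stream: ${row.exception}`);
    }
    yield row;
  }
}

// Example consumer: processes rows incrementally instead of buffering them all.
async function countRows<T>(rows: AsyncIterable<T>): Promise<number> {
  let count = 0;
  for await (const _row of guardRows(rows)) {
    count += 1;
  }
  return count;
}
```

A consumer written this way should be prepared for the `for await` loop to throw partway through, for example by discarding a partially written export file.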
Usage Examples
import { queryClickhouse, commandClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";

// Query traces for a project
const traces = await queryClickhouse<{ id: string; name: string }>({
  query: `
    SELECT id, name
    FROM traces FINAL
    WHERE project_id = {projectId: String}
    ORDER BY timestamp DESC
    LIMIT 100
  `,
  params: { projectId: "proj_123" },
  tags: { feature: "tracing", type: "trace", kind: "list", projectId: "proj_123" },
});

// Delete observations for a project
await commandClickhouse({
  query: `DELETE FROM observations WHERE project_id = {projectId: String}`,
  params: { projectId: "proj_123" },
  tags: { feature: "tracing", type: "observation", kind: "delete" },
});