

Implementation:Langfuse Langfuse ClickHouse Repository

From Leeroopedia
Knowledge Sources
Domains ClickHouse, Analytics, Data Access
Last Updated 2026-02-14 00:00 GMT

Overview

Core ClickHouse data access layer providing instrumented query, streaming query, upsert, and command functions with automatic retry logic, resource error handling, and OpenTelemetry tracing.

Description

This module is the foundational ClickHouse repository used by all other ClickHouse-backed repositories in Langfuse. It wraps the raw ClickHouse client with instrumentation, error handling, and retry logic, providing four primary operations:

  • queryClickhouse -- Executes a SELECT query and returns all results as a typed array. It automatically retries transient network errors (socket hang up, connection reset, broken pipe) with exponential backoff. The maximum number of attempts is set by LANGFUSE_CLICKHOUSE_QUERY_MAX_ATTEMPTS.
  • queryClickhouseStream -- An async generator that streams query results row by row, suitable for large result sets and export operations. Detects mid-response ClickHouse exceptions embedded as data rows.
  • upsertClickhouse -- Inserts records into ClickHouse tables (scores, traces, observations, traces_null) while simultaneously writing event JSON to S3 blob storage and optionally logging file references to the blob_storage_file_log table.
  • commandClickhouse -- Executes DDL or mutation commands (e.g., DELETE, ALTER) that do not return result sets. Supports abort signals for cancellable operations.
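The retry behaviour described for queryClickhouse can be sketched as a generic wrapper. This is a minimal illustration, not the module's code. The helper names withRetry and isTransientNetworkError, the matched error strings, and the base delay are all assumptions. In the real module the attempt limit comes from LANGFUSE_CLICKHOUSE_QUERY_MAX_ATTEMPTS. Here it is a plain parameter so the sketch runs without any environment setup.

```typescript
// Hypothetical sketch of retry-with-backoff for transient network errors.
// Names, matched patterns, and delays are illustrative assumptions.

// Message substrings / error codes treated as transient socket failures (assumed).
const TRANSIENT_PATTERNS = ["socket hang up", "ECONNRESET", "EPIPE"];

function isTransientNetworkError(err: unknown): boolean {
  if (!(err instanceof Error)) return false;
  const { message } = err;
  const code = (err as Error & { code?: unknown }).code;
  return TRANSIENT_PATTERNS.some((p) => message.includes(p) || code === p);
}

function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Calls `fn` up to `maxAttempts` times. After each transient failure it waits
// baseDelayMs * 2^(attempt - 1) before retrying. Any other error, or a
// failure on the final attempt, is rethrown unchanged.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts: number,
  baseDelayMs = 100,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxAttempts || !isTransientNetworkError(err)) throw err;
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

Non-transient errors, such as SQL syntax errors, fail immediately. Retrying those would only delay the error without any chance of success.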

All operations are wrapped in OpenTelemetry spans. Their attributes follow the OpenTelemetry database semantic conventions and include the query text, the query ID, and the ClickHouse summary headers, which are useful for performance tuning.
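The following is a rough sketch of how such span attributes could be assembled. It turns the query and the ClickHouse X-ClickHouse-Summary response header into a flat attribute map. The keys db.system and db.query.text follow the OpenTelemetry database conventions, though exact names vary between convention versions. The clickhouse.* keys and the helper name buildClickhouseSpanAttributes are hypothetical. The sketch deliberately avoids the OpenTelemetry SDK, so it only builds the attribute object.

```typescript
// Hypothetical sketch: builds span attributes for one ClickHouse query.
// "db.system" and "db.query.text" follow OpenTelemetry database conventions;
// the "clickhouse.*" keys are illustrative assumptions.
type SpanAttributes = Record<string, string | number>;

function buildClickhouseSpanAttributes(opts: {
  query: string;
  queryId: string;
  summaryHeader?: string; // raw X-ClickHouse-Summary header value, if present
}): SpanAttributes {
  const attributes: SpanAttributes = {
    "db.system": "clickhouse",
    "db.query.text": opts.query,
    "clickhouse.query_id": opts.queryId,
  };
  if (!opts.summaryHeader) return attributes;
  try {
    // The header is a JSON object whose counters are encoded as strings,
    // e.g. {"read_rows":"42","read_bytes":"336"}.
    const summary = JSON.parse(opts.summaryHeader) as Record<string, unknown>;
    for (const [key, value] of Object.entries(summary)) {
      const n = Number(value);
      if (Number.isFinite(n)) attributes[`clickhouse.summary.${key}`] = n;
    }
  } catch {
    // A malformed header must never break the query path, so it is skipped.
  }
  return attributes;
}
```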

The module also provides:

  • ClickHouseResourceError -- A custom error class that wraps memory limit, overcommit, and timeout errors with user-friendly advice messages.
  • parseClickhouseUTCDateTimeFormat -- Converts ClickHouse datetime strings to JavaScript Date objects.
  • clickhouseCompliantRandomCharacters -- Generates random alphabetic strings safe for ClickHouse identifiers.
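To illustrate the wrapping pattern, here is a hypothetical class that mirrors the documented shape of ClickHouseResourceError: a static ERROR_ADVICE_MESSAGE, a readonly errorType, and a static wrapIfResourceError. It is named ClickHouseResourceErrorSketch to make clear it is not the module's class. The message patterns used to classify errors and the advice text are assumptions based on common ClickHouse error wording. Langfuse's actual matching rules are not shown on this page.

```typescript
// Hypothetical sketch mirroring the documented shape of ClickHouseResourceError.
// Message patterns and advice text are assumptions, not Langfuse's own.
type ResourceErrorType = "MEMORY_LIMIT" | "OVERCOMMIT" | "TIMEOUT";

// Checked in order: overcommit messages also mention memory limits, so the
// more specific OVERCOMMIT rule runs before MEMORY_LIMIT.
const RESOURCE_ERROR_RULES: ReadonlyArray<[ResourceErrorType, RegExp]> = [
  ["OVERCOMMIT", /OvercommitTracker/i],
  ["MEMORY_LIMIT", /MEMORY_LIMIT_EXCEEDED|memory limit .*exceeded/i],
  ["TIMEOUT", /TIMEOUT_EXCEEDED|timeout exceeded/i],
];

class ClickHouseResourceErrorSketch extends Error {
  // Placeholder advice text.
  static ERROR_ADVICE_MESSAGE =
    "This query needed more resources than are available. Try a narrower time range or more selective filters.";

  constructor(
    public readonly errorType: ResourceErrorType,
    public readonly originalError: Error,
  ) {
    super(`${ClickHouseResourceErrorSketch.ERROR_ADVICE_MESSAGE} [${errorType}]`);
    this.name = "ClickHouseResourceError";
    Object.setPrototypeOf(this, new.target.prototype); // keeps instanceof working on ES5 targets
  }

  // Returns a wrapped error for recognised resource failures and the
  // original error object, unchanged, for everything else.
  static wrapIfResourceError(originalError: Error): Error {
    for (const [type, pattern] of RESOURCE_ERROR_RULES) {
      if (pattern.test(originalError.message)) {
        return new ClickHouseResourceErrorSketch(type, originalError);
      }
    }
    return originalError;
  }
}
```

Returning the original error untouched for unrecognised failures lets callers apply the wrapper to every caught error without losing stack traces or error types.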

Usage

Use queryClickhouse for standard data retrieval in repository functions. Use queryClickhouseStream for data exports or operations processing large volumes of rows. Use upsertClickhouse during trace/observation/score ingestion. Use commandClickhouse for deletion or DDL operations.
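During ingestion, upsertClickhouse writes each record to ClickHouse and also writes its mapped event JSON to S3. The following is a minimal sketch of that dual write, assuming two hypothetical interfaces, RowInserter and BlobStore, in place of the real ClickHouse and S3 clients. The blob key layout is invented for illustration. The optional blob_storage_file_log bookkeeping is omitted.

```typescript
// Hypothetical sketch of the dual write performed during ingestion.
// RowInserter, BlobStore, and the blob key layout are illustrative
// assumptions; the blob_storage_file_log bookkeeping is omitted.
type UpsertTable = "scores" | "traces" | "observations" | "traces_null";

interface RowInserter {
  insert(table: UpsertTable, rows: Record<string, unknown>[]): Promise<void>;
}

interface BlobStore {
  put(key: string, body: string): Promise<void>;
}

async function upsertSketch<T extends Record<string, unknown>>(
  deps: { clickhouse: RowInserter; blobs: BlobStore },
  opts: {
    table: UpsertTable;
    records: T[];
    eventBodyMapper: (body: T) => Record<string, unknown>;
  },
): Promise<void> {
  // One JSON object per record, shaped by the caller-supplied mapper.
  const blobWrites = opts.records.map((record, index) =>
    deps.blobs.put(
      `${opts.table}/${String(record.id ?? index)}.json`, // assumed key layout
      JSON.stringify(opts.eventBodyMapper(record)),
    ),
  );
  // Run the ClickHouse insert and the blob writes concurrently. The call
  // resolves only when all of them succeed and rejects if any one fails.
  await Promise.all([deps.clickhouse.insert(opts.table, opts.records), ...blobWrites]);
}
```

Promise.all rejects on the first failure, so in this sketch a caller cannot tell which writes completed. This page does not describe how the real module handles partial failure.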

Code Reference

Source Location

Signature

export class ClickHouseResourceError extends Error {
  static ERROR_ADVICE_MESSAGE: string;
  public readonly errorType: "MEMORY_LIMIT" | "OVERCOMMIT" | "TIMEOUT";
  static wrapIfResourceError(originalError: Error): Error;
}

export async function upsertClickhouse<T extends Record<string, unknown>>(opts: {
  table: "scores" | "traces" | "observations" | "traces_null";
  records: T[];
  eventBodyMapper: (body: T) => Record<string, unknown>;
  tags?: Record<string, string>;
}): Promise<void>;

export async function* queryClickhouseStream<T>(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  preferredClickhouseService?: PreferredClickhouseService;
  clickhouseSettings?: ClickHouseSettings;
}): AsyncGenerator<T>;

export async function queryClickhouse<T>(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  preferredClickhouseService?: PreferredClickhouseService;
  clickhouseSettings?: ClickHouseSettings;
}): Promise<T[]>;

export async function commandClickhouse(opts: {
  query: string;
  params?: Record<string, unknown>;
  clickhouseConfigs?: NodeClickHouseClientConfigOptions;
  tags?: Record<string, string>;
  clickhouseSettings?: ClickHouseSettings;
  abortSignal?: AbortSignal;
}): Promise<void>;

export function parseClickhouseUTCDateTimeFormat(dateStr: string): Date;
export function clickhouseCompliantRandomCharacters(): string;

Import

import {
  queryClickhouse,
  queryClickhouseStream,
  upsertClickhouse,
  commandClickhouse,
  parseClickhouseUTCDateTimeFormat,
  ClickHouseResourceError,
} from "@langfuse/shared/src/server/repositories/clickhouse";

I/O Contract

Inputs

  • query (string, required) -- ClickHouse SQL query string with named parameter placeholders.
  • params (Record<string, unknown>, optional) -- Values for the named placeholders. Placeholders use ClickHouse's typed {name: Type} syntax, e.g. {projectId: String}.
  • table ("scores" | "traces" | "observations" | "traces_null", required for upsert) -- Target ClickHouse table for the insert.
  • records (T[], required for upsert) -- Records to insert.
  • eventBodyMapper ((body: T) => Record<string, unknown>, required for upsert) -- Maps each record to the event JSON written to S3.
  • tags (Record<string, string>, optional) -- Metadata tags attached to query log comments and spans.
  • clickhouseConfigs (NodeClickHouseClientConfigOptions, optional) -- Client-level overrides (e.g., request_timeout).
  • clickhouseSettings (ClickHouseSettings, optional) -- Query-level ClickHouse settings.
  • abortSignal (AbortSignal, optional; commandClickhouse only) -- Signal to cancel command execution.
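Because params are matched to {name: Type} placeholders by name, a placeholder with no value in params is an easy mistake. Here is a hypothetical pre-flight check that lists such names. findMissingParams is not part of the module, and the identifier pattern it accepts is an assumption.

```typescript
// Hypothetical pre-flight check (not part of the module): returns the names
// of {name: Type} placeholders in `query` that have no key in `params`.
function findMissingParams(query: string, params: Record<string, unknown> = {}): string[] {
  // Matches "{name: Type}" with optional spaces; Type may contain parentheses,
  // e.g. {from: DateTime64(3)} or {ids: Array(String)}.
  const placeholder = /\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*:\s*[^}]+\}/g;
  const missing = new Set<string>();
  let match: RegExpExecArray | null;
  while ((match = placeholder.exec(query)) !== null) {
    const name = match[1];
    if (!Object.prototype.hasOwnProperty.call(params, name)) missing.add(name);
  }
  return Array.from(missing);
}
```

For example, findMissingParams("SELECT 1 WHERE project_id = {projectId: String}", {}) returns ["projectId"].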

Outputs

  • queryClickhouse -- Returns Promise<T[]>: an array of typed results parsed from JSONEachRow output.
  • queryClickhouseStream -- Returns AsyncGenerator<T>: an async iterator that yields one typed row at a time.
  • upsertClickhouse -- Returns Promise<void>: resolves once the records are inserted into ClickHouse and the event JSON is written to S3.
  • commandClickhouse -- Returns Promise<void>: resolves once the command has executed successfully.
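The two query outputs differ only in whether rows are buffered. Here is a sketch under stated assumptions. It parses JSONEachRow text (one JSON object per line) into typed rows and checks for a ClickHouse exception embedded in the body, as queryClickhouseStream is described as doing. The exception shapes checked (a plain-text "Code: ..." line or a one-key {"exception": ...} object) are assumptions. In practice they depend on the ClickHouse version and HTTP output settings. parseJsonEachRow and collectRows are hypothetical helpers.

```typescript
// Hypothetical sketch: turns JSONEachRow text chunks into typed rows.
// The mid-stream exception shapes checked below are assumptions.
function parseRow<T>(line: string): T {
  // Plain-text exception appended to the body, e.g. "Code: 241. DB::Exception: ...".
  if (line.startsWith("Code: ")) {
    throw new Error(`ClickHouse exception in response body: ${line}`);
  }
  // A truncated row fails JSON.parse here, which also surfaces as an error.
  const row = JSON.parse(line);
  // Exception emitted as a one-key {"exception": "..."} row.
  if (row !== null && typeof row === "object" && typeof row.exception === "string" && Object.keys(row).length === 1) {
    throw new Error(`ClickHouse exception in response body: ${row.exception}`);
  }
  return row as T;
}

// Yields one row per line, buffering partial lines split across chunks.
async function* parseJsonEachRow<T>(chunks: AsyncIterable<string>): AsyncGenerator<T> {
  let buffer = "";
  for await (const chunk of chunks) {
    buffer += chunk;
    let newline = buffer.indexOf("\n");
    while (newline !== -1) {
      const line = buffer.slice(0, newline).trim();
      buffer = buffer.slice(newline + 1);
      if (line) yield parseRow<T>(line);
      newline = buffer.indexOf("\n");
    }
  }
  const rest = buffer.trim();
  if (rest) yield parseRow<T>(rest);
}

// Array-returning counterpart: buffers every row, like queryClickhouse.
async function collectRows<T>(rows: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const row of rows) result.push(row);
  return result;
}
```

The streaming form keeps memory bounded by the size of one row plus one chunk. The collected form holds the whole result set in memory, which is why exports use the generator.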

Usage Examples

import { queryClickhouse, commandClickhouse } from "@langfuse/shared/src/server/repositories/clickhouse";

// Query traces for a project
const traces = await queryClickhouse<{ id: string; name: string }>({
  query: `
    SELECT id, name
    FROM traces FINAL
    WHERE project_id = {projectId: String}
    ORDER BY timestamp DESC
    LIMIT 100
  `,
  params: { projectId: "proj_123" },
  tags: { feature: "tracing", type: "trace", kind: "list", projectId: "proj_123" },
});

// Delete observations for a project
await commandClickhouse({
  query: `DELETE FROM observations WHERE project_id = {projectId: String}`,
  params: { projectId: "proj_123" },
  tags: { feature: "tracing", type: "observation", kind: "delete" },
});
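Rows returned by queryClickhouse carry DateTime columns as strings such as "2024-01-15 10:30:00.123", which is ClickHouse's default rendering in JSON formats. The module's parseClickhouseUTCDateTimeFormat converts such strings to Date objects. The following is a hypothetical reimplementation, named parseClickhouseUtcSketch. It assumes the value is in UTC and has the "YYYY-MM-DD hh:mm:ss[.fff]" shape.

```typescript
// Hypothetical reimplementation, assuming UTC values in ClickHouse's default
// "YYYY-MM-DD hh:mm:ss[.fraction]" output format.
function parseClickhouseUtcSketch(dateStr: string): Date {
  const match = /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?$/.exec(dateStr.trim());
  if (!match) throw new Error(`Unexpected ClickHouse datetime: ${dateStr}`);
  const [, y, mo, d, h, mi, s, frac = "0"] = match;
  // Date only has millisecond precision, so DateTime64(6) or (9) fractions
  // are truncated to their first three digits.
  const ms = Number(frac.padEnd(3, "0").slice(0, 3));
  return new Date(Date.UTC(Number(y), Number(mo) - 1, Number(d), Number(h), Number(mi), Number(s), ms));
}
```

Building the Date with Date.UTC rather than new Date(dateStr) avoids the local-time interpretation that JavaScript applies to date-time strings without a time zone.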
