
Implementation:Langfuse createExperimentJobClickhouse

From Leeroopedia
Knowledge Sources
Domains: LLM Experimentation, Data Pipeline
Last Updated: 2026-02-14 00:00 GMT

Overview

createExperimentJobClickhouse is a concrete tool, provided by Langfuse, for processing an experiment run: it iterates over dataset items, deduplicates against already-processed items, and executes each remaining item through the LLM pipeline.

Description

createExperimentJobClickhouse is the main worker function that processes experiment creation jobs consumed from ExperimentCreateQueue. It orchestrates the full lifecycle of an experiment run:

  1. Configuration validation via validateAndSetupExperiment, which fetches the dataset run from PostgreSQL, parses its metadata (prompt ID, provider, model, model params), resolves the prompt via PromptService, fetches the LLM API key, and extracts all template variables and placeholders.
  2. Item retrieval via getItemsToProcess, which fetches active dataset items from ClickHouse, filters them by variable compatibility using validateDatasetItem, normalizes string inputs to object format, and deduplicates by querying the dataset_run_items_rmt table for existing run items.
  3. Sequential processing via a for-loop that calls processItem for each remaining item. Each processItem call creates a run item ingestion event, executes the LLM call, schedules observation evals, and enqueues a delayed run item upsert.
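
The three-step lifecycle above can be sketched as follows. This is a hypothetical illustration, not the real implementation: the helper names mirror the functions described, but their bodies are stand-ins.

```typescript
type Item = { id: string; input: Record<string, unknown> };

// Step 1: configuration validation (stub standing in for the real helper).
const validateAndSetupExperiment = async () => ({ promptId: "p1", model: "m" });

// Step 2: fetch active items, then drop those that already have run items.
const getItemsToProcess = async (all: Item[], existing: Set<string>) =>
  all.filter((item) => !existing.has(item.id));

// Step 3: process one item (stub).
const processItem = async (item: Item) => ({ success: true, id: item.id });

export const runExperimentSketch = async (
  allItems: Item[],
  existingIds: Set<string>,
) => {
  await validateAndSetupExperiment();
  const items = await getItemsToProcess(allItems, existingIds);
  let processed = 0;
  // Sequential for-loop, matching the one-item-at-a-time flow described above.
  for (const item of items) {
    await processItem(item);
    processed++;
  }
  return { success: true as const, processed };
};
```

The sequential loop (rather than Promise.all) keeps LLM calls rate-limit friendly and makes per-item failures easy to isolate.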

If configuration validation fails with an UnrecoverableError, the function calls createAllDatasetRunItemsWithConfigError, which creates error-level traces and run items for every active dataset item, giving the user visibility into the problem.
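
The fallback behavior can be sketched as below. This is a hypothetical shape, assuming the real createAllDatasetRunItemsWithConfigError emits one error-level record per active item; the type and field names here are illustrative.

```typescript
type ErrorRunItem = { datasetItemId: string; level: "ERROR"; message: string };

// On an unrecoverable config error, every active dataset item still gets an
// error-level record, so the failed run is visible rather than silently empty.
export const markAllAsConfigError = (
  items: Array<{ id: string }>,
  message: string,
): ErrorRunItem[] =>
  items.map((item) => ({ datasetItemId: item.id, level: "ERROR", message }));
```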

getItemsToProcess is also exported as a standalone helper for retrieving and filtering the items to process. It performs batch deduplication by querying ClickHouse for all dataset item IDs that already have run items for the given run, then filtering those out.
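
The filtering half of that deduplication reduces to a Set lookup, sketched below (a minimal illustration, not the library's actual code):

```typescript
// One batch query yields the IDs that already have run items; filtering each
// candidate item is then a constant-time Set membership check.
export const dedupeItems = <T extends { id: string }>(
  items: T[],
  existingIds: Set<string>,
): T[] => items.filter((item) => !existingIds.has(item.id));
```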

Usage

This function is invoked by the BullMQ worker that consumes ExperimentCreateQueue. It is not called directly from the web application. The worker infrastructure handles job deserialization, retry logic, and error reporting.

Code Reference

Source Location

  • Repository: langfuse
  • File: worker/src/features/experiments/experimentServiceClickhouse.ts
  • Lines: 231-366 (createExperimentJobClickhouse and getItemsToProcess), 67-165 (processItem)

Signature

export const createExperimentJobClickhouse = async ({
  event,
}: {
  event: z.infer<typeof ExperimentCreateEventSchema>;
}): Promise<{ success: true }> => { ... }

async function getItemsToProcess(
  projectId: string,
  datasetId: string,
  runId: string,
  config: PromptExperimentConfig,
): Promise<Array<DatasetItemDomain & { input: Prisma.JsonObject }>>

async function processItem(
  projectId: string,
  datasetItem: DatasetItemDomain & { input: Prisma.JsonObject },
  config: PromptExperimentConfig,
): Promise<{ success: boolean }>

Import

import { createExperimentJobClickhouse } from "@/src/features/experiments/experimentServiceClickhouse";

I/O Contract

Inputs

  • event.projectId (string, required): The project ID for the experiment.
  • event.datasetId (string, required): The dataset ID whose items will be processed.
  • event.runId (string, required): The dataset run ID created by createExperiment.
  • event.description (string, optional): Description passed through from the experiment creation.

Outputs

  • success (true): The function always returns { success: true }. Individual item failures are logged but do not cause the function to throw.
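
That "individual failures never throw" contract can be sketched as a try/catch around each item, as below. This is an illustrative stand-in, not the real worker loop:

```typescript
// Each task is wrapped in try/catch; failures are counted (the real worker
// logs them) and the overall result is still { success: true }.
export const runAllSwallowingErrors = async (
  tasks: Array<() => Promise<void>>,
) => {
  let failed = 0;
  for (const task of tasks) {
    try {
      await task();
    } catch {
      failed++; // log and continue; never rethrow
    }
  }
  return { success: true as const, failed };
};
```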

Side Effects

  • Dataset run items: Ingestion events create dataset_run_items_rmt records in ClickHouse for each processed item.
  • Traces: Internal traces are created in the langfuse-prompt-experiment environment for each item, containing the LLM generation.
  • Observation evals: If generation details are available, observation-level evals are scheduled via scheduleExperimentObservationEvals.
  • Delayed upsert jobs: A DatasetRunItemUpsert job is enqueued with a 30-second delay for each processed item.
  • Error traces: On configuration errors, error-level traces and generations are created for every active dataset item.

Usage Examples

Worker Queue Consumer

// This is roughly how the BullMQ worker invokes the function. Note that in
// BullMQ the processor callback is passed to the Worker constructor; the
// "completed" event fires only after a job has already been processed, so it
// is the wrong place to call the job function. The queue name and schema
// import location shown here are illustrative.
import { Worker } from "bullmq";
import { createExperimentJobClickhouse } from "./experimentServiceClickhouse";

const worker = new Worker("experiment-create-queue", async (job) => {
  const event = ExperimentCreateEventSchema.parse(job.data.payload);
  return await createExperimentJobClickhouse({ event });
});

Deduplication Query (Internal)

// getExistingRunItemDatasetItemIds performs this ClickHouse query:
const query = `
  SELECT dataset_item_id as id
  FROM dataset_run_items_rmt
  WHERE project_id = {projectId: String}
  AND dataset_id = {datasetId: String}
  AND dataset_run_id = {runId: String}
`;
// Returns a Set<string> of already-processed dataset item IDs
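
Collapsing the query rows into that Set is a one-liner, sketched below (the row shape matches the `dataset_item_id as id` alias in the query above; the function name is illustrative):

```typescript
// Rows from the deduplication query are collapsed into a Set<string> so that
// later filtering is an O(1) membership check per item; duplicates collapse.
export const toIdSet = (rows: Array<{ id: string }>): Set<string> =>
  new Set(rows.map((row) => row.id));
```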
