Implementation: Langfuse createExperimentJobClickhouse
| Knowledge Sources | |
|---|---|
| Domains | LLM Experimentation, Data Pipeline |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Langfuse worker function that processes an experiment run by iterating over dataset items, deduplicating against already-processed items, and executing each remaining item through the LLM pipeline.
Description
createExperimentJobClickhouse is the main worker function that processes experiment creation jobs consumed from ExperimentCreateQueue. It orchestrates the full lifecycle of an experiment run:
- Configuration validation via validateAndSetupExperiment, which fetches the dataset run from PostgreSQL, parses its metadata (prompt ID, provider, model, model params), resolves the prompt via PromptService, fetches the LLM API key, and extracts all template variables and placeholders.
- Item retrieval via getItemsToProcess, which fetches active dataset items from ClickHouse, filters them by variable compatibility using validateDatasetItem, normalizes string inputs to object format, and deduplicates by querying the dataset_run_items_rmt table for existing run items.
- Sequential processing via a for-loop that calls processItem for each remaining item. Each processItem call creates a run item ingestion event, executes the LLM call, schedules observation evals, and enqueues a delayed run item upsert.
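The lifecycle above can be sketched as follows. The stubs are hypothetical stand-ins for the real helpers; only the validate → fetch items → sequential for-loop shape mirrors the description:

```typescript
// Hypothetical stand-ins for the real Langfuse helpers; shapes are assumed.
type DatasetItem = { id: string; input: Record<string, unknown> };

async function validateAndSetupExperimentStub() {
  return { model: "gpt-4o" }; // assumed config shape
}
async function getItemsToProcessStub(): Promise<DatasetItem[]> {
  return [{ id: "item-1", input: { question: "What is 2 + 2?" } }];
}
async function processItemStub(item: DatasetItem, config: { model: string }) {
  // Would build the prompt from item.input, call the LLM, ingest results.
  return { success: true, model: config.model, itemId: item.id };
}

export async function createExperimentJobSketch(): Promise<{ success: true }> {
  const config = await validateAndSetupExperimentStub();
  const items = await getItemsToProcessStub();
  for (const item of items) {
    // Items are processed sequentially, not in parallel.
    await processItemStub(item, config);
  }
  return { success: true };
}
```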
If configuration validation fails with an UnrecoverableError, the function calls createAllDatasetRunItemsWithConfigError, which creates error-level traces and run items for every active dataset item, giving the user visibility into the problem.
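A minimal sketch of this fallback branch, with hypothetical helper signatures; only the behavior (an error-level record per active item, and the job still resolving) comes from the description above:

```typescript
// Configuration-error fallback sketch. Helper names are illustrative.
class UnrecoverableError extends Error {}

async function runWithConfigErrorFallback(
  validate: () => Promise<void>,
  activeItemIds: string[],
  createErrorRunItem: (itemId: string, message: string) => Promise<void>,
): Promise<{ success: true }> {
  try {
    await validate();
  } catch (err) {
    if (err instanceof UnrecoverableError) {
      // Surface the problem to the user: one error-level record per item.
      for (const id of activeItemIds) {
        await createErrorRunItem(id, err.message);
      }
      return { success: true };
    }
    throw err; // other errors propagate to the worker's retry logic
  }
  return { success: true };
}
```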
getItemsToProcess is also exported as a standalone helper for retrieving and filtering the items to process. It performs batch deduplication by querying ClickHouse for all dataset item IDs that already have run items for the given run, then filtering those out.
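Under the assumed shapes below, the deduplication step reduces to a Set-based filter (the real code compares DatasetItemDomain IDs against the ClickHouse result set):

```typescript
// Set-based dedup sketch: keep only items without an existing run item.
function filterAlreadyProcessed<T extends { id: string }>(
  items: T[],
  existingIds: Set<string>,
): T[] {
  return items.filter((item) => !existingIds.has(item.id));
}
```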
Usage
This function is invoked by the BullMQ worker that consumes ExperimentCreateQueue. It is not called directly from the web application. The worker infrastructure handles job deserialization, retry logic, and error reporting.
Code Reference
Source Location
- Repository: langfuse
- File: worker/src/features/experiments/experimentServiceClickhouse.ts
- Lines: 231-366 (createExperimentJobClickhouse and getItemsToProcess), 67-165 (processItem)
Signature
```typescript
export const createExperimentJobClickhouse = async ({
  event,
}: {
  event: z.infer<typeof ExperimentCreateEventSchema>;
}): Promise<{ success: true }> => { ... }

async function getItemsToProcess(
  projectId: string,
  datasetId: string,
  runId: string,
  config: PromptExperimentConfig,
): Promise<Array<DatasetItemDomain & { input: Prisma.JsonObject }>>

async function processItem(
  projectId: string,
  datasetItem: DatasetItemDomain & { input: Prisma.JsonObject },
  config: PromptExperimentConfig,
): Promise<{ success: boolean }>
```
Import
```typescript
import { createExperimentJobClickhouse } from "@/src/features/experiments/experimentServiceClickhouse";
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| event.projectId | string | Yes | The project ID for the experiment. |
| event.datasetId | string | Yes | The dataset ID whose items will be processed. |
| event.runId | string | Yes | The dataset run ID created by createExperiment. |
| event.description | string | No | Optional description passed through from the experiment creation. |
Outputs
| Name | Type | Description |
|---|---|---|
| success | true | Always returns { success: true }. Individual item failures are logged but do not cause the function to throw. |
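The "individual failures never throw" contract can be sketched as a per-item try/catch; this is illustrative, and the real code logs through the worker's logging infrastructure:

```typescript
// Per-item error isolation sketch: a failing item is logged, the loop
// continues, and the job still resolves to { success: true }.
async function processAllItems(
  itemIds: string[],
  processItem: (id: string) => Promise<void>,
): Promise<{ success: true }> {
  for (const id of itemIds) {
    try {
      await processItem(id);
    } catch (err) {
      console.error(`experiment item ${id} failed`, err);
    }
  }
  return { success: true };
}
```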
Side Effects
| Effect | Description |
|---|---|
| Dataset run items | Ingestion events create dataset_run_items_rmt records in ClickHouse for each processed item. |
| Traces | Internal traces are created in the langfuse-prompt-experiment environment for each item, containing the LLM generation. |
| Observation evals | If generation details are available, observation-level evals are scheduled via scheduleExperimentObservationEvals. |
| Delayed upsert jobs | A DatasetRunItemUpsert job is enqueued with a 30-second delay for each processed item. |
| Error traces | On configuration errors, error-level traces and generations are created for every active dataset item. |
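The 30-second delayed upsert could be enqueued with BullMQ roughly like this; the job and payload names here are assumptions, and only the delay value comes from the table above:

```typescript
// Builds BullMQ job arguments for the delayed upsert (illustrative names).
const UPSERT_DELAY_MS = 30_000;

function buildDatasetRunItemUpsertJob(projectId: string, datasetItemId: string) {
  return {
    name: "DatasetRunItemUpsert",
    data: { payload: { projectId, datasetItemId } }, // assumed payload shape
    opts: { delay: UPSERT_DELAY_MS }, // BullMQ JobsOptions.delay, in ms
  };
}
// With a real queue: await queue.add(job.name, job.data, job.opts);
```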
Usage Examples
Worker Queue Consumer
```typescript
// The BullMQ worker invokes the function from its processor callback.
// (A "completed" event handler fires after processing, so the invocation
// belongs in the processor itself.)
import { Worker } from "bullmq";
import { createExperimentJobClickhouse } from "./experimentServiceClickhouse";

const worker = new Worker("ExperimentCreateQueue", async (job) => {
  const event = ExperimentCreateEventSchema.parse(job.data.payload);
  return await createExperimentJobClickhouse({ event });
});
```
Deduplication Query (Internal)
```typescript
// getExistingRunItemDatasetItemIds performs this ClickHouse query:
const query = `
  SELECT dataset_item_id as id
  FROM dataset_run_items_rmt
  WHERE project_id = {projectId: String}
    AND dataset_id = {datasetId: String}
    AND dataset_run_id = {runId: String}
`;
// Returns a Set<string> of already-processed dataset item IDs
```
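Collapsing the rows from that query into the returned Set could look like the following; the row shape is inferred from the SELECT alias:

```typescript
// Each row carries the aliased dataset_item_id; duplicates collapse in the Set.
type RunItemRow = { id: string };

function toExistingIdSet(rows: RunItemRow[]): Set<string> {
  return new Set(rows.map((row) => row.id));
}
```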