# Implementation: Langfuse FetchLLMCompletion Experiment
| Knowledge Sources | |
|---|---|
| Domains | LLM Experimentation, LLM Integration |
| Last Updated | 2026-02-14 00:00 GMT |
## Overview
Concrete tool for executing a single LLM call against a dataset item within the experiment pipeline, including variable substitution, trace creation, and generation detail extraction, provided by Langfuse.
## Description
The experiment LLM execution is implemented across two functions in `experimentServiceClickhouse.ts`:

`processItem` is the top-level function called for each dataset item. It:
- Generates a deterministic W3C trace ID from `config.runId` and `datasetItem.id` using `createW3CTraceId`, ensuring idempotent trace creation on retries.
- Creates a dataset run item via `processEventBatch` with an ingestion event of type `DATASET_RUN_ITEM_CREATE`.
- Delegates to `processLLMCall` for the actual LLM invocation.
- If generation details are available, calls `scheduleExperimentObservationEvals` to schedule evaluations.
- Enqueues a `DatasetRunItemUpsert` job to the delayed queue.
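The deterministic trace-ID step above can be sketched as follows. This is an illustrative stand-in, not the actual `createW3CTraceId` helper: deriving the ID by hashing with SHA-256 and truncating to 16 bytes is an assumption about how such an ID might be produced; only the inputs (`runId`, dataset item ID) and the idempotency goal come from the source.

```typescript
import { createHash } from "node:crypto";

// Hypothetical sketch of deterministic trace-ID derivation. A W3C trace ID
// is 16 bytes, conventionally rendered as 32 lowercase hex characters.
function deterministicW3CTraceId(runId: string, datasetItemId: string): string {
  return createHash("sha256")
    .update(`${runId}:${datasetItemId}`)
    .digest("hex")
    .slice(0, 32); // keep the first 16 bytes (32 hex chars)
}

// The same inputs always yield the same trace ID, so a retried job writes
// to the identical trace instead of creating a duplicate.
const a = deterministicW3CTraceId("run-1", "item-42");
const b = deterministicW3CTraceId("run-1", "item-42");
console.log(a === b, a.length); // true 32
```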
`processLLMCall` handles variable substitution and LLM invocation:
- Calls `replaceVariablesInPrompt`, which processes template variables and message placeholders, returning an array of `ChatMessage` objects.
- Constructs a `TraceSinkParams` object that configures internal tracing with the `langfuse-prompt-experiment` environment, the target project ID, and metadata containing dataset and experiment identifiers.
- Attaches an `onGenerationComplete` callback that captures `GenerationDetails` (observation ID, input, output, metadata).
- Calls `fetchLLMCompletion` with `streaming: false`, `maxRetries: 1`, the assembled messages, model parameters (provider, model, adapter, temperature, etc.), and an optional structured output schema.
- Returns `{ success: true, generationDetails }`. Errors from `fetchLLMCompletion` are caught and swallowed (the `.catch()` at the end), so a failed LLM call still returns success and does not halt the experiment loop.
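The catch-and-swallow shape described in the last step can be sketched as follows; `processLLMCallSketch` and its parameters are hypothetical names used only to illustrate the pattern, not the actual implementation.

```typescript
// Illustrative sketch: a failed completion resolves to { success: true }
// with no generationDetails, so the caller's per-item loop keeps going.
type GenerationDetails = { observationId: string; output: unknown };

async function processLLMCallSketch(
  call: () => Promise<GenerationDetails>,
): Promise<{ success: boolean; generationDetails?: GenerationDetails }> {
  let generationDetails: GenerationDetails | undefined;
  try {
    generationDetails = await call();
  } catch {
    // Swallow: a failed LLM call must not halt the experiment loop.
  }
  return { success: true, generationDetails };
}

// A rejecting call still resolves successfully, just without details.
processLLMCallSketch(async () => {
  throw new Error("provider error");
}).then((r) => console.log(r.success, r.generationDetails === undefined)); // true true
```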
This is a wrapper around the shared `fetchLLMCompletion` utility, which itself delegates to LangChain provider implementations.
## Usage
These functions are called internally by createExperimentJobClickhouse during the sequential item processing loop. They are not intended to be called directly from outside the experiment service.
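The sequential item-processing loop can be sketched as follows; the names here are illustrative, and the real `createExperimentJobClickhouse` does considerably more per item.

```typescript
// Minimal sketch of a sequential experiment loop: items are awaited one at
// a time (not Promise.all), and per-item failures are tallied rather than
// thrown, matching the catch-and-continue behavior described above.
async function runExperimentSketch<T>(
  items: T[],
  processItem: (item: T) => Promise<{ success: boolean }>,
): Promise<number> {
  let succeeded = 0;
  for (const item of items) {
    const result = await processItem(item); // strictly sequential
    if (result.success) succeeded++;
  }
  return succeeded;
}

runExperimentSketch([1, 2, 3], async () => ({ success: true })).then(
  (n) => console.log(n), // 3
);
```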
## Code Reference
### Source Location
- Repository: langfuse
- File: worker/src/features/experiments/experimentServiceClickhouse.ts
- Lines: 67-229
Signature
async function processItem(
projectId: string,
datasetItem: DatasetItemDomain & { input: Prisma.JsonObject },
config: PromptExperimentConfig,
): Promise<{ success: boolean }>
async function processLLMCall(
runItemId: string,
traceId: string,
datasetItem: DatasetItemDomain & { input: Prisma.JsonObject },
config: PromptExperimentConfig,
): Promise<{ success: boolean; generationDetails?: GenerationDetails }>
### Import
```typescript
// Internal to the worker experiment service; not exported.
// The entry point is createExperimentJobClickhouse, which calls processItem internally.
import { createExperimentJobClickhouse } from "./experimentServiceClickhouse";
```
## I/O Contract
### Inputs (processItem)
| Name | Type | Required | Description |
|---|---|---|---|
| projectId | string | Yes | The project ID for trace creation and eval scheduling. |
| datasetItem | DatasetItemDomain & { input: Prisma.JsonObject } | Yes | The dataset item with validated and normalized input as a JSON object. |
| config | PromptExperimentConfig | Yes | The full experiment configuration including prompt, model params, API key, variables, run ID, etc. |
### Inputs (processLLMCall)
| Name | Type | Required | Description |
|---|---|---|---|
| runItemId | string | Yes | UUID for the dataset run item, used in trace naming. |
| traceId | string | Yes | Deterministic W3C trace ID for the experiment item. |
| datasetItem | DatasetItemDomain & { input: Prisma.JsonObject } | Yes | The dataset item with input values for variable substitution. |
| config | PromptExperimentConfig | Yes | Experiment configuration containing validatedPrompt, allVariables, placeholderNames, provider, model, model_params, validatedApiKey, structuredOutputSchema, prompt metadata, etc. |
### Outputs (processItem)
| Name | Type | Description |
|---|---|---|
| success | boolean | Whether the item was processed without a fatal error. |
### Outputs (processLLMCall)
| Name | Type | Description |
|---|---|---|
| success | boolean | Always true (errors are caught). Indicates the function completed without throwing. |
| generationDetails | GenerationDetails (optional) | Contains observationId, name, input, output, and metadata from the traced generation. Undefined if the LLM call failed silently. |
## Side Effects
| Effect | Description |
|---|---|
| Run item ingestion | A DATASET_RUN_ITEM_CREATE event is processed, creating the run item record. |
| Trace creation | An internal trace is created in the langfuse-prompt-experiment environment in the user's project. |
| Generation observation | A generation observation is created within the trace, recording input, output, usage, cost, and model info. |
| Eval scheduling | If generation details are captured, observation-level evals are scheduled. |
| Delayed upsert | A DatasetRunItemUpsert job is enqueued with a 30-second delay. |
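The delayed enqueue can be illustrated with a hypothetical job builder; `buildDelayedJob` and `DATASET_RUN_ITEM_UPSERT_DELAY_MS` are invented names for illustration, and only the job name and the 30-second delay come from the source.

```typescript
// Sketch of a delayed-queue payload. The real worker uses its own queue
// abstraction; this only shows the shape of a 30-second delayed job.
const DATASET_RUN_ITEM_UPSERT_DELAY_MS = 30_000;

function buildDelayedJob(projectId: string, runItemId: string) {
  return {
    name: "DatasetRunItemUpsert",
    data: { projectId, runItemId },
    opts: { delay: DATASET_RUN_ITEM_UPSERT_DELAY_MS },
  };
}

console.log(buildDelayedJob("p1", "ri1").opts.delay); // 30000
```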
## Usage Examples
### Variable Substitution Flow
```typescript
// Given a text prompt: "Answer the question: {{question}}\nContext: {{context}}"
// and a dataset item with input: { question: "What is 2+2?", context: "Math basics" },
// replaceVariablesInPrompt produces:
[
  {
    role: "system",
    content: "Answer the question: What is 2+2?\nContext: Math basics",
    type: "system",
  },
]
```
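A toy version of this `{{variable}}` substitution can be sketched as follows; the real `replaceVariablesInPrompt` also handles message placeholders and chat-message prompts, which this sketch omits.

```typescript
// Replace {{name}} tokens with values from the dataset item's input,
// leaving unmatched tokens intact.
function substituteVariables(
  template: string,
  input: Record<string, string>,
): string {
  return template.replace(/\{\{(\w+)\}\}/g, (match, name) =>
    name in input ? input[name] : match,
  );
}

const out = substituteVariables(
  "Answer the question: {{question}}\nContext: {{context}}",
  { question: "What is 2+2?", context: "Math basics" },
);
console.log(out);
// Answer the question: What is 2+2?
// Context: Math basics
```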
### TraceSinkParams Configuration
```typescript
const traceSinkParams: TraceSinkParams = {
  environment: LangfuseInternalTraceEnvironment.PromptExperiments,
  traceName: `dataset-run-item-${runItemId.slice(0, 5)}`,
  traceId,
  targetProjectId: config.projectId,
  metadata: {
    dataset_id: datasetItem.datasetId,
    dataset_item_id: datasetItem.id,
    structured_output_schema: config.structuredOutputSchema,
    experiment_name: config.experimentName,
    experiment_run_name: config.experimentRunName,
  },
  prompt: config.prompt,
  onGenerationComplete: (details) => {
    generationDetails = details;
  },
};
```
### fetchLLMCompletion Call
```typescript
await fetchLLMCompletion({
  streaming: false,
  llmConnection: config.validatedApiKey,
  maxRetries: 1,
  messages,
  modelParams: {
    provider: config.provider,
    model: config.model,
    adapter: config.validatedApiKey.adapter,
    ...config.model_params,
  },
  structuredOutputSchema: config.structuredOutputSchema,
  traceSinkParams,
}).catch(); // Swallow errors to avoid halting the experiment loop
```