Implementation:Langfuse Langfuse BlobStorageLog Repository

From Leeroopedia
Knowledge Sources
Domains: Repository, ClickHouse, Blob Storage
Last Updated: 2026-02-14 00:00 GMT

Overview

ClickHouse repository for querying and managing blob storage file reference records, providing functions to look up S3/MinIO file references by project, entity, trace, and primary key.

Description

This repository module encapsulates all ClickHouse queries against the blob_storage_file_log table, which tracks references to files stored in S3-compatible blob storage (e.g., large trace payloads, observation data). It provides both queries that return a complete array of results and streaming queries that yield records one at a time, so large result sets can be processed without loading them fully into memory.

Query functions:

  • getBlobStorageByProjectAndEntityId: Returns all blob storage records for a specific project/entity type/entity ID combination. Uses the ClickHouse FINAL modifier so that duplicate rows are collapsed at query time.
  • getBlobStorageByProjectId: Streams all blob storage records for a project. Returns an AsyncGenerator for memory-efficient iteration.
  • getBlobStorageByProjectIdBeforeDate: Streams records created before a specified date, useful for data retention and cleanup operations.
  • getBlobStorageByProjectIdAndEntityIds: Streams records for a batch of entity IDs of a specific type (observation, trace, or score). Uses a 2-minute request timeout for large batches.
  • getBlobStorageByProjectIdAndTraceIds: Streams all blob storage records associated with a set of trace IDs, including related observations and scores. Uses a complex CTE with LEFT SEMI JOIN to efficiently filter without building a Cartesian product.
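As an illustration of the retention use case mentioned for getBlobStorageByProjectIdBeforeDate, the following is a minimal sketch of how a caller might derive the cutoff date and walk the stream. The names retentionCutoff, forEachExpiredRef, and RefStream are hypothetical and not part of the repository module; RefStream only mirrors the (projectId, beforeDate) signature listed on this page so the sketch can run without a ClickHouse connection.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical helper: convert a retention window in days into the
// cutoff Date that would be passed as `beforeDate`.
function retentionCutoff(now: Date, retentionDays: number): Date {
  if (!Number.isFinite(retentionDays) || retentionDays < 0) {
    throw new RangeError("retentionDays must be a non-negative number");
  }
  return new Date(now.getTime() - retentionDays * MS_PER_DAY);
}

// Stand-in type with the same parameter shape as
// getBlobStorageByProjectIdBeforeDate (an assumption for this sketch).
type RefStream<T> = (projectId: string, beforeDate: Date) => AsyncGenerator<T>;

// Hypothetical driver: stream every record older than the cutoff and hand
// each one to `handle`. Returns how many records were handled.
async function forEachExpiredRef<T>(
  stream: RefStream<T>,
  projectId: string,
  now: Date,
  retentionDays: number,
  handle: (ref: T) => Promise<void>,
): Promise<number> {
  let handled = 0;
  for await (const ref of stream(projectId, retentionCutoff(now, retentionDays))) {
    await handle(ref); // one record in flight at a time
    handled += 1;
  }
  return handled;
}
```

In real use, the stream argument would be getBlobStorageByProjectIdBeforeDate itself and handle would perform the cleanup action for each record.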

Migration functions:

  • insertIntoS3RefsTableFromEventLog: Migrates data from the legacy event_log table to blob_storage_file_log in batches.
  • getLastEventLogPrimaryKey: Retrieves the last primary key from the legacy event_log table for migration tracking. Returns undefined when no row is found.
  • findS3RefsByPrimaryKey: Looks up blob storage records by their composite primary key for migration verification.
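The batched migration described above can be pictured as a simple limit/offset loop. The sketch below is an assumption about how a caller might drive insertIntoS3RefsTableFromEventLog; the actual background migration's orchestration is not shown on this page. MigrateBatch and migrateInBatches are hypothetical names, and totalRows is assumed to be known in advance.

```typescript
// Stand-in type with the same parameter shape as
// insertIntoS3RefsTableFromEventLog(limit, offset).
type MigrateBatch = (limit: number, offset: number) => Promise<void>;

// Hypothetical driver: copy `totalRows` rows in fixed-size batches, one
// batch at a time. Returns the offsets that were processed, in order.
async function migrateInBatches(
  migrateBatch: MigrateBatch,
  totalRows: number,
  batchSize: number,
): Promise<number[]> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const offsets: number[] = [];
  for (let offset = 0; offset < totalRows; offset += batchSize) {
    // Batches run sequentially so that a failure stops the loop at a
    // known offset instead of leaving gaps.
    await migrateBatch(batchSize, offset);
    offsets.push(offset);
  }
  return offsets;
}
```

After such a loop, getLastEventLogPrimaryKey and findS3RefsByPrimaryKey could be combined to check that the final legacy row has a counterpart in blob_storage_file_log.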

Usage

Use these functions whenever you need to find or iterate over blob storage file references. Streaming functions (AsyncGenerator) are preferred for large result sets to avoid memory pressure. The migration functions are only used during the background migration from event_log to blob_storage_file_log.
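When the consumer of a streaming function works more efficiently on groups of records (for example, bulk deletes), the stream can be regrouped into bounded batches without giving up its memory profile. The helper below is a minimal sketch; inBatches is a hypothetical name and is not part of the repository module.

```typescript
// Hypothetical helper: regroup any async stream into arrays of at most
// `size` items, so only one batch is held in memory at a time.
async function* inBatches<T>(
  source: AsyncIterable<T>,
  size: number,
): AsyncGenerator<T[]> {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = []; // start a fresh array; the yielded one belongs to the caller
    }
  }
  if (batch.length > 0) {
    yield batch; // final, possibly smaller, batch
  }
}
```

A caller could wrap, say, getBlobStorageByProjectId("proj-123") in inBatches(..., 1000) and issue one bulk request per batch. A size of 1000 is a natural choice when the target is the S3 DeleteObjects operation, which accepts at most 1000 keys per request.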

Code Reference

Source Location

Signature

export const getBlobStorageByProjectAndEntityId = async (
  projectId: string,
  entityType: string,
  entityId: string,
): Promise<BlobStorageFileRefRecordReadType[]>;

export const getBlobStorageByProjectId = (
  projectId: string,
): AsyncGenerator<BlobStorageFileRefRecordReadType>;

export const getBlobStorageByProjectIdBeforeDate = (
  projectId: string,
  beforeDate: Date,
): AsyncGenerator<BlobStorageFileRefRecordReadType>;

export const getBlobStorageByProjectIdAndEntityIds = (
  projectId: string,
  entityType: "observation" | "trace" | "score",
  entityIds: string[],
): AsyncGenerator<BlobStorageFileRefRecordReadType>;

export const getBlobStorageByProjectIdAndTraceIds = (
  projectId: string,
  traceIds: string[],
): AsyncGenerator<BlobStorageFileRefRecordReadType>;

export const insertIntoS3RefsTableFromEventLog = async (
  limit: number,
  offset: number,
): Promise<void>;

export const getLastEventLogPrimaryKey = async (): Promise<
  { project_id: string; entity_type: string; entity_id: string; bucket_path: string } | undefined
>;

export const findS3RefsByPrimaryKey = async (primaryKey: {
  project_id: string;
  entity_type: string;
  entity_id: string;
  bucket_path: string;
}): Promise<BlobStorageFileRefRecordReadType[]>;

Import

import {
  getBlobStorageByProjectAndEntityId,
  getBlobStorageByProjectId,
  getBlobStorageByProjectIdBeforeDate,
  getBlobStorageByProjectIdAndEntityIds,
  getBlobStorageByProjectIdAndTraceIds,
} from "@langfuse/shared/src/server/repositories/blobStorageLog";

I/O Contract

Inputs

Name Type Required Description
projectId string Yes The project ID to scope queries
entityType string, or "observation" | "trace" | "score" for batch queries Varies The entity type to filter on
entityId string Varies The specific entity ID to look up
entityIds string[] Varies Array of entity IDs for batch queries
traceIds string[] Varies Array of trace IDs; will also match related observations and scores
beforeDate Date Varies Upper date bound for filtering records

Outputs

Name Type Description
(return) BlobStorageFileRefRecordReadType[] or AsyncGenerator<BlobStorageFileRefRecordReadType> Blob storage file reference records, either as an array or async generator for streaming

Usage Examples

import {
  getBlobStorageByProjectAndEntityId,
  getBlobStorageByProjectIdAndTraceIds,
} from "./blobStorageLog";

// Get blob refs for a specific observation
const refs = await getBlobStorageByProjectAndEntityId(
  "proj-123",
  "observation",
  "obs-456",
);

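// Stream all blob refs for a set of traces (including related obs/scores).
// deleteFromS3 is a placeholder for the caller's own S3 deletion helper;
// it is not exported by this module.
for await (const ref of getBlobStorageByProjectIdAndTraceIds("proj-123", ["t1", "t2"])) {
  await deleteFromS3(ref.bucket_name, ref.bucket_path);
}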
