Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Langfuse Langfuse S3 Payload Upload

From Leeroopedia
Knowledge Sources
Domains Blob Storage, Trace Ingestion
Last Updated 2026-02-14 00:00 GMT

Overview

S3 payload upload is the practice of persisting ingestion event data to durable blob storage before enqueuing lightweight references for asynchronous processing, decoupling ingestion throughput from downstream processing capacity.

Description

When trace events arrive at the Langfuse API, they must be durably stored before the API can acknowledge the request. Directly writing to the final analytics database (ClickHouse) on the hot path would create tight coupling between API latency and database write performance. Instead, Langfuse follows a store-then-queue pattern:

  1. Persist to blob storage: The validated event batch is serialized to JSON and uploaded to S3-compatible object storage. Events are grouped by their entity body ID so that multiple updates to the same observation within a single batch are stored as a single file, reducing the number of S3 write operations.
  1. Enqueue a reference: After successful upload, a lightweight job is enqueued to a BullMQ queue containing only the S3 file key and auth context. The actual event payload is not placed on the queue, keeping queue memory usage low.
  1. Worker retrieval: The background worker retrieves the event data by downloading the JSON from S3 using the file key from the queue job.

This architecture provides several benefits:

  • Durability: S3 provides 11 nines of durability, ensuring events survive even if the queue or worker crashes.
  • Decoupling: The API response time is bounded by S3 upload latency (typically tens of milliseconds), not by ClickHouse write latency.
  • Scalability: S3 scales horizontally without configuration, and multiple workers can retrieve and process events concurrently.
  • Cost efficiency: S3 storage is inexpensive compared to keeping large payloads in Redis memory.

The storage layer is abstracted behind a StorageService interface, allowing the same code to work with AWS S3, Azure Blob Storage, Google Cloud Storage, or MinIO for local development. The factory pattern selects the appropriate implementation based on environment configuration.

Usage

Apply this principle whenever:

  • Ingestion event payloads need to be durably stored before acknowledgment.
  • Event payloads are too large to fit efficiently in Redis queue memory.
  • The system needs to support multiple cloud storage backends.
  • Multipart upload is needed for large files (e.g., trace data with large input/output payloads).

Theoretical Basis

The store-then-queue pattern is an application of the transactional outbox concept adapted for blob storage:

Storage Path Convention

Path format:
  {prefix}/{projectId}/{entityType}/{eventBodyId}/{eventId}.json

Where:
  prefix      = LANGFUSE_S3_EVENT_UPLOAD_PREFIX (configurable, e.g., "ingestion-events/")
  projectId   = The authenticated project's ID
  entityType  = One of: "trace", "observation", "score", "dataset_run_item"
  eventBodyId = The body.id field of the event (the trace/observation/score entity ID)
  eventId     = The event.id field (unique per ingestion event)

Example:
  ingestion-events/proj_abc123/observation/obs_xyz789/evt_001.json

Event Grouping

Given a batch of N events:
  1. Group events by key = "{entityType}-{eventBodyId}"
  2. For each group:
     a. Serialize all events in the group as a JSON array.
     b. Upload to S3 at the path derived from the group's entity type and body ID.
  3. Result: M uploads where M <= N (events for the same entity are batched).

This reduces S3 PUT operations when an SDK sends multiple updates
for the same observation in a single batch request.

Multi-Backend Abstraction

StorageServiceFactory.getInstance(config):
    IF config.useAzureBlob OR env.LANGFUSE_USE_AZURE_BLOB == "true":
        RETURN new AzureBlobStorageService(config)
    ELSE IF config.useGoogleCloudStorage OR env.LANGFUSE_USE_GOOGLE_CLOUD_STORAGE == "true":
        RETURN new GoogleCloudStorageService(config)
    ELSE:
        RETURN new S3StorageService(config)

Multipart Upload

For large payloads, the S3 implementation uses the AWS SDK's managed Upload utility which automatically splits data into parts and uploads them concurrently:

Upload configuration:
  partSize  = Configurable (default: 5 MB, supports up to ~50 GB files)
  queueSize = Configurable (controls concurrent part upload count)

The default 5 MB part size supports files up to approximately 50 GB
(5 MB x 10,000 maximum parts per S3 multipart upload).

Error Handling

IF any S3 upload in the batch fails:
    IF error is S3 SlowDown (HTTP 503):
        Mark the project for secondary queue routing.
        Log warning.
    Log error.
    Set s3UploadErrored = true.

IF s3UploadErrored:
    Throw error to prevent queue dispatch.
    API returns 500 to the client, which should retry.

The S3 upload step is blocking but non-failing at the individual event level -- Promise.allSettled is used so that a single event's upload failure does not prevent other events in the batch from being uploaded. However, if any upload fails, the entire batch is treated as failed and no queue jobs are dispatched.

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment