Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Apache Hudi Bucket Assignment

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Bucket Assignment is the principle of routing each incoming record to a specific file group (bucket) based on index lookups, small file optimization, and partition-aware load balancing, so that downstream write tasks can perform scalable, parallel writes.

Description

In a Hudi streaming write pipeline, bucket assignment is the critical step between record ingestion and data writing. Each record must be tagged with a file ID and a bucket type (INSERT or UPDATE) that determines which physical file group the record will be written to. This tagging enables:

  • Scalable writes: Records are shuffled by file ID to downstream write tasks, distributing the write load evenly.
  • Update-in-place semantics: Existing records are routed to their current file group, enabling efficient upserts without full-table scans at read time.
  • Small file optimization: New insert records are first packed into existing small files (files below the target size) before creating new file groups, preventing file proliferation.

The assignment process uses an index backend (state-based or record-level) to look up whether a record's key already exists in the table:

  • If the record key is found, the record is tagged as an UPDATE and routed to the file group where it currently resides.
  • If the record key is not found, the record is tagged as an INSERT and assigned to either a small file with remaining capacity or a new file group.

For global index scenarios, if a record's partition path changes, the system emits a DELETE record for the old partition path before creating an INSERT in the new partition. This ensures cross-partition upsert correctness.

The assignment also accounts for task affinity: file IDs are deterministically mapped to task indices using Flink's KeyGroupRangeAssignment, ensuring that each write task only writes to files it owns.

Usage

Use this principle for any Hudi streaming write pipeline that performs UPSERT operations (or INSERT with deduplication). It is not used in pure append mode. Key scenarios include:

  • UPSERT on MERGE_ON_READ tables: Records are looked up in the index and routed to existing file groups or new buckets.
  • UPSERT on COPY_ON_WRITE tables: Same logic, but the downstream writer performs copy-on-write merges.
  • INSERT with global record-level index: Uses a minibatch variant that batches index lookups for performance.

Theoretical Basis

The bucket assignment algorithm processes records one at a time within a checkpoint interval:

STATE: indexBackend (recordKey -> HoodieRecordGlobalLocation)
STATE: bucketAssigner (maintains bucket map, small file map, new file state)

FOR EACH record IN checkpoint:
  IF record is an index update record:
    indexBackend.update(recordKey, location)
    SKIP to next record

  partitionPath = record.getPartitionPath()

  IF operation is UPSERT or DELETE:
    oldLocation = indexBackend.get(recordKey)
    IF oldLocation exists:
      IF oldLocation.partition != partitionPath AND globalIndex:
        EMIT delete record for (oldLocation.partition, oldLocation.fileId)
        location = bucketAssigner.addInsert(partitionPath)  // new location
      ELSE:
        location = (oldLocation.fileId, "U")  // UPDATE bucket
        bucketAssigner.addUpdate(partitionPath, fileId)
    ELSE:
      location = bucketAssigner.addInsert(partitionPath)

    IF location changed:
      indexBackend.update(recordKey, newLocation)
  ELSE:
    location = bucketAssigner.addInsert(partitionPath)

  record.setFileId(location.fileId)
  record.setInstantTime(location.tag)  // "I" for INSERT, "U" for UPDATE
  EMIT record

ON CHECKPOINT:
  bucketAssigner.reset()  // clear per-checkpoint state
  indexBackend.onCheckpoint()

ON CHECKPOINT COMPLETE:
  bucketAssigner.reload()  // refresh write profile and small file list
  indexBackend.onCheckpointComplete()

The addInsert subroutine in the bucket assigner:

FUNCTION addInsert(partitionPath):
  // 1. Try to pack into an existing small file
  smallFile = findSmallFileForTask(partitionPath)
  IF smallFile exists AND smallFile.hasCapacity():
    RETURN (UPDATE, smallFile.fileId)

  // 2. Try to reuse a recently created new file
  IF newFileState exists for partitionPath AND newFileState.hasCapacity():
    RETURN (UPDATE, newFileState.fileId)

  // 3. Create a brand new file group
  newFileId = generateFileIdOwnedByThisTask()
  RETURN (INSERT, newFileId)

The MinibatchBucketAssignFunction variant buffers records and performs batch index lookups to reduce the overhead of individual record-level index (RLI) queries.

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment