
Implementation:Apache Hudi BucketAssignFunction ProcessElement

From Leeroopedia


Knowledge Sources

  • Domains: Data_Lake, Stream_Processing
  • Last Updated: 2026-02-08 00:00 GMT

Overview

A concrete Apache Hudi tool that assigns records to file group buckets using index lookups and small-file optimization.

Description

BucketAssignFunction.processElement() is a Flink KeyedProcessFunction that receives keyed HoodieFlinkInternalRow records and assigns each record to a specific file group (bucket). It uses an IndexBackend for record key lookups and a BucketAssigner for insert routing with small file packing.

The function performs the following logic for each record:

  1. Index record handling: If the record is an index update record, update the index backend and return without emitting.
  2. Location lookup: For changing records (UPSERT/DELETE), look up the record key in the index backend.
  3. Update routing: If the record exists, route it to the existing file group with bucket type "U" (UPDATE). If the partition has changed and global index is enabled, emit a DELETE for the old partition first.
  4. Insert routing: If the record does not exist (or is a pure INSERT), use the BucketAssigner to find a small file or create a new file group.
  5. Index refresh: Update the index backend when the record's location changes.
  6. Record tagging: Set the fileId and the instantTime placeholder ("I" for an insert bucket, "U" for an update bucket) on the record, then emit it.
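The update-vs-insert decision flow above can be sketched as self-contained Java. This is a minimal illustration, not the real implementation: the index is modeled as a plain map (standing in for the IndexBackend), the `Location` record and `assign` method are hypothetical names, and global-index partition-change handling (the DELETE emission in step 3) is omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class BucketAssignSketch {
    // Simplified stand-in for a record location: file group id + bucket type marker.
    record Location(String fileId, String bucketType) {}

    private final Map<String, Location> index = new HashMap<>(); // stand-in for IndexBackend
    private int newFileCounter = 0;

    /** Mirrors steps 2-6: look up the key, route to an existing or new file group. */
    Location assign(String recordKey) {
        Location existing = index.get(recordKey);
        if (existing != null) {
            // Update routing: reuse the existing file group, bucket type "U".
            return new Location(existing.fileId(), "U");
        }
        // Insert routing: the real BucketAssigner would first try to pack a small file.
        Location fresh = new Location("file-" + (newFileCounter++), "I");
        index.put(recordKey, fresh); // index refresh: remember the new location
        return fresh;
    }

    public static void main(String[] args) {
        BucketAssignSketch f = new BucketAssignSketch();
        Location first = f.assign("k1");
        Location second = f.assign("k1");
        // An insert is tagged "I"; a repeat of the same key becomes an update ("U")
        // routed to the same file group.
        System.out.println(first.bucketType() + " " + second.bucketType() + " "
            + second.fileId().equals(first.fileId()));
        // prints "I U true"
    }
}
```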

The companion MinibatchBucketAssignFunction buffers records and performs batch index lookups before delegating to the same processRecord() logic, improving throughput for record-level index (RLI) workloads.
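The buffer-then-batch-lookup pattern can be sketched as follows. This is an illustrative simplification, not Hudi code: the batch size, flush trigger, and `emit` callback are assumptions, and the real operator also flushes on checkpoint and issues a single batched lookup against the index backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MinibatchSketch {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private final Map<String, String> index; // stand-in for the index backend
    private final Consumer<String> emit;     // stand-in for the Flink Collector

    MinibatchSketch(int batchSize, Map<String, String> index, Consumer<String> emit) {
        this.batchSize = batchSize;
        this.index = index;
        this.emit = emit;
    }

    void processElement(String recordKey) {
        buffer.add(recordKey);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        // One batched lookup amortizes index round-trips over the whole buffer,
        // then each record is routed individually (cf. processRecord()).
        for (String key : buffer) {
            String fileId = index.getOrDefault(key, "new-file");
            emit.accept(key + "->" + fileId);
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        MinibatchSketch op = new MinibatchSketch(2, Map.of("k1", "f1"), out::add);
        op.processElement("k1");
        op.processElement("k2"); // reaches batch size 2, triggers flush
        System.out.println(out); // prints "[k1->f1, k2->new-file]"
    }
}
```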

The BucketAssigner class manages:

  • A bucket info map tracking known buckets within the current checkpoint.
  • A small file assign map that packs inserts into files below the target size.
  • A new file assign state that tracks capacity of recently created file groups.
  • Task affinity: uses KeyGroupRangeAssignment to ensure generated file IDs hash to the current task index.
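The small-file packing idea can be sketched in isolation. This is an assumed simplification: the real BucketAssigner tracks capacity in bytes against a target file size using record-size estimates, and applies the task-affinity check when generating new file IDs; here capacity is counted in records and the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class SmallFilePackingSketch {
    // A candidate small file with remaining record capacity.
    static class SmallFile {
        final String fileId;
        long remaining; // records still fitting under the target size
        SmallFile(String fileId, long remaining) { this.fileId = fileId; this.remaining = remaining; }
    }

    private final List<SmallFile> smallFiles = new ArrayList<>();
    private int seq = 0;

    SmallFilePackingSketch(List<SmallFile> existing) { smallFiles.addAll(existing); }

    /** Route one insert: fill a small file first, otherwise open a new file group. */
    String assignInsert() {
        for (SmallFile f : smallFiles) {
            if (f.remaining > 0) {
                f.remaining--;
                return f.fileId;
            }
        }
        return "new-file-" + (seq++);
    }

    public static void main(String[] args) {
        SmallFilePackingSketch a = new SmallFilePackingSketch(
            List.of(new SmallFile("small-1", 2)));
        System.out.println(a.assignInsert()); // prints "small-1"
        System.out.println(a.assignInsert()); // prints "small-1"
        System.out.println(a.assignInsert()); // prints "new-file-0"
    }
}
```

Packing inserts into under-sized files first is what keeps file counts bounded; only when no small file has capacity does the assigner create a new file group.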

Usage

This operator is used in the UPSERT pipeline path (non-append mode). It is placed after the source/bootstrap operator and before the stream write operator. Records are keyed by record key before entering this function.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
  • Lines: 71-241

Also:

  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
  • Lines: 52-340
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
  • Lines: 50-176

Signature

// BucketAssignFunction
@Override
public void processElement(
    HoodieFlinkInternalRow value,
    Context ctx,
    Collector<HoodieFlinkInternalRow> out) throws Exception

// Internal processing method (also used by MinibatchBucketAssignFunction)
protected void processRecord(
    HoodieFlinkInternalRow record,
    String recordKey,
    Collector<HoodieFlinkInternalRow> out) throws Exception

// MinibatchBucketAssignFunction
@Override
public void processElement(
    HoodieFlinkInternalRow record,
    Context ctx,
    Collector<HoodieFlinkInternalRow> outCollector) throws Exception

Import

import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;

I/O Contract

Inputs

  • value (HoodieFlinkInternalRow, required): record containing recordKey, partitionPath, and the RowData payload; keyed by record key
  • ctx (KeyedProcessFunction.Context, required): Flink context providing the current key (the record key string)
  • conf (Configuration, required at construction): Flink Configuration carrying FlinkOptions such as OPERATION, INDEX_GLOBAL_ENABLED, CHANGELOG_ENABLED, TABLE_TYPE, and INDEX_TYPE

Outputs

  • out, collected (HoodieFlinkInternalRow): the input record with fileId and instantTime ("I" for an INSERT bucket, "U" for an UPDATE bucket) set; shuffled downstream by fileId
  • side effect, global index only (HoodieFlinkInternalRow): a DELETE record emitted for the old partition path when a record's partition changes

Usage Examples

// The BucketAssignFunction is typically not called directly;
// it is wired into the pipeline by Pipelines.hoodieStreamWrite().

// Manual pipeline construction for illustration:
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.INDEX_TYPE, "FLINK_STATE");

DataStream<HoodieFlinkInternalRow> assigned = inputStream
    .keyBy(HoodieFlinkInternalRow::getRecordKey)
    .process(new BucketAssignFunction(conf))
    .setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));

// The output stream is then keyed by fileId for the stream write operator:
DataStream<RowData> written = assigned
    .keyBy(HoodieFlinkInternalRow::getFileId)
    .transform("stream_write", TypeInformation.of(RowData.class),
        StreamWriteOperator.getFactory(conf, rowType))
    .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
