Implementation: Apache Hudi BucketAssignFunction processElement
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete tool, provided by Apache Hudi, for assigning records to file group buckets using index lookups and small-file optimization.
Description
BucketAssignFunction.processElement() is a Flink KeyedProcessFunction that receives keyed HoodieFlinkInternalRow records and assigns each record to a specific file group (bucket). It uses an IndexBackend for record key lookups and a BucketAssigner for insert routing with small file packing.
The function performs the following logic for each record:
- Index record handling: If the record is an index update record, update the index backend and return without emitting.
- Location lookup: For changing records (UPSERT/DELETE), look up the record key in the index backend.
- Update routing: If the record exists, route it to the existing file group with bucket type "U" (UPDATE). If the partition has changed and global index is enabled, emit a DELETE for the old partition first.
- Insert routing: If the record does not exist (or is a pure INSERT), use the BucketAssigner to find a small file or create a new file group.
- Index refresh: Update the index backend when the record's location changes.
- Record tagging: Set the fileId and instantTime ("I" or "U") on the record, then emit it.
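The routing steps above can be sketched in plain Java, stripped of the Flink and Hudi types. This is a minimal illustration, not the real implementation: the HashMap stands in for the index backend, and the Assignment record is a hypothetical stand-in for HoodieRecordLocation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the per-record routing decision: look up the key,
// route hits to their existing file group as UPDATE ("U"), assign misses
// a fresh INSERT ("I") bucket, and refresh the index on location change.
public class BucketRoutingSketch {
    public record Assignment(String partition, String fileId, String bucketType) {}

    private final Map<String, Assignment> index = new HashMap<>();
    private int nextFileGroup = 0;

    public Assignment route(String recordKey, String partition) {
        Assignment existing = index.get(recordKey);
        if (existing != null && existing.partition().equals(partition)) {
            // Location lookup hit: route to the existing file group as an UPDATE.
            return new Assignment(partition, existing.fileId(), "U");
        }
        // Miss (or partition changed): assign an INSERT bucket. The real
        // BucketAssigner would first try to pack the record into a small file;
        // with a global index, a partition change would also emit a DELETE
        // for the old partition before re-inserting.
        Assignment fresh = new Assignment(partition, "fg-" + (nextFileGroup++), "I");
        index.put(recordKey, fresh); // index refresh on location change
        return fresh;
    }
}
```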
The companion MinibatchBucketAssignFunction buffers records and performs batch index lookups before delegating to the same processRecord() logic, improving throughput for record-level index (RLI) workloads.
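The batching idea can be illustrated with a self-contained sketch: resolve all buffered keys in one batch probe against the index, then tag each record. The method and type names here are illustrative, not the real Hudi API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the minibatch pattern: one batch index probe for the
// whole buffer instead of a lookup per record, then per-record tagging.
public class MinibatchLookupSketch {
    public static List<String> assignBatch(List<String> keys, Map<String, String> index) {
        // Batch lookup: collect locations for all buffered keys at once.
        Map<String, String> located = new HashMap<>();
        for (String k : keys) {
            String fileId = index.get(k);
            if (fileId != null) located.put(k, fileId);
        }
        // Per-record tagging: "U:<fileId>" for known records, "I" for inserts.
        List<String> tags = new ArrayList<>();
        for (String k : keys) {
            tags.add(located.containsKey(k) ? "U:" + located.get(k) : "I");
        }
        return tags;
    }
}
```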
The BucketAssigner class manages:
- A bucket info map tracking known buckets within the current checkpoint.
- A small file assign map that packs inserts into files below the target size.
- A new file assign state that tracks capacity of recently created file groups.
- Task affinity: uses KeyGroupRangeAssignment to ensure generated file IDs hash to the current task index.
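The small-file packing behavior can be sketched as follows. This is a simplified illustration under assumed names (FileSlot, assign) and illustrative byte sizes; the real BucketAssigner tracks candidate files and capacities per partition within the checkpoint.

```java
import java.util.List;

// Sketch of small-file packing: an insert goes to an existing file group
// that still has spare capacity below the target size; otherwise it falls
// through to a new file group.
public class SmallFilePackerSketch {
    public record FileSlot(String fileId, long remainingBytes) {}

    public static String assign(List<FileSlot> smallFiles, long recordBytes, String newFileId) {
        for (int i = 0; i < smallFiles.size(); i++) {
            FileSlot slot = smallFiles.get(i);
            if (slot.remainingBytes() >= recordBytes) {
                // Pack into the small file and decrement its remaining capacity.
                smallFiles.set(i, new FileSlot(slot.fileId(), slot.remainingBytes() - recordBytes));
                return slot.fileId();
            }
        }
        return newFileId; // no small file fits: open a new file group
    }
}
```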
Usage
This operator is used in the UPSERT pipeline path (non-append mode). It is placed after the source/bootstrap operator and before the stream write operator. Records are keyed by record key before entering this function.
Code Reference
Source Location
- Repository: Apache Hudi
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java (lines 71-241)
Also:
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java (lines 52-340)
- File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java (lines 50-176)
Signature
// BucketAssignFunction
@Override
public void processElement(
HoodieFlinkInternalRow value,
Context ctx,
Collector<HoodieFlinkInternalRow> out) throws Exception
// Internal processing method (also used by MinibatchBucketAssignFunction)
protected void processRecord(
HoodieFlinkInternalRow record,
String recordKey,
Collector<HoodieFlinkInternalRow> out) throws Exception
// MinibatchBucketAssignFunction
@Override
public void processElement(
HoodieFlinkInternalRow record,
Context ctx,
Collector<HoodieFlinkInternalRow> outCollector) throws Exception
Import
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| value | HoodieFlinkInternalRow | Yes | Record containing recordKey, partitionPath, and RowData payload; keyed by record key |
| ctx | KeyedProcessFunction.Context | Yes | Flink context providing the current key (record key string) |
| conf | Configuration | Yes (constructor) | Flink Configuration with FlinkOptions (OPERATION, INDEX_GLOBAL_ENABLED, CHANGELOG_ENABLED, TABLE_TYPE, INDEX_TYPE, etc.) |
Outputs
| Name | Type | Description |
|---|---|---|
| out (collected) | HoodieFlinkInternalRow | The input record with fileId and instantTime ("I" for INSERT bucket, "U" for UPDATE bucket) set; shuffled downstream by fileId |
| (side effect - global index) | HoodieFlinkInternalRow | A DELETE record emitted for the old partition path when a record's partition changes |
Usage Examples
// The BucketAssignFunction is typically not called directly;
// it is wired into the pipeline by Pipelines.hoodieStreamWrite().
// Manual pipeline construction for illustration:
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.INDEX_TYPE, "FLINK_STATE");
DataStream<HoodieFlinkInternalRow> assigned = inputStream
.keyBy(HoodieFlinkInternalRow::getRecordKey)
.process(new BucketAssignFunction(conf))
.setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
// The output stream is then keyed by fileId for the stream write operator:
DataStream<RowData> written = assigned
.keyBy(HoodieFlinkInternalRow::getFileId)
.transform("stream_write", TypeInformation.of(RowData.class),
StreamWriteOperator.getFactory(conf, rowType))
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));