Implementation: Apache Hudi StreamWriteFunction processElement
| Knowledge Sources | |
|---|---|
| Domains | Data_Lake, Stream_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete Apache Hudi tool for buffering records by bucket, flushing writes to storage, and coordinating commits across parallel tasks.
Description
StreamWriteFunction.processElement() is the core Flink sink function that receives bucketed HoodieFlinkInternalRow records, buffers them in per-bucket binary memory buffers, and flushes them to Hudi's storage layer through HoodieFlinkWriteClient.
StreamWriteFunction responsibilities:
- Record buffering: Each record is buffered into a `RowDataBucket` identified by `partitionPath_fileId`. The bucket uses a binary `BinaryInMemorySortBuffer` backed by Flink's `MemorySegmentPool` for efficient memory management.
- Threshold-based flushing: Buckets are flushed when they exceed `WRITE_BATCH_SIZE` or when total memory exceeds `WRITE_TASK_MAX_SIZE`.
- Checkpoint flushing: On `snapshotState()`, all remaining buckets are flushed via `flushRemaining()`.
- Write delegation: Depending on the operation type (INSERT, UPSERT, DELETE, INSERT_OVERWRITE), the function delegates to the appropriate `HoodieFlinkWriteClient` method.
- Deduplication: If `PRE_COMBINE` is enabled, records within a bucket are deduplicated using `FlinkWriteHelper` and `BufferedRecordMerger` before writing.
- Index record emission: If streaming index write is enabled, emits index records to a downstream index write operator.
- Event reporting: After each flush, sends a `WriteMetadataEvent` to the `StreamWriteOperatorCoordinator`.
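The buffering and threshold behavior above can be sketched in plain Java. This is a minimal illustration under stated assumptions: the real implementation buffers binary rows in `MemorySegmentPool`-backed sort buffers, while here each bucket is just a list with a tracked byte size, and `BATCH_SIZE_BYTES` / `TASK_MAX_SIZE_BYTES` are simplified stand-ins for the MB-valued `WRITE_BATCH_SIZE` / `WRITE_TASK_MAX_SIZE` options.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-bucket buffering with threshold-based flushing.
// Hypothetical stand-in, not Hudi's actual code: buckets are plain
// lists keyed by partitionPath_fileId, sizes tracked in bytes.
public class BucketBufferSketch {
  static final long BATCH_SIZE_BYTES = 64;     // stand-in for WRITE_BATCH_SIZE
  static final long TASK_MAX_SIZE_BYTES = 256; // stand-in for WRITE_TASK_MAX_SIZE

  final Map<String, List<String>> buckets = new HashMap<>();
  final Map<String, Long> bucketSizes = new HashMap<>();
  long totalSize = 0;
  final List<String> flushedBuckets = new ArrayList<>();

  // Buffer one record under its bucket key, flushing when either
  // the per-bucket or the total-memory threshold is exceeded.
  void bufferRecord(String bucketKey, String record) {
    buckets.computeIfAbsent(bucketKey, k -> new ArrayList<>()).add(record);
    long size = record.length();
    bucketSizes.merge(bucketKey, size, Long::sum);
    totalSize += size;
    if (bucketSizes.get(bucketKey) >= BATCH_SIZE_BYTES) {
      flush(bucketKey);               // this bucket grew past the batch size
    } else if (totalSize >= TASK_MAX_SIZE_BYTES) {
      flush(largestBucket());         // relieve memory pressure
    }
  }

  void flush(String bucketKey) {
    flushedBuckets.add(bucketKey);    // stand-in for writing via the write client
    totalSize -= bucketSizes.remove(bucketKey);
    buckets.remove(bucketKey);
  }

  String largestBucket() {
    return bucketSizes.entrySet().stream()
        .max(Map.Entry.comparingByValue()).get().getKey();
  }

  // On snapshotState(), everything still buffered is flushed.
  void flushRemaining() {
    new ArrayList<>(buckets.keySet()).forEach(this::flush);
  }
}
```

The two-tier check mirrors the design tradeoff: per-bucket flushing keeps written file groups reasonably sized, while the task-wide cap bounds total memory regardless of how many buckets are live.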
StreamWriteOperatorCoordinator responsibilities:
- Instant management: Starts new Hudi instants on the timeline, transitions them from REQUESTED to INFLIGHT, and commits them on checkpoint completion.
- Event collection: Collects `WriteMetadataEvent`s from all write tasks using `EventBuffers`.
- Commit execution: On `notifyCheckpointComplete()`, commits the instant with all collected `WriteStatus` results via `writeClient.commit()`.
- Table services: Schedules compaction, clustering, and metadata table compaction after each commit.
- Failure recovery: On restart, recommits any previously checkpointed but uncommitted instants.
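The collect-then-commit protocol above can be sketched as follows. This is a simplified stand-in: `WriteMetadataEvent` here is a plain record type, `parallelism` counts write subtasks, and "commit" just records the instant; the real coordinator additionally handles bootstrap events, instant transitions, and table services.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of commit coordination: each parallel write task reports its
// flush results for an instant; on checkpoint completion the coordinator
// commits once events from all tasks for that instant have arrived.
public class CoordinatorSketch {
  record WriteMetadataEvent(int taskId, String instant, List<String> writeStatuses) {}

  final int parallelism;
  final Map<String, WriteMetadataEvent[]> eventBuffers = new HashMap<>();
  final List<String> committedInstants = new ArrayList<>();

  CoordinatorSketch(int parallelism) { this.parallelism = parallelism; }

  // Called when a subtask sends its flush results.
  void handleEvent(WriteMetadataEvent event) {
    eventBuffers
        .computeIfAbsent(event.instant(), k -> new WriteMetadataEvent[parallelism])
        [event.taskId()] = event;
  }

  // Called on checkpoint completion: commit only if every task reported.
  boolean notifyCheckpointComplete(String instant) {
    WriteMetadataEvent[] events = eventBuffers.get(instant);
    if (events == null) return false;
    for (WriteMetadataEvent e : events) {
      if (e == null) return false;    // still waiting on a task
    }
    committedInstants.add(instant);   // stand-in for writeClient.commit(...)
    eventBuffers.remove(instant);
    return true;
  }
}
```

Buffering events per instant rather than per checkpoint is what lets recovery work: an instant whose events were checkpointed but never committed can be recommitted on restart.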
Usage
These components are wired into the pipeline by `Pipelines.hoodieStreamWrite()` (for upsert mode) or `Pipelines.append()` (for append mode). They are not directly instantiated by users.
Code Reference
Source Location
- Repository: Apache Hudi
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java`
- Lines: 112-521

Also:
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java`
- Lines: 114-755
- File: `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java`
- Lines: 345-419
Signature
```java
// StreamWriteFunction - record processing
@Override
public void processElement(
    HoodieFlinkInternalRow record,
    Context ctx,
    Collector<RowData> out) throws Exception

// StreamWriteOperatorCoordinator - commit on checkpoint
@Override
public void notifyCheckpointComplete(long checkpointId)
```
Import
```java
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.utils.Pipelines;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| record | HoodieFlinkInternalRow | Yes | Bucketed record with fileId and instantTime ("I"/"U") set by the bucket assign function |
| config | Configuration | Yes (constructor) | Flink Configuration with FlinkOptions (PATH, TABLE_TYPE, OPERATION, WRITE_BATCH_SIZE, WRITE_TASK_MAX_SIZE, PRE_COMBINE, etc.) |
| rowType | RowType | Yes (constructor) | The Flink RowType schema of the data being written |
| checkpointId | long | Yes (coordinator) | Flink checkpoint ID triggering the commit |
Outputs
| Name | Type | Description |
|---|---|---|
| writeStatuses | List<WriteStatus> | Per-bucket write results containing file path, partition path, total records, error records, and file size |
| (side effect) | WriteMetadataEvent | Event sent to coordinator containing taskID, checkpointId, instantTime, and writeStatuses |
| (side effect - coordinator) | Committed Hudi instant | The coordinator commits the instant on the Hudi timeline with all collected write statuses |
| out (optional) | RowData | Index records emitted to downstream index write operator when streaming index write is enabled |
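The `PRE_COMBINE` deduplication noted in the responsibilities above can be illustrated with a simple reduce-by-key. This is a hypothetical stand-in, not Hudi's `FlinkWriteHelper`: per record key, only the record with the highest ordering (pre-combine) value survives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of pre-combine deduplication: among buffered records that share
// a record key, keep the one with the largest ordering value.
public class PreCombineSketch {
  record Rec(String key, long orderingValue, String payload) {}

  static List<Rec> deduplicate(List<Rec> input) {
    Map<String, Rec> latest = new LinkedHashMap<>(); // preserves first-seen key order
    for (Rec r : input) {
      latest.merge(r.key(), r,
          (a, b) -> b.orderingValue() >= a.orderingValue() ? b : a);
    }
    return new ArrayList<>(latest.values());
  }
}
```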
Usage Examples
```java
// StreamWriteFunction and StreamWriteOperatorCoordinator are wired by Pipelines:
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.WRITE_BATCH_SIZE, 64.0);      // 64 MB per bucket
conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 1024.0); // 1 GB total buffer

// Build the full upsert pipeline
DataStream<HoodieFlinkInternalRow> bootstrapped =
    Pipelines.bootstrap(conf, rowType, sourceStream);
DataStream<RowData> written =
    Pipelines.hoodieStreamWrite(conf, rowType, bootstrapped);

// Internally, hoodieStreamWrite creates:
// 1. BucketAssignFunction (or MinibatchBucketAssignFunction)
// 2. keyBy(fileId)
// 3. StreamWriteOperator (wrapping StreamWriteFunction)
//    with StreamWriteOperatorCoordinator for instant management

// The coordinator commits on each checkpoint:
//   coordinator.notifyCheckpointComplete(checkpointId)
//     -> writeClient.commit(instant, allWriteStatuses)
```