
Implementation:Apache Hudi StreamWriteFunction ProcessElement

From Leeroopedia


Knowledge Sources
Domains: Data_Lake, Stream_Processing
Last Updated: 2026-02-08 00:00 GMT

Overview

A concrete Apache Hudi component that buffers records by bucket, flushes writes to storage, and coordinates commits across parallel write tasks.

Description

StreamWriteFunction.processElement() is the core Flink sink function that receives bucketed HoodieFlinkInternalRow records, buffers them in per-bucket binary memory buffers, and flushes them to Hudi's storage layer through HoodieFlinkWriteClient.

StreamWriteFunction responsibilities:

  • Record buffering: Each record is buffered into a RowDataBucket identified by partitionPath_fileId. The bucket uses a binary BinaryInMemorySortBuffer backed by Flink's MemorySegmentPool for efficient memory management.
  • Threshold-based flushing: Buckets are flushed when they exceed WRITE_BATCH_SIZE or when total memory exceeds WRITE_TASK_MAX_SIZE.
  • Checkpoint flushing: On snapshotState(), all remaining buckets are flushed via flushRemaining().
  • Write delegation: Depending on the operation type (INSERT, UPSERT, DELETE, INSERT_OVERWRITE), the function delegates to the appropriate HoodieFlinkWriteClient method.
  • Deduplication: If PRE_COMBINE is enabled, records within a bucket are deduplicated using FlinkWriteHelper and BufferedRecordMerger before writing.
  • Index record emission: If streaming index write is enabled, emits index records to a downstream index write operator.
  • Event reporting: After each flush, sends a WriteMetadataEvent to the StreamWriteOperatorCoordinator.
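The buffering and threshold-flush behavior above can be sketched as a minimal, self-contained model. This is a toy analogue, not Hudi code: the class name, the string records, and the byte-count estimate are all illustrative stand-ins for RowDataBucket, HoodieFlinkInternalRow, and the BinaryInMemorySortBuffer memory accounting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of StreamWriteFunction's per-bucket buffering:
// bucket key = partitionPath + "_" + fileId, flushed when the bucket's
// buffered size exceeds a batch threshold (analogue of WRITE_BATCH_SIZE).
public class BucketBufferSketch {
    private final Map<String, List<String>> buckets = new HashMap<>();
    private final Map<String, Integer> bucketBytes = new HashMap<>();
    private final int batchSizeBytes;
    private int flushes = 0;

    public BucketBufferSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Analogue of processElement(): buffer the record, flush on threshold.
    public void process(String partitionPath, String fileId, String record) {
        String key = partitionPath + "_" + fileId;
        buckets.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        int size = bucketBytes.merge(key, record.length(), Integer::sum);
        if (size > batchSizeBytes) {
            flushBucket(key);
        }
    }

    // Analogue of flushRemaining() during snapshotState(): flush every bucket.
    public void flushAll() {
        for (String key : new ArrayList<>(buckets.keySet())) {
            flushBucket(key);
        }
    }

    private void flushBucket(String key) {
        buckets.remove(key);
        bucketBytes.remove(key);
        flushes++; // real code hands the buffered rows to HoodieFlinkWriteClient
    }

    public int flushCount() {
        return flushes;
    }

    public static void main(String[] args) {
        BucketBufferSketch sink = new BucketBufferSketch(10);
        sink.process("2024-01-01", "f1", "record-1"); // 8 bytes buffered
        sink.process("2024-01-01", "f1", "record-2"); // 16 > 10, bucket flushed
        sink.process("2024-01-02", "f2", "r");        // buffered in a new bucket
        sink.flushAll();                              // checkpoint-style flush
        System.out.println(sink.flushCount());        // prints 2
    }
}
```

The real function also flushes the largest bucket when total memory exceeds WRITE_TASK_MAX_SIZE; that global accounting is omitted here for brevity.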

StreamWriteOperatorCoordinator responsibilities:

  • Instant management: Starts new Hudi instants on the timeline, transitions them from REQUESTED to INFLIGHT, and commits them on checkpoint completion.
  • Event collection: Collects WriteMetadataEvents from all write tasks using EventBuffers.
  • Commit execution: On notifyCheckpointComplete(), commits the instant with all collected WriteStatus results via writeClient.commit().
  • Table services: Schedules compaction, clustering, and metadata table compaction after each commit.
  • Failure recovery: On restart, recommits any previously checkpointed but uncommitted instants.
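The coordinator's event collection and checkpoint-triggered commit can likewise be sketched as a toy model. This is not the actual StreamWriteOperatorCoordinator API: the class, the string instant, and the "all tasks reported" check are simplified stand-ins for EventBuffers, WriteMetadataEvent, and writeClient.commit().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the coordinator: each parallel write task reports a
// metadata event, and notifyCheckpointComplete() commits the current
// instant only once events from all tasks have arrived.
public class CommitCoordinatorSketch {
    private final int parallelism;
    private final Map<Integer, String> events = new HashMap<>(); // taskId -> write statuses
    private final List<String> committedInstants = new ArrayList<>();
    private String currentInstant;

    public CommitCoordinatorSketch(int parallelism, String firstInstant) {
        this.parallelism = parallelism;
        this.currentInstant = firstInstant; // real code starts a REQUESTED instant on the timeline
    }

    // Analogue of buffering a WriteMetadataEvent from one write task.
    public void handleEvent(int taskId, String writeStatuses) {
        events.put(taskId, writeStatuses);
    }

    // Analogue of notifyCheckpointComplete(): commit when all tasks reported.
    public boolean notifyCheckpointComplete(long checkpointId) {
        if (events.size() < parallelism) {
            return false; // incomplete event set: cannot commit this instant yet
        }
        committedInstants.add(currentInstant); // real code: writeClient.commit(instant, statuses)
        events.clear();
        currentInstant = String.valueOf(Long.parseLong(currentInstant) + 1);
        return true;
    }

    public List<String> committed() {
        return committedInstants;
    }

    public static void main(String[] args) {
        CommitCoordinatorSketch c = new CommitCoordinatorSketch(2, "100");
        c.handleEvent(0, "ok");
        System.out.println(c.notifyCheckpointComplete(1L)); // prints false: task 1 missing
        c.handleEvent(1, "ok");
        System.out.println(c.notifyCheckpointComplete(1L)); // prints true: instant 100 committed
    }
}
```

The real coordinator additionally transitions the instant to INFLIGHT, schedules table services after the commit, and on restart recommits checkpointed-but-uncommitted instants; those steps are omitted here.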

Usage

These components are wired into the pipeline by Pipelines.hoodieStreamWrite() (for upsert mode) or Pipelines.append() (for append mode). They are not directly instantiated by users.

Code Reference

Source Location

  • Repository: Apache Hudi
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
  • Lines: 112-521

Also:

  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
  • Lines: 114-755
  • File: hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
  • Lines: 345-419

Signature

// StreamWriteFunction - record processing
@Override
public void processElement(
    HoodieFlinkInternalRow record,
    Context ctx,
    Collector<RowData> out) throws Exception

// StreamWriteOperatorCoordinator - commit on checkpoint
@Override
public void notifyCheckpointComplete(long checkpointId)

Import

import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.utils.Pipelines;

I/O Contract

Inputs

Name Type Required Description
record HoodieFlinkInternalRow Yes Bucketed record with fileId and instantTime ("I"/"U") set by the bucket assign function
config Configuration Yes (constructor) Flink Configuration with FlinkOptions (PATH, TABLE_TYPE, OPERATION, WRITE_BATCH_SIZE, WRITE_TASK_MAX_SIZE, PRE_COMBINE, etc.)
rowType RowType Yes (constructor) The Flink RowType schema of the data being written
checkpointId long Yes (coordinator) Flink checkpoint ID triggering the commit

Outputs

Name Type Description
writeStatuses List<WriteStatus> Per-bucket write results containing file path, partition path, total records, error records, and file size
(side effect) WriteMetadataEvent Event sent to coordinator containing taskID, checkpointId, instantTime, and writeStatuses
(side effect - coordinator) Committed Hudi instant The coordinator commits the instant on the Hudi timeline with all collected write statuses
out (optional) RowData Index records emitted to downstream index write operator when streaming index write is enabled

Usage Examples

// StreamWriteFunction and StreamWriteOperatorCoordinator are wired by Pipelines:
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/hudi_table");
conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.set(FlinkOptions.OPERATION, "upsert");
conf.set(FlinkOptions.WRITE_BATCH_SIZE, 64.0);     // 64 MB per bucket
conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 1024.0); // 1 GB total buffer

// Build the full upsert pipeline
DataStream<HoodieFlinkInternalRow> bootstrapped =
    Pipelines.bootstrap(conf, rowType, sourceStream);
DataStream<RowData> written =
    Pipelines.hoodieStreamWrite(conf, rowType, bootstrapped);

// Internally, hoodieStreamWrite creates:
// 1. BucketAssignFunction (or MinibatchBucketAssignFunction)
// 2. keyBy(fileId)
// 3. StreamWriteOperator (wrapping StreamWriteFunction)
//    with StreamWriteOperatorCoordinator for instant management

// The coordinator commits on each checkpoint:
// coordinator.notifyCheckpointComplete(checkpointId)
//   -> writeClient.commit(instant, allWriteStatuses)

Related Pages

Implements Principle
