Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Apache Hudi Streaming Write Execution

From Leeroopedia


Knowledge Sources
Domains Data_Lake, Stream_Processing
Last Updated 2026-02-08 00:00 GMT

Overview

Streaming Write Execution is the principle of buffering incoming records in memory by bucket, flushing them to the storage layer when size or checkpoint thresholds are reached, and coordinating commits across parallel write tasks to achieve exactly-once semantics.

Description

After records have been assigned to file group buckets, the streaming write execution phase handles the actual data persistence. This phase encompasses two cooperating components:

  • Stream Write Function: A Flink operator running in each parallel task that buffers records in a binary in-memory buffer (organized by bucket ID), flushes them to Hudi's write client when thresholds are met, and reports write metadata events to the coordinator.
  • Stream Write Operator Coordinator: A singleton coordinator (one per job) that manages the Hudi timeline, starts new instants, collects write metadata events from all tasks, and commits instants when a checkpoint completes.

The execution follows an exactly-once protocol tied to Flink's checkpoint mechanism:

  1. Checkpoint trigger: The coordinator starts a new Hudi instant on the timeline when a checkpoint begins.
  2. Data flushing: Each write task flushes all buffered data during the checkpoint's snapshotState() call, producing WriteStatus results.
  3. Event reporting: Each task sends a WriteMetadataEvent containing its write statuses to the coordinator.
  4. Commit: When the coordinator receives notifyCheckpointComplete(), it commits the instant with all collected write statuses.

The buffering strategy uses a binary memory pool (based on Flink's MemorySegmentPool) to minimize GC pressure. Records are organized into per-bucket buffers (RowDataBucket), and flushes are triggered by:

  • Batch size threshold: When a single bucket's buffer exceeds WRITE_BATCH_SIZE.
  • Memory pressure: When the total buffer size across all buckets exceeds WRITE_TASK_MAX_SIZE, the largest bucket is flushed first.
  • Checkpoint barrier: All remaining data is flushed during snapshotState().

The coordinator also handles:

  • Table service scheduling: After each commit, the coordinator schedules compaction (for MOR tables), clustering, or metadata table compaction as configured.
  • Hive sync: Optionally synchronizes committed data to the Hive metastore.
  • Failure recovery: On job restart, the coordinator attempts to recommit any inflight instants from the previous run.

Usage

Use this principle for all Hudi streaming write pipelines (both append and upsert modes). It is the core execution mechanism that guarantees data durability and exactly-once semantics. Key scenarios include:

  • Continuous Kafka ingestion: Records flow continuously; commits happen at each checkpoint boundary.
  • Batch execution: In batch mode, the endInput() signal triggers a final flush and commit.
  • Multi-writer environments: The coordinator manages client heartbeats and optimistic concurrency control.

Theoretical Basis

The streaming write execution follows a two-phase commit protocol aligned with Flink checkpoints:

COORDINATOR (singleton):
  ON start():
    Initialize table if not exists
    Create HoodieFlinkWriteClient
    Initialize metadata table
    Start executor threads

  ON checkpoint request from task:
    Wait for previous instants to commit
    startInstant() -> create REQUESTED instant on timeline
    Transition to INFLIGHT
    Return instant time to task

  ON notifyCheckpointComplete(checkpointId):
    Collect all WriteMetadataEvents for this checkpoint
    IF any data was written:
      writeClient.commit(instant, allWriteStatuses)
      Schedule compaction/clustering if needed
      Sync Hive if enabled
    Reset event buffer

WRITE TASK (parallel):
  ON processElement(record):
    bucketID = partitionPath + "_" + fileId
    Buffer record into RowDataBucket[bucketID]
    IF bucket is full (>= WRITE_BATCH_SIZE):
      Flush bucket -> List<WriteStatus>
      Send WriteMetadataEvent to coordinator
    IF total buffer exceeds WRITE_TASK_MAX_SIZE:
      Flush largest bucket

  ON snapshotState() (checkpoint):
    Request instant time from coordinator (if not yet obtained)
    Flush ALL remaining buckets
    Send final WriteMetadataEvent (lastBatch=true) to coordinator

  ON endInput() (batch mode):
    Flush ALL remaining buckets
    Send WriteMetadataEvent (endInput=true)
    Clean write handles

The blocking semantics between checkpoints ensure that:

  • Each Hudi instant spans exactly one Flink checkpoint interval.
  • No data is buffered across checkpoint boundaries.
  • The coordinator can safely roll back any inflight instant if a checkpoint fails.

The deduplication step (if PRE_COMBINE is enabled) merges records with the same key within a bucket before writing, using the configured BufferedRecordMerger.

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment