
Implementation:ArroyoSystems Arroyo Two Phase Committer

From Leeroopedia



Two Phase Committer Implementation

This page describes the implementation of the TwoPhaseCommitter trait and the CommittingState struct.

The TwoPhaseCommitter trait defines the per-operator interface for two-phase commit, while CommittingState aggregates commit data across all subtasks on the controller side.

Code References

  • File: crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs (L40-L268)
  • File: crates/arroyo-state/src/committing_state.rs (L5-L68)

Core Interfaces

TwoPhaseCommitter Trait

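use std::collections::HashMap;

// TwoPhaseCommitter layers a pre_commit/commit lifecycle on top of an ArrowOperator.
// Simplified view of the interface.
pub trait TwoPhaseCommitter {
    // Phase 1: stage pending writes and return serialized commit metadata.
    fn pre_commit(&mut self, epoch: u32) -> Vec<u8>;
    // Phase 2: finalize staged writes using the aggregated commit metadata.
    fn commit(&mut self, epoch: u32, commit_data: HashMap<String, HashMap<u32, Vec<u8>>>);
}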

CommittingState Struct

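use std::collections::HashMap;

// Outer key: operator ID; inner key: subtask index; value: serialized commit metadata.
pub struct CommittingState {
    pub committing_data: HashMap<String, HashMap<u32, Vec<u8>>>,
}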

Detailed Behavior

TwoPhaseCommitter::pre_commit

Called during the checkpoint phase (Phase 1). Each sink subtask stages its pending output and returns serialized commit metadata.

Parameter      | Type    | Description
-------------- | ------- | -----------
epoch          | u32     | The checkpoint epoch number
(return value) | Vec<u8> | Serialized commit metadata describing the staged writes

Behavior:

  • Flushes any buffered output records to a temporary location (e.g., temporary files in object storage, uncommitted Kafka transactions).
  • Produces a serialized description of the staged output (e.g., temporary file paths, partition offsets, transaction IDs).
  • The returned Vec<u8> is included in the checkpoint state, ensuring it is durable.
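The pre_commit behavior above can be sketched with a toy sink. This is a minimal illustration under stated assumptions, not Arroyo's implementation: InMemorySink, its fields, and the tmp/epoch-{epoch}/part-{subtask} path scheme are hypothetical, object storage is modeled as a HashMap, and the commit metadata is simply the UTF-8 bytes of the temporary path rather than a real serialization format.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory sink used only to illustrate pre_commit.
/// "Object storage" is modeled as a HashMap from path to file contents.
struct InMemorySink {
    subtask_index: u32,
    buffer: Vec<String>,                   // records written since the last checkpoint
    storage: HashMap<String, Vec<String>>, // stand-in for object storage
}

impl InMemorySink {
    /// Phase 1: flush buffered records to a temporary location and
    /// return serialized metadata describing what was staged.
    fn pre_commit(&mut self, epoch: u32) -> Vec<u8> {
        // Hypothetical temp-path scheme, unique per epoch and subtask.
        let temp_path = format!("tmp/epoch-{}/part-{}", epoch, self.subtask_index);
        // Drain the buffer into the temporary "file"; the buffer is left empty.
        let records = std::mem::take(&mut self.buffer);
        self.storage.insert(temp_path.clone(), records);
        // In this sketch the commit metadata is just the UTF-8 bytes of the temp path.
        temp_path.into_bytes()
    }
}
```

Because the buffer is drained with std::mem::take, records arriving after the barrier start a fresh buffer for the next epoch.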

TwoPhaseCommitter::commit

Called after the global checkpoint succeeds (Phase 2). The sink finalizes all staged writes.

Parameter   | Type                                   | Description
----------- | -------------------------------------- | -----------
epoch       | u32                                    | The checkpoint epoch being committed
commit_data | HashMap<String, HashMap<u32, Vec<u8>>> | Aggregated commit metadata from all operators and subtasks

Behavior:

  • Deserializes the commit metadata for its own operator and subtask.
  • Finalizes the staged writes (e.g., renames temporary files to final locations, commits Kafka transactions).
  • This operation must be idempotent: it must be safe to retry if a failure occurs during commit.
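A matching sketch of the commit step shows how a subtask can pick out its own entry from the aggregated map, and why a rename-style finalize can be made idempotent. It is a toy model, not Arroyo's code: InMemorySink, the CommitData alias, and the data/epoch-{epoch}/part-{subtask} final path are hypothetical, and commit_data is taken by reference here rather than by value as in the trait above.

```rust
use std::collections::HashMap;

/// Aggregated commit data: operator ID -> subtask index -> serialized metadata.
type CommitData = HashMap<String, HashMap<u32, Vec<u8>>>;

/// Hypothetical sink subtask; object storage is modeled as a map from path to records.
struct InMemorySink {
    operator_id: String,
    subtask_index: u32,
    storage: HashMap<String, Vec<String>>,
}

impl InMemorySink {
    /// Phase 2: finalize this subtask's staged output. Calling it again for the
    /// same epoch leaves storage unchanged, so a failed commit can be retried.
    fn commit(&mut self, epoch: u32, commit_data: &CommitData) {
        // Extract only this operator's and this subtask's entry from the aggregated map.
        let bytes = match commit_data
            .get(&self.operator_id)
            .and_then(|by_subtask| by_subtask.get(&self.subtask_index))
        {
            Some(bytes) => bytes,
            None => return, // this subtask staged nothing for the epoch
        };
        // In this sketch the metadata is the UTF-8 temp path written during pre_commit.
        let temp_path = String::from_utf8(bytes.clone()).expect("metadata should be a UTF-8 path");
        let final_path = format!("data/epoch-{}/part-{}", epoch, self.subtask_index);
        // "Rename" temp -> final. If the temp file is already gone, an earlier
        // attempt completed the rename and there is nothing left to do.
        if let Some(records) = self.storage.remove(&temp_path) {
            self.storage.insert(final_path, records);
        }
    }
}
```

A retry finds the temporary file already moved and does nothing, which is the idempotence property required above.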

CommittingState

The controller-side structure that aggregates commit data from all subtasks.

Field           | Type                                   | Description
--------------- | -------------------------------------- | -----------
committing_data | HashMap<String, HashMap<u32, Vec<u8>>> | Outer key: operator ID. Inner key: subtask index. Value: serialized commit metadata.

Lifecycle:

  1. During checkpoint, each subtask's pre_commit() output is collected and stored in CommittingState.
  2. After the global checkpoint completes, the controller passes the entire committing_data map to the commit() method.
  3. Each sink subtask extracts its own commit metadata from the map and finalizes its staged output.
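The three lifecycle steps can be sketched as follows. CommittingState mirrors the field described above; the record method and the own_metadata function are hypothetical helper names introduced for illustration and are not taken from the Arroyo source.

```rust
use std::collections::HashMap;

/// Mirrors the documented field: operator ID -> subtask index -> metadata.
#[derive(Default)]
struct CommittingState {
    committing_data: HashMap<String, HashMap<u32, Vec<u8>>>,
}

impl CommittingState {
    /// Step 1 (hypothetical helper): store one subtask's pre_commit() output.
    fn record(&mut self, operator_id: &str, subtask_index: u32, data: Vec<u8>) {
        self.committing_data
            .entry(operator_id.to_string())
            .or_default()
            .insert(subtask_index, data);
    }
}

/// Step 3 (hypothetical helper): a sink subtask looks up only its own entry
/// in the full map it received in step 2.
fn own_metadata<'a>(
    commit_data: &'a HashMap<String, HashMap<u32, Vec<u8>>>,
    operator_id: &str,
    subtask_index: u32,
) -> Option<&'a Vec<u8>> {
    commit_data.get(operator_id)?.get(&subtask_index)
}
```

Passing the whole map and letting each subtask filter it keeps the controller unaware of how any particular sink encodes its metadata.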

Interaction with Checkpoint Lifecycle

Phase                      | Component       | Action
-------------------------- | --------------- | ------
Barrier arrives            | Sink operator   | Calls pre_commit(epoch), producing commit metadata
Checkpoint in progress     | CommittingState | Aggregates commit metadata from all subtasks
Global checkpoint complete | Controller      | Sends commit signal with aggregated committing_data
Commit phase               | Sink operator   | Calls commit(epoch, commit_data), finalizing output
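To make the ordering in the table concrete, here is a hypothetical controller-side tracker that releases the commit signal only after every expected sink subtask has reported its pre_commit output. It simplifies the real protocol: EpochTracker and its methods are invented for this sketch, and "global checkpoint complete" is modeled as "all sink subtasks reported", whereas a real controller would also wait for every other operator's checkpoint to finish.

```rust
use std::collections::{HashMap, HashSet};

/// Operator ID -> subtask index -> serialized commit metadata.
type CommitData = HashMap<String, HashMap<u32, Vec<u8>>>;

/// Hypothetical controller-side tracker for a single epoch.
struct EpochTracker {
    epoch: u32,
    /// (operator ID, subtask index) pairs that have not reported yet.
    pending: HashSet<(String, u32)>,
    committing_data: CommitData,
}

impl EpochTracker {
    fn new(epoch: u32, subtasks: &[(&str, u32)]) -> Self {
        let pending = subtasks
            .iter()
            .map(|(op, idx)| (op.to_string(), *idx))
            .collect();
        EpochTracker { epoch, pending, committing_data: HashMap::new() }
    }

    /// Records one subtask's pre_commit output. Returns the epoch and the full
    /// aggregated map (the "commit signal") only when the last expected subtask
    /// reports; returns None before that and for unknown or duplicate reports.
    fn report(&mut self, operator_id: &str, subtask_index: u32, data: Vec<u8>) -> Option<(u32, CommitData)> {
        if !self.pending.remove(&(operator_id.to_string(), subtask_index)) {
            return None;
        }
        self.committing_data
            .entry(operator_id.to_string())
            .or_default()
            .insert(subtask_index, data);
        if self.pending.is_empty() {
            Some((self.epoch, std::mem::take(&mut self.committing_data)))
        } else {
            None
        }
    }
}
```

Ignoring duplicate reports keeps a resent pre_commit message from overwriting stored metadata or producing a second commit signal for the same epoch.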

I/O

  • pre_commit: Produces serialized commit metadata (Vec<u8>) describing staged writes
  • commit: Finalizes pending writes using the aggregated commit metadata
  • CommittingState: Aggregates commit data from all subtasks into a unified map keyed by operator ID and subtask index

Implements

Principle:ArroyoSystems_Arroyo_Two_Phase_Commit
Environment:ArroyoSystems_Arroyo_Object_Storage
