Implementation:ArroyoSystems Arroyo Two Phase Committer
Two Phase Committer Implementation
Implementation of the TwoPhaseCommitter trait and CommittingState.
The TwoPhaseCommitter trait defines the per-operator interface for two-phase commit, while CommittingState aggregates commit data across all subtasks on the controller side.
Code References
- File: crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs (L40-L268)
- File: crates/arroyo-state/src/committing_state.rs (L5-L68)
Core Interfaces
TwoPhaseCommitter Trait
use std::collections::HashMap;

// TwoPhaseCommitter wraps an ArrowOperator and adds pre_commit/commit lifecycle
pub trait TwoPhaseCommitter {
    fn pre_commit(&mut self, epoch: u32) -> Vec<u8>; // Stage pending writes, return commit data
    fn commit(&mut self, epoch: u32, commit_data: HashMap<String, HashMap<u32, Vec<u8>>>); // Finalize
}
CommittingState Struct
pub struct CommittingState {
    pub committing_data: HashMap<String, HashMap<u32, Vec<u8>>>,
}
Detailed Behavior
TwoPhaseCommitter::pre_commit
Called during the checkpoint phase (Phase 1). Each sink subtask stages its pending output and returns serialized commit metadata.
| Parameter | Type | Description |
|---|---|---|
| epoch | u32 | The checkpoint epoch number |
| Return | Vec<u8> | Serialized commit metadata describing staged writes |
Behavior:
- Flushes any buffered output records to a temporary location (e.g., temporary files in object storage, uncommitted Kafka transactions).
- Produces a serialized description of the staged output (e.g., temporary file paths, partition offsets, transaction IDs).
- The returned Vec<u8> is included in the checkpoint state, ensuring it is durable.
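The staging step above can be sketched with an in-memory stand-in. This is a minimal illustration, not Arroyo's implementation: SketchSink, its fields, the write helper, and the tmp/epoch-N-subtask-M naming are all hypothetical, and the metadata is encoded as a plain UTF-8 path rather than whatever serialization the real sink uses.

```rust
use std::collections::HashMap;

/// Hypothetical sink used only for illustration; not an Arroyo type.
struct SketchSink {
    subtask: u32,
    /// Records written since the last checkpoint.
    buffered: Vec<String>,
    /// Temporary path -> records staged under that path (stands in for temp files).
    staged: HashMap<String, Vec<String>>,
}

impl SketchSink {
    /// Buffer a record until the next checkpoint barrier.
    fn write(&mut self, record: &str) {
        self.buffered.push(record.to_string());
    }

    /// Phase 1: move buffered records to a temporary location and return
    /// bytes describing what was staged. Returns empty metadata when there
    /// is nothing to stage.
    fn pre_commit(&mut self, epoch: u32) -> Vec<u8> {
        if self.buffered.is_empty() {
            return Vec::new();
        }
        // Assumed naming scheme for the temporary location.
        let tmp_path = format!("tmp/epoch-{}-subtask-{}", epoch, self.subtask);
        let records = std::mem::take(&mut self.buffered);
        self.staged.insert(tmp_path.clone(), records);
        // The commit metadata here is just the temporary path as UTF-8 bytes.
        tmp_path.into_bytes()
    }
}
```

In this sketch nothing becomes visible at a final location during pre_commit; the returned bytes are the only thing a later commit needs in order to find the staged output.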
TwoPhaseCommitter::commit
Called after the global checkpoint succeeds (Phase 2). The sink finalizes all staged writes.
| Parameter | Type | Description |
|---|---|---|
| epoch | u32 | The checkpoint epoch being committed |
| commit_data | HashMap<String, HashMap<u32, Vec<u8>>> | Aggregated commit metadata from all operators and subtasks |
Behavior:
- Deserializes the commit metadata for its own operator and subtask.
- Finalizes the staged writes (e.g., renames temporary files to final locations, commits Kafka transactions).
- This operation must be idempotent -- safe to retry in case of failure during commit.
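One way to get the idempotence described above is to make finalization a no-op when the staged output is already gone. The sketch below assumes a hypothetical SketchSink whose staged and finalized maps stand in for temporary and final storage, and assumes the commit metadata is a UTF-8 temporary path; none of these names come from the Arroyo codebase.

```rust
use std::collections::HashMap;

/// Hypothetical sink used only for illustration; not an Arroyo type.
struct SketchSink {
    operator_id: String,
    subtask: u32,
    /// Temporary path -> staged records.
    staged: HashMap<String, Vec<String>>,
    /// Final path -> committed records.
    finalized: HashMap<String, Vec<String>>,
}

impl SketchSink {
    /// Phase 2: look up this subtask's metadata and "rename" the staged
    /// output to its final location. Safe to call repeatedly.
    fn commit(&mut self, _epoch: u32, commit_data: HashMap<String, HashMap<u32, Vec<u8>>>) {
        // Extract the entry for this operator and subtask, if any.
        let bytes = match commit_data
            .get(&self.operator_id)
            .and_then(|by_subtask| by_subtask.get(&self.subtask))
        {
            Some(bytes) => bytes.clone(),
            None => return,
        };
        // Assumed encoding: the metadata is the temporary path as UTF-8.
        let tmp_path = match String::from_utf8(bytes) {
            Ok(path) => path,
            Err(_) => return,
        };
        // If the staged entry is missing, a previous attempt already
        // finalized it, so a retry changes nothing.
        if let Some(records) = self.staged.remove(&tmp_path) {
            let final_path = tmp_path.replacen("tmp/", "final/", 1);
            self.finalized.insert(final_path, records);
        }
    }
}
```

Calling commit twice with the same commit_data leaves finalized with a single entry, which is the property a retry after a partial failure relies on.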
CommittingState
The controller-side structure that aggregates commit data from all subtasks.
| Field | Type | Description |
|---|---|---|
| committing_data | HashMap<String, HashMap<u32, Vec<u8>>> | Outer key: operator ID. Inner key: subtask index. Value: serialized commit metadata. |
Lifecycle:
- During checkpoint, each subtask's pre_commit() output is collected and stored in CommittingState.
- After global checkpoint completion, the controller passes the entire committing_data map to the commit() method.
- Each sink subtask extracts its own commit metadata from the map and finalizes its staged output.
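The nested-map layout and the collect-then-extract lifecycle can be sketched as follows. SketchCommittingState mirrors only the committing_data field shown earlier; the record and lookup helpers are hypothetical names introduced for this example and are not claimed to exist on the real struct.

```rust
use std::collections::HashMap;

/// Illustrative stand-in that mirrors only the committing_data field.
#[derive(Default)]
struct SketchCommittingState {
    /// operator ID -> subtask index -> serialized commit metadata
    committing_data: HashMap<String, HashMap<u32, Vec<u8>>>,
}

impl SketchCommittingState {
    /// Store one subtask's pre_commit output (hypothetical helper).
    fn record(&mut self, operator_id: &str, subtask: u32, data: Vec<u8>) {
        self.committing_data
            .entry(operator_id.to_string())
            .or_default()
            .insert(subtask, data);
    }

    /// Fetch the metadata belonging to one subtask, as a sink would do
    /// when it receives the aggregated map (hypothetical helper).
    fn lookup(&self, operator_id: &str, subtask: u32) -> Option<&[u8]> {
        self.committing_data
            .get(operator_id)?
            .get(&subtask)
            .map(|bytes| bytes.as_slice())
    }
}
```

Keying by operator ID first and subtask index second lets the whole map be broadcast unchanged while each subtask reads only its own entry.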
Interaction with Checkpoint Lifecycle
| Phase | Component | Action |
|---|---|---|
| Barrier arrives | Sink operator | Calls pre_commit(epoch), producing commit metadata |
| Checkpoint in progress | CommittingState | Aggregates commit metadata from all subtasks |
| Global checkpoint complete | Controller | Sends commit signal with aggregated committing_data |
| Commit phase | Sink operator | Calls commit(epoch, commit_data), finalizing output |
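The ordering in the table, and in particular the rule that commit runs only after the global checkpoint succeeds, can be sketched as a single-process driver. Everything here is an assumption made for illustration: the Subtask struct, the run_checkpoint function, the checkpoint_ok flag standing in for the controller's decision, and the newline-joined metadata encoding. The real system distributes these steps across workers and the controller.

```rust
use std::collections::HashMap;

type CommitData = HashMap<String, HashMap<u32, Vec<u8>>>;

/// Hypothetical sink subtask; pending and committed stand in for staged and final output.
struct Subtask {
    index: u32,
    pending: Vec<String>,
    committed: Vec<String>,
}

impl Subtask {
    /// Phase 1: describe the pending output (newline-joined, an assumed encoding).
    fn pre_commit(&mut self, _epoch: u32) -> Vec<u8> {
        self.pending.join("\n").into_bytes()
    }

    /// Phase 2: finalize using this subtask's entry in the aggregated map.
    fn commit(&mut self, _epoch: u32, data: &CommitData, operator_id: &str) {
        if let Some(bytes) = data.get(operator_id).and_then(|m| m.get(&self.index)) {
            let text = String::from_utf8_lossy(bytes);
            self.committed = text.lines().map(str::to_string).collect();
            self.pending.clear();
        }
    }
}

/// Runs one checkpoint; returns true only if the commit phase ran.
fn run_checkpoint(operator_id: &str, epoch: u32, subtasks: &mut [Subtask], checkpoint_ok: bool) -> bool {
    // Barrier arrives: every subtask pre-commits and the metadata is aggregated.
    let mut data = CommitData::new();
    for subtask in subtasks.iter_mut() {
        let meta = subtask.pre_commit(epoch);
        data.entry(operator_id.to_string()).or_default().insert(subtask.index, meta);
    }
    // No commit signal is sent unless the global checkpoint completed.
    if !checkpoint_ok {
        return false;
    }
    // Commit phase: each subtask receives the full aggregated map.
    for subtask in subtasks.iter_mut() {
        subtask.commit(epoch, &data, operator_id);
    }
    true
}
```

When checkpoint_ok is false in this sketch, the pending output stays staged and nothing is finalized, so a later successful checkpoint can still commit it.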
I/O
- pre_commit: Produces serialized commit metadata (Vec<u8>) describing staged writes
- commit: Finalizes pending writes using the aggregated commit metadata
- CommittingState: Aggregates commit data from all subtasks into a unified map keyed by operator ID and subtask index
Implements
- Principle:ArroyoSystems_Arroyo_Two_Phase_Commit
- Environment:ArroyoSystems_Arroyo_Object_Storage