Principle:ArroyoSystems Arroyo Two Phase Commit
Two-Phase Commit
Principle: Exactly-once delivery to external sinks via two-phase commit integrated with checkpointing. During a checkpoint, sinks prepare (stage) their pending writes; after the global checkpoint completes, the controller sends a commit signal that finalizes the staged writes.
Theoretical Basis
Two-phase commit (2PC) is a classical distributed transaction protocol, adapted here to streaming output. In the streaming context, 2PC is tightly integrated with the checkpoint protocol to achieve exactly-once output semantics: each record's output becomes visible in the external sink exactly once, even when failures cause tasks to restart and replay input from the last checkpoint.
Phase 1: Pre-Commit (During Checkpoint)
During the checkpoint phase, each sink operator stages its pending output:
- Writes are directed to temporary locations (e.g., temporary files, staging areas, uncommitted transactions).
- The sink produces commit metadata -- a serialized description of what was staged (e.g., temporary file paths, transaction IDs).
- This commit metadata is included in the checkpoint state, ensuring it survives failures.
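The pre-commit steps above can be sketched for a file-based sink. This is a minimal illustration under stated assumptions, not Arroyo's sink API. `StagingFileSink`, `pre_commit`, the `part-<id>.tmp` naming and the JSON metadata layout are all hypothetical. The sketch buffers records, writes them durably to a temporary file outside the output directory, and returns the metadata that would be stored in the checkpoint.

```python
import json
import os
import tempfile


class StagingFileSink:
    """Hypothetical file sink illustrating Phase 1 (pre-commit) of 2PC."""

    def __init__(self, staging_dir: str, output_dir: str) -> None:
        self.staging_dir = staging_dir
        self.output_dir = output_dir
        self.buffer = []  # records written since the last checkpoint

    def write(self, record: str) -> None:
        # Records are only buffered; nothing appears in output_dir yet.
        self.buffer.append(record)

    def pre_commit(self, checkpoint_id: int) -> str:
        """Stage buffered records in a temporary file and return commit metadata."""
        tmp_path = os.path.join(self.staging_dir, f"part-{checkpoint_id}.tmp")
        final_path = os.path.join(self.output_dir, f"part-{checkpoint_id}.txt")
        with open(tmp_path, "w") as f:
            f.write("\n".join(self.buffer))
            f.flush()
            os.fsync(f.fileno())  # staged data must be durable before the checkpoint completes
        self.buffer.clear()
        # This serialized description is what would be stored in the checkpoint state.
        return json.dumps({"checkpoint_id": checkpoint_id, "tmp": tmp_path, "final": final_path})


# Usage: stage two records for checkpoint 1; the output directory stays empty.
with tempfile.TemporaryDirectory() as staging, tempfile.TemporaryDirectory() as out:
    sink = StagingFileSink(staging, out)
    sink.write("a")
    sink.write("b")
    metadata = json.loads(sink.pre_commit(checkpoint_id=1))
    staged_exists = os.path.exists(metadata["tmp"])
    visible_outputs = os.listdir(out)
```

The design choice to keep staged files outside the output directory is what makes staging invisible. Consumers that only read the output directory never observe partial data.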
Phase 2: Commit (After Global Checkpoint)
After the controller confirms that the global checkpoint is complete (all operators have successfully checkpointed):
- The controller sends a commit signal to all sink operators.
- Each sink operator finalizes its staged writes using the commit metadata (e.g., renames temporary files to final locations, commits transactions).
- Once committed, the output is visible to downstream consumers.
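For a file-based sink, the commit step above is often an atomic rename. The sketch below is a hypothetical illustration, not Arroyo's implementation. It assumes the commit metadata is a JSON object with `tmp` and `final` paths, and that both paths are on the same filesystem so that `os.replace` is atomic. The function is written to be idempotent: if the staged file is already gone but the final file exists, a previous attempt already committed it.

```python
import json
import os
import tempfile


def commit_staged(metadata_json: str) -> None:
    """Phase 2 (hypothetical helper): finalize one staged file described by commit metadata."""
    meta = json.loads(metadata_json)
    tmp, final = meta["tmp"], meta["final"]
    if os.path.exists(tmp):
        # Atomic on the same filesystem: readers see no file or the complete file.
        os.replace(tmp, final)
    elif not os.path.exists(final):
        # Neither staged nor committed output exists: the metadata is inconsistent.
        raise FileNotFoundError(f"no staged or committed output for {meta}")
    # Otherwise the file was already committed, so a retry does nothing.


# Usage: stage a file by hand, then commit it twice to simulate a retried commit.
with tempfile.TemporaryDirectory() as root:
    staging = os.path.join(root, "staging")
    out = os.path.join(root, "out")
    os.mkdir(staging)
    os.mkdir(out)
    tmp_path = os.path.join(staging, "part-1.tmp")
    with open(tmp_path, "w") as f:
        f.write("a\nb")
    metadata = json.dumps({"tmp": tmp_path, "final": os.path.join(out, "part-1.txt")})
    commit_staged(metadata)
    commit_staged(metadata)  # retry after a simulated failure is a no-op
    committed_files = os.listdir(out)
    staging_left = os.listdir(staging)
    with open(os.path.join(out, "part-1.txt")) as f:
        committed_contents = f.read()
```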
Failure Semantics
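Provided the sink meets the requirements below (atomic staging, idempotent commit, rollback), the two-phase design ensures exactly-once output regardless of when a failure occurs: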
| Failure Timing | Behavior | Result |
|---|---|---|
| Before checkpoint completes | Staged writes are abandoned; state rolls back to previous checkpoint | No duplicate output |
| After checkpoint, before commit | On recovery, commit metadata is loaded from checkpoint; commit is retried | Exactly-once output (idempotent commit) |
| After commit | Output is already finalized; recovery resumes from the latest completed checkpoint, and any commit re-issued for it is a no-op | No duplicate output |
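The three rows of the table can be checked with a small simulation. This is a toy model under explicit assumptions, not how Arroyo is structured. It has a single sink and a replayable source addressed by offset, with one transaction per checkpoint epoch. A failure is injected once at a chosen point. `ExternalStore`, `run_job` and the crash-point names are hypothetical. On restart, the job re-issues the commit recorded in the last durable checkpoint, rolls back any other staged data, and replays from the checkpointed offset.

```python
class Crash(Exception):
    """Simulated process failure."""


class ExternalStore:
    """Hypothetical transactional target: staged records stay invisible until committed."""

    def __init__(self):
        self.staged = {}     # txn_id -> records, not visible
        self.committed = {}  # txn_id -> records, visible

    def stage(self, txn_id, records):
        self.staged[txn_id] = list(records)

    def commit(self, txn_id):
        if txn_id in self.committed:
            return  # idempotent: a retried commit is a no-op
        self.committed[txn_id] = self.staged.pop(txn_id)

    def abort(self, txn_id):
        self.staged.pop(txn_id, None)

    def visible(self):
        return [r for txn in sorted(self.committed) for r in self.committed[txn]]


def run_job(source, crash_at=None, epoch_size=2):
    """Run to completion, crashing once at `crash_at` (if given) and recovering."""
    store = ExternalStore()
    durable = {"offset": 0, "pending_txn": None}  # last completed checkpoint
    next_txn = 0
    crash_pending = crash_at is not None

    def maybe_crash(point):
        nonlocal crash_pending
        if crash_pending and point == crash_at:
            crash_pending = False
            raise Crash(point)

    while True:
        try:
            # Recovery: re-issue the checkpoint's commit, then roll back other staged data.
            if durable["pending_txn"] is not None:
                store.commit(durable["pending_txn"])
            for txn in list(store.staged):
                store.abort(txn)
            offset = durable["offset"]
            while offset < len(source):
                batch = source[offset:offset + epoch_size]
                txn, next_txn = next_txn, next_txn + 1
                store.stage(txn, batch)  # Phase 1: pre-commit
                maybe_crash("before_checkpoint")
                durable = {"offset": offset + len(batch), "pending_txn": txn}  # checkpoint persisted
                maybe_crash("before_commit")
                store.commit(txn)  # Phase 2: commit
                maybe_crash("after_commit")
                offset = durable["offset"]
            return store.visible()
        except Crash:
            continue  # restart from the last durable checkpoint


source = ["a", "b", "c", "d", "e"]
points = [None, "before_checkpoint", "before_commit", "after_commit"]
results = {point: run_job(source, point) for point in points}
```

In every case the visible output equals the input exactly once. The "before_commit" and "after_commit" cases rely on the idempotent `commit`, and the "before_checkpoint" case relies on `abort` discarding the orphaned transaction.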
Integration with Checkpointing
The 2PC protocol is not a standalone mechanism but is deeply integrated with the checkpoint lifecycle:
- Checkpoint initiation: Barriers propagate through the graph.
- Pre-commit: Sinks stage pending output and produce commit metadata.
- Checkpoint completion: Commit metadata is persisted as part of the checkpoint.
- Commit signal: Controller broadcasts commit after global checkpoint success.
- Finalization: Sinks finalize staged output.
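The lifecycle above can be condensed into a sequential sketch of the controller's role across several sink subtasks. It is a simplification: real checkpointing is asynchronous and message-driven. `SinkTask`, `on_barrier`, `on_commit` and `run_checkpoint` are hypothetical names, not Arroyo APIs. The point it illustrates is that the commit is broadcast only after every sink has pre-committed and the metadata has been persisted. If any sink fails to pre-commit, no sink commits.

```python
class SinkTask:
    """Hypothetical sink subtask that stages output per checkpoint."""

    def __init__(self, name, fail_pre_commit=False):
        self.name = name
        self.fail_pre_commit = fail_pre_commit
        self.staged = {}     # checkpoint_id -> staged location (invisible)
        self.committed = []  # finalized locations (visible downstream)

    def on_barrier(self, checkpoint_id):
        """Pre-commit: stage pending output and return commit metadata, or None on failure."""
        if self.fail_pre_commit:
            return None
        location = f"{self.name}/ckpt-{checkpoint_id}"
        self.staged[checkpoint_id] = location
        return {"task": self.name, "location": location}

    def on_commit(self, checkpoint_id, metadata):
        """Finalization: make the staged output visible; safe to call more than once."""
        if metadata["location"] not in self.committed:
            self.committed.append(metadata["location"])
        self.staged.pop(checkpoint_id, None)


def run_checkpoint(tasks, checkpoint_id, checkpoint_store):
    """Drive one checkpoint through the lifecycle; return True if it committed."""
    metadata = {}
    # Initiation + pre-commit: the barrier reaches each sink, which stages its output.
    for task in tasks:
        meta = task.on_barrier(checkpoint_id)
        if meta is None:
            # Checkpoint fails: no commit signal is sent. Output already staged by
            # other sinks stays invisible and is discarded on recovery.
            return False
        metadata[task.name] = meta
    # Completion: commit metadata is persisted as part of the checkpoint.
    checkpoint_store[checkpoint_id] = metadata
    # Commit signal + finalization.
    for task in tasks:
        task.on_commit(checkpoint_id, metadata[task.name])
    return True


store = {}
healthy = [SinkTask("sink-0"), SinkTask("sink-1")]
ok = run_checkpoint(healthy, 1, store)

broken = [SinkTask("sink-0"), SinkTask("sink-1", fail_pre_commit=True)]
failed = run_checkpoint(broken, 2, store)
```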
Requirements for Sink Implementations
Sinks that implement 2PC must support:
- Atomic staging: The ability to write output to a temporary location without making it visible.
- Idempotent commit: The commit operation must be safe to retry, because a failure during or after commit causes the commit to be re-issued on recovery.
- Rollback capability: Staged but uncommitted output must be discardable on recovery.
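One way to make these three requirements concrete is an abstract interface plus a reusable contract check that any candidate sink can be run against. The interface below is a hypothetical Python rendering, not Arroyo's actual sink trait. `TwoPhaseCommitSink`, `InMemorySink` and `check_contract` are illustrative names, and the in-memory sink stands in for a real staging mechanism such as temporary files or open transactions.

```python
from abc import ABC, abstractmethod


class TwoPhaseCommitSink(ABC):
    """Hypothetical interface capturing the three sink requirements."""

    @abstractmethod
    def stage(self, txn_id: str, records: list) -> None:
        """Atomic staging: write records without making them visible."""

    @abstractmethod
    def commit(self, txn_id: str) -> None:
        """Idempotent commit: make staged records visible; safe to call repeatedly."""

    @abstractmethod
    def abort(self, txn_id: str) -> None:
        """Rollback: discard staged records that were never committed."""

    @abstractmethod
    def visible(self) -> list:
        """Records currently visible to downstream consumers."""


class InMemorySink(TwoPhaseCommitSink):
    """Toy implementation used only to exercise the contract."""

    def __init__(self):
        self._staged = {}
        self._committed = {}  # insertion order = commit order

    def stage(self, txn_id, records):
        self._staged[txn_id] = list(records)

    def commit(self, txn_id):
        if txn_id in self._committed:
            return  # already committed: a retry is a no-op
        self._committed[txn_id] = self._staged.pop(txn_id)

    def abort(self, txn_id):
        self._staged.pop(txn_id, None)

    def visible(self):
        return [r for records in self._committed.values() for r in records]


def check_contract(sink: TwoPhaseCommitSink) -> None:
    """Raise AssertionError if `sink` violates one of the three requirements."""
    sink.stage("t1", ["x"])
    assert sink.visible() == [], "staged data must not be visible"
    sink.commit("t1")
    sink.commit("t1")
    assert sink.visible() == ["x"], "a retried commit must not duplicate output"
    sink.stage("t2", ["y"])
    sink.abort("t2")
    assert sink.visible() == ["x"], "aborted data must never become visible"
```

A real sink would be checked the same way, with a fresh instance pointed at a scratch location, for example `check_contract(InMemorySink())`.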
Domains
- Stream_Processing: 2PC extends the checkpoint protocol to cover output correctness.
- Fault_Tolerance: Ensures that failure recovery does not produce duplicate output.
- Exactly_Once: 2PC is the mechanism that upgrades at-least-once delivery to exactly-once semantics for external sinks.