Heuristic: Data-Juicer Checkpoint Resumption Strategy
| Knowledge Sources | |
|---|---|
| Domains | Fault_Tolerance, Distributed_Computing |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
Checkpoint validation and resumption strategy using strict prefix-matching of operator configurations to determine whether a pipeline can resume from a checkpoint or must restart from scratch.
Description
Data-Juicer's checkpoint system records the complete configuration of each executed operator as a JSON list. When resuming, it compares this recorded list against the current pipeline configuration using strict prefix matching. If the recorded ops form an exact prefix of the current pipeline, execution resumes from the last checkpoint, skipping the already-executed prefix. Any configuration change (even a single parameter) or reduction in operator count invalidates the entire checkpoint, forcing a full re-run. This conservative approach prevents subtle data corruption from partial configuration changes.
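The prefix rule described above can be sketched as a standalone function; the helper name and the operator configs below are hypothetical, for illustration only:

```python
def ckpt_prefix_valid(recorded_ops, current_ops):
    """A checkpoint is reusable only if the recorded op configs are an
    exact prefix of the current pipeline (dict equality: name + all params)."""
    if len(current_ops) < len(recorded_ops):
        return False  # pipeline shrank: full restart
    return all(rec == cur for rec, cur in zip(recorded_ops, current_ops))

recorded = [
    {"clean_links_mapper": {"min_len": 10}},
    {"language_id_score_filter": {"lang": "en"}},
]
appended = recorded + [{"document_deduplicator": {}}]  # new op at the end
changed = [{"clean_links_mapper": {"min_len": 20}}] + recorded[1:]  # param edit

print(ckpt_prefix_valid(recorded, appended))  # True: resume, run only the new op
print(ckpt_prefix_valid(recorded, changed))   # False: full re-run
```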
Usage
Use this heuristic when designing long-running pipelines that may need interruption and resumption. Understanding the strict matching rules helps avoid accidental checkpoint invalidation. Add new operators at the end of the pipeline to preserve checkpoint validity. Never modify the configuration of already-checkpointed operators. The system supports four checkpoint strategies: `EVERY_OP` (safest), `EVERY_N_OPS` (balanced), `MANUAL` (selective), and `DISABLED`.
The Insight (Rule of Thumb)
- Prefix Matching Rule: A checkpoint is valid if and only if the recorded ops are an exact prefix of the current config ops. Full dictionary equality is checked per operator (name + all parameters).
- Invalidation Triggers:
  - Any parameter change in an already-executed operator → full restart
  - Removing an operator from the pipeline → full restart (fewer ops than recorded)
  - Reordering operators → full restart (prefix match fails)
  - Adding operators at the end → checkpoint valid (new ops appended after the prefix)
- Checkpoint Strategies:
  - `EVERY_OP`: checkpoint after every operator (safest, most storage)
  - `EVERY_N_OPS`: checkpoint every N operators (balanced; uses `(op_idx + 1) % N == 0`)
  - `MANUAL`: checkpoint only after named operators
  - `DISABLED`: no checkpointing
- Validation Requirements (ALL must be true):
  - Checkpoint dataset directory exists and is a directory
  - Checkpoint op record file exists and is a file
  - Op records pass the prefix match against the current config
- Default Strategy: unknown strategies fall back to `EVERY_OP` (safe default)
- Per-Partition Checkpoints: in Ray mode, checkpoints are per-partition with the naming pattern `checkpoint_op_XXXX_partition_YYYY.parquet`
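The `EVERY_N_OPS` schedule can be worked through with a small sketch (the helper name is hypothetical; the modulo test is the one quoted above):

```python
def every_n_ops_schedule(num_ops, n):
    """Return the 0-indexed ops that get a checkpoint under EVERY_N_OPS.

    (op_idx + 1) % n == 0 fires after every nth operator:
    ops n, 2n, 3n, ... in 1-based terms.
    """
    return [op_idx for op_idx in range(num_ops) if (op_idx + 1) % n == 0]

print(every_n_ops_schedule(10, 3))  # [2, 5, 8]
print(every_n_ops_schedule(4, 1))   # [0, 1, 2, 3] (same as EVERY_OP)
```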
Reasoning
The strict prefix-matching approach is intentionally conservative. Data processing pipelines have sequential dependencies: each operator receives the output of all previous operators. If operator 3 changes its parameters, the output at operator 3 differs, making all subsequent checkpoints invalid. Allowing partial reuse would risk producing data that is inconsistent with what a fresh run would produce.
The `(op_idx + 1) % N` formula in EVERY_N_OPS converts the 0-indexed `op_idx` into a 1-based count, so a checkpoint is written after every Nth operator (ops N, 2N, 3N, ... in 1-based terms). Without the `+ 1`, the very first operator would trigger a checkpoint and the Nth would not.
Per-partition checkpointing in Ray mode enables fine-grained fault recovery: if partition 5 fails at operator 3, only that partition restarts from operator 3, while other partitions continue from their last checkpoint.
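A minimal sketch of the per-partition naming pattern; the function name is hypothetical, and the 4-digit zero padding is inferred from the `XXXX`/`YYYY` placeholders:

```python
def partition_ckpt_name(op_idx: int, partition_idx: int) -> str:
    # Pattern from the description: checkpoint_op_XXXX_partition_YYYY.parquet
    # (4-digit zero padding is an assumption based on the placeholders).
    return f"checkpoint_op_{op_idx:04d}_partition_{partition_idx:04d}.parquet"

print(partition_ckpt_name(3, 5))  # checkpoint_op_0003_partition_0005.parquet
```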
Code Evidence
Prefix-matching checkpoint validation from `ckpt_utils.py:119-173`:
```python
def check_ops_to_skip(self):
    with open(self.ckpt_op_record, "r") as fin:
        self.op_record = json.load(fin)
    recorded_op_num = len(self.op_record)
    process_op_num = len(self.process_list)
    # Fewer ops than recorded invalidates the checkpoint
    if process_op_num < recorded_op_num:
        logger.warning(
            f"Current config ops ({process_op_num}) are fewer than "
            f"checkpoint ops ({recorded_op_num}). Cannot reuse checkpoint."
        )
        self.op_record = []
        return False
    # Check that the recorded ops are an exact prefix of the current config
    prefix_process = self.process_list[:recorded_op_num]
    all_the_same = True
    for record_op, config_op in zip(self.op_record, prefix_process):
        if record_op != config_op:
            all_the_same = False
            break
    if all_the_same:
        # Skip the already-executed prefix; resume with the remaining ops
        self.process_list = self.process_list[recorded_op_num:]
        return True
    else:
        logger.warning("Processed ops of checkpoint are different from current configs.")
        self.op_record = []
        return False
```
Five-condition checkpoint availability check from `ckpt_utils.py:96-112`:
```python
def check_ckpt(self):
    # All five conditions must hold; `and` short-circuits, so the
    # op-record comparison only runs after the file checks pass.
    if (
        os.path.exists(self.ckpt_ds_dir)
        and os.path.isdir(self.ckpt_ds_dir)
        and os.path.exists(self.ckpt_op_record)
        and os.path.isfile(self.ckpt_op_record)
        and self.check_ops_to_skip()
    ):
        return True
    return False
```
Checkpoint strategy decision from `ckpt_utils.py:273-288`:
```python
def should_checkpoint(self, op_idx: int, op_name: str) -> bool:
    if self.checkpoint_strategy == CheckpointStrategy.EVERY_OP:
        return True
    elif self.checkpoint_strategy == CheckpointStrategy.EVERY_N_OPS:
        return (op_idx + 1) % self.checkpoint_n_ops == 0
    elif self.checkpoint_strategy == CheckpointStrategy.MANUAL:
        return op_name in self.checkpoint_op_names
    elif self.checkpoint_strategy == CheckpointStrategy.DISABLED:
        return False
    else:
        logger.warning(f"Unknown checkpoint strategy: {self.checkpoint_strategy}, defaulting to every_op")
        return True
```
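The decision table above can be exercised in isolation with a self-contained sketch; the enum member values and the keyword parameters (`n`, `manual_names`) are assumptions, not confirmed from the source:

```python
from enum import Enum

class CheckpointStrategy(Enum):
    # Member values are illustrative assumptions
    EVERY_OP = "every_op"
    EVERY_N_OPS = "every_n_ops"
    MANUAL = "manual"
    DISABLED = "disabled"

def should_checkpoint(strategy, op_idx, op_name, n=2, manual_names=()):
    """Standalone restatement of the strategy decision table."""
    if strategy is CheckpointStrategy.EVERY_OP:
        return True
    if strategy is CheckpointStrategy.EVERY_N_OPS:
        return (op_idx + 1) % n == 0
    if strategy is CheckpointStrategy.MANUAL:
        return op_name in manual_names
    return False  # DISABLED (unknown strategies default to EVERY_OP upstream)

print(should_checkpoint(CheckpointStrategy.EVERY_N_OPS, 3, "dedup", n=2))  # True
print(should_checkpoint(CheckpointStrategy.MANUAL, 0, "dedup",
                        manual_names=("dedup",)))  # True
print(should_checkpoint(CheckpointStrategy.DISABLED, 0, "dedup"))  # False
```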