Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Heuristic:Datajuicer Data juicer Checkpoint Resumption Strategy

From Leeroopedia
Knowledge Sources
Domains Fault_Tolerance, Distributed_Computing
Last Updated 2026-02-14 17:00 GMT

Overview

Checkpoint validation and resumption strategy using strict prefix-matching of operator configurations to determine whether a pipeline can resume from a checkpoint or must restart from scratch.

Description

Data-Juicer's checkpoint system records the complete configuration of each executed operator as a JSON list. When resuming, it compares this recorded list against the current pipeline configuration using strict prefix matching. If the recorded ops form an exact prefix of the current pipeline, execution resumes from the checkpoint point. Any configuration change (even a single parameter) or reduction in operator count invalidates the entire checkpoint, forcing a full re-run. This conservative approach prevents subtle data corruption from partial configuration changes.

Usage

Use this heuristic when designing long-running pipelines that may need interruption and resumption. Understanding the strict matching rules helps avoid accidental checkpoint invalidation. Add new operators at the end of the pipeline to preserve checkpoint validity. Never modify the configuration of already-checkpointed operators. The system supports four checkpoint strategies: `EVERY_OP` (safest), `EVERY_N_OPS` (balanced), `MANUAL` (selective), and `DISABLED`.

The Insight (Rule of Thumb)

  • Prefix Matching Rule: Checkpoint is valid if and only if recorded ops are an exact prefix of current config ops. Full dictionary equality is checked per operator (name + all parameters).
  • Invalidation Triggers:
    • Any parameter change in an already-executed operator → full restart
    • Removing an operator from the pipeline → full restart (fewer ops than recorded)
    • Reordering operators → full restart (prefix match fails)
    • Adding operators at the end → checkpoint valid (new ops appended after prefix)
  • Checkpoint Strategies:
    • `EVERY_OP`: Checkpoint after every operator (safest, most storage)
    • `EVERY_N_OPS`: Checkpoint every N operators (balanced; uses `(op_idx + 1) % N == 0`)
    • `MANUAL`: Checkpoint only after named operators
    • `DISABLED`: No checkpointing
  • Validation Requirements (ALL must be true):
    • Checkpoint dataset directory exists and is a directory
    • Checkpoint op record file exists and is a file
    • Op records pass prefix match with current config
  • Default Strategy: Unknown strategies fall back to `EVERY_OP` (safe default)
  • Per-Partition Checkpoints: In Ray mode, checkpoints are per-partition with naming pattern `checkpoint_op_XXXX_partition_YYYY.parquet`

Reasoning

The strict prefix-matching approach is intentionally conservative. Data processing pipelines have sequential dependencies: each operator receives the output of all previous operators. If operator 3 changes its parameters, the output at operator 3 differs, making all subsequent checkpoints invalid. Allowing partial reuse would risk producing data that is inconsistent with what a fresh run would produce.

The `(op_idx + 1) % N` formula in EVERY_N_OPS adds 1 to make checkpoints at ops 1, 2, 3... (1-indexed), ensuring the first N ops get a checkpoint.

Per-partition checkpointing in Ray mode enables fine-grained fault recovery: if partition 5 fails at operator 3, only that partition restarts from operator 3, while other partitions continue from their last checkpoint.

Code Evidence

Prefix-matching checkpoint validation from `ckpt_utils.py:119-173`:

def check_ops_to_skip(self):
    with open(self.ckpt_op_record, "r") as fin:
        self.op_record = json.load(fin)

    recorded_op_num = len(self.op_record)
    process_op_num = len(self.process_list)

    # Fewer ops invalidates checkpoint
    if process_op_num < recorded_op_num:
        logger.warning(
            f"Current config ops ({process_op_num}) are fewer than "
            f"checkpoint ops ({recorded_op_num}). Cannot reuse checkpoint."
        )
        self.op_record = []
        return False

    # Check prefix match
    for record_op, config_op in zip(self.op_record, prefix_process):
        if record_op != config_op:
            all_the_same = False
            break

    if all_the_same:
        self.process_list = self.process_list[recorded_op_num:]
        return True
    else:
        logger.warning("Processed ops of checkpoint are different from current configs.")
        self.op_record = []
        return False

Five-condition checkpoint availability check from `ckpt_utils.py:96-112`:

def check_ckpt(self):
    if (
        os.path.exists(self.ckpt_ds_dir)
        and os.path.isdir(self.ckpt_ds_dir)
        and os.path.exists(self.ckpt_op_record)
        and os.path.isfile(self.ckpt_op_record)
        and self.check_ops_to_skip()
    ):
        return True

Checkpoint strategy decision from `ckpt_utils.py:273-288`:

def should_checkpoint(self, op_idx: int, op_name: str) -> bool:
    if self.checkpoint_strategy == CheckpointStrategy.EVERY_OP:
        return True
    elif self.checkpoint_strategy == CheckpointStrategy.EVERY_N_OPS:
        return (op_idx + 1) % self.checkpoint_n_ops == 0
    elif self.checkpoint_strategy == CheckpointStrategy.MANUAL:
        return op_name in self.checkpoint_op_names
    elif self.checkpoint_strategy == CheckpointStrategy.DISABLED:
        return False
    else:
        logger.warning(f"Unknown checkpoint strategy: {self.checkpoint_strategy}, defaulting to every_op")
        return True

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment