
Implementation:Datajuicer Data juicer EventLoggingMixin CheckpointManager

From Leeroopedia
Knowledge Sources
Domains Distributed_Computing, Fault_Tolerance, Observability
Last Updated 2026-02-14 17:00 GMT

Overview

The Data-Juicer framework provides two concrete tools, EventLoggingMixin and CheckpointManager, for structured event logging and for checkpoint-based, resumable processing.

Description

EventLoggingMixin is a mixin class that executors inherit to gain structured event logging. It defines an EventType enum (JOB_START, JOB_COMPLETE, OP_START, OP_COMPLETE, CHECKPOINT_SAVE, DAG_* events, etc.) and an Event dataclass with timestamps, durations, and metadata. Events are written to JSON log files.
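The following sketch shows how an Event record might be turned into one line of a JSON log file and read back. It is a minimal illustration under stated assumptions, not the Data-Juicer code: it redefines only a subset of the EventType members and Event fields, and the one-object-per-line (JSON Lines) layout and the helper names event_to_json_line and json_line_to_event are hypothetical.

```python
import json
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Optional


class EventType(Enum):
    # Subset of the event types listed above (illustrative only).
    JOB_START = "job_start"
    OP_START = "op_start"
    OP_COMPLETE = "op_complete"


@dataclass
class Event:
    # Subset of the Event fields shown in the Signature section.
    event_type: EventType
    timestamp: float
    message: str
    operation_name: Optional[str] = None
    duration: Optional[float] = None


def event_to_json_line(event: Event) -> str:
    """Hypothetical helper: serialize one event as a single JSON line."""
    record = asdict(event)
    # Enum members are not JSON-serializable; store their string value instead.
    record["event_type"] = event.event_type.value
    return json.dumps(record)


def json_line_to_event(line: str) -> Event:
    """Hypothetical helper: parse a JSON line back into an Event."""
    record = json.loads(line)
    # Look the enum member up again from its stored string value.
    record["event_type"] = EventType(record["event_type"])
    return Event(**record)


evt = Event(EventType.OP_COMPLETE, timestamp=time.time(), message="op finished",
            operation_name="example_mapper", duration=1.25)  # example values
line = event_to_json_line(evt)
print(line)
```

Storing the enum's string value keeps each line plain JSON, and the reverse lookup EventType(value) restores the member when the log is read back.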

CheckpointManager saves dataset state to disk after each operator and tracks which operators have been completed. On restart, it compares the current process list with the recorded list to determine which operators can be skipped.
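One plausible reading of "compares the current process list with the recorded list" is a prefix check: the saved dataset is reusable only if the operators already applied appear, unchanged and in order, at the start of the current process list. The sketch below implements that rule; the function name ops_left_after_checkpoint, the one-dict-per-operator layout, and the operator names are illustrative assumptions, so consult ckpt_utils.py for the exact behavior.

```python
from typing import Any, Dict, List, Optional

# Assumed layout: each operator is {op_name: {arg: value, ...}}.
OpConfig = Dict[str, Any]


def ops_left_after_checkpoint(
    recorded_ops: List[OpConfig], process_list: List[OpConfig]
) -> Optional[List[OpConfig]]:
    """Hypothetical helper: return the operators still to run, or None
    if the checkpoint cannot be reused.

    The checkpoint counts as reusable only when the recorded operators
    form an exact prefix of the current process list (same names, same args).
    """
    if len(recorded_ops) > len(process_list):
        # The config now has fewer ops than were already applied.
        return None
    for recorded, configured in zip(recorded_ops, process_list):
        if recorded != configured:
            # An already-applied op changed, so the saved dataset is stale.
            return None
    # Skip the already-applied prefix; run everything after it.
    return process_list[len(recorded_ops):]


process = [
    {"op_a": {"threshold": 0.5}},
    {"op_b": {"min_len": 10}},
    {"op_c": {}},
]
recorded = process[:2]
print(ops_left_after_checkpoint(recorded, process))
```

Returning None rather than the full list lets a caller distinguish "checkpoint unusable, start over" from "nothing applied yet" (an empty record, which yields the full list).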

Usage

EventLoggingMixin is used by inheriting it in executor classes. CheckpointManager is instantiated by the executor when a work directory is available. Both are automatically integrated into the processing loop.
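To illustrate how inheriting a logging mixin wires events into a processing loop, here is a toy executor. MiniEventLoggingMixin, ToyExecutor, log_event, and the events.jsonl file name are hypothetical stand-ins, not Data-Juicer APIs. Unlike CheckpointManager, the loop only records the names of completed operators in ckpt_op.json and does not save dataset state.

```python
import json
import os
import tempfile
import time
from typing import Callable, List, Tuple


class MiniEventLoggingMixin:
    """Hypothetical stand-in for an event-logging mixin (not the Data-Juicer class)."""

    event_log_path: str  # set by the inheriting executor

    def log_event(self, event_type: str, **fields) -> None:
        # Append one JSON object per line so the log can be read incrementally.
        record = {"event_type": event_type, "timestamp": time.time(), **fields}
        with open(self.event_log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")


class ToyExecutor(MiniEventLoggingMixin):
    """Hypothetical executor that gains log_event() by inheriting the mixin."""

    def __init__(self, work_dir: str):
        self.event_log_path = os.path.join(work_dir, "events.jsonl")
        self.ckpt_path = os.path.join(work_dir, "ckpt_op.json")

    def run(self, data: List[int],
            ops: List[Tuple[str, Callable[[List[int]], List[int]]]]) -> List[int]:
        self.log_event("job_start", message="job started")
        done: List[str] = []
        for name, fn in ops:
            self.log_event("op_start", operation_name=name)
            t0 = time.perf_counter()
            data = fn(data)
            self.log_event("op_complete", operation_name=name,
                           duration=time.perf_counter() - t0)
            # Persist the completed-op list after every operator; a real
            # checkpoint would also save the dataset itself at this point.
            done.append(name)
            with open(self.ckpt_path, "w", encoding="utf-8") as f:
                json.dump(done, f)
        self.log_event("job_complete", message="job finished")
        return data


work_dir = tempfile.mkdtemp()
executor = ToyExecutor(work_dir)
result = executor.run([1, 2, 3, 4], [
    ("double", lambda xs: [x * 2 for x in xs]),
    ("drop_small", lambda xs: [x for x in xs if x > 4]),
])
print(result)
```

Keeping logging in a mixin means any executor class gets the same event format without duplicating file-handling code.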

Code Reference

Source Location

  • Repository: data-juicer
  • File: data_juicer/core/executor/event_logging_mixin.py (EventLoggingMixin), data_juicer/utils/ckpt_utils.py (CheckpointManager)
  • Lines: event_logging_mixin.py:L1-1237, ckpt_utils.py:L59-445

Signature

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class EventType(Enum):
    JOB_START = "job_start"
    JOB_COMPLETE = "job_complete"
    JOB_FAILED = "job_failed"
    OP_START = "op_start"
    OP_COMPLETE = "op_complete"
    OP_FAILED = "op_failed"
    CHECKPOINT_SAVE = "checkpoint_save"
    CHECKPOINT_LOAD = "checkpoint_load"
    # ... DAG events, PARTITION events

@dataclass
class Event:
    event_type: EventType
    timestamp: float
    message: str
    event_id: Optional[str] = None
    job_id: Optional[str] = None
    partition_id: Optional[int] = None
    operation_name: Optional[str] = None
    duration: Optional[float] = None
    error_message: Optional[str] = None

class CheckpointManager:
    def __init__(self, ckpt_dir, original_process_list, num_proc=1):
        """
        Args:
            ckpt_dir: Path to save and load checkpoints.
            original_process_list: Process list from config.
            num_proc: Number of worker processes used when saving the dataset.
        """

Import

from data_juicer.core.executor.event_logging_mixin import EventLoggingMixin, EventType, Event
from data_juicer.utils.ckpt_utils import CheckpointManager

I/O Contract

Inputs

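| Name | Type | Required | Used by | Description |
|---|---|---|---|---|
| ckpt_dir | str | Yes | CheckpointManager | Directory for checkpoint files |
| original_process_list | list | Yes | CheckpointManager | Process list from the config, used to detect config changes |
| num_proc | int | No (default 1) | CheckpointManager | Number of worker processes used when saving the dataset |
| cfg | Namespace | Yes | EventLoggingMixin | Pipeline config, used to locate the log directory |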

Outputs

| Name | Type | Description |
|---|---|---|
| event_logs | JSON files | Structured event log records |
| checkpoints | Files | Serialized dataset state for resumable processing |
| ckpt_op.json | JSON file | Record of completed operators |
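Because the event logs are structured, they can be post-processed for observability. This sketch sums the duration of op_complete events per operator. It assumes one JSON object per line with field names mirroring the Event dataclass; op_durations is a hypothetical helper, and the sample lines are made up for illustration rather than taken from a real Data-Juicer log.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable


def op_durations(lines: Iterable[str]) -> Dict[str, float]:
    """Hypothetical helper: total recorded duration of completed operators, by name."""
    totals: Dict[str, float] = defaultdict(float)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        # Only completed operators carry a meaningful duration.
        if record.get("event_type") == "op_complete" and record.get("duration") is not None:
            totals[record["operation_name"]] += record["duration"]
    return dict(totals)


# Made-up sample lines in the assumed one-object-per-line format.
sample_log = [
    '{"event_type": "job_start", "timestamp": 0.0, "message": "start"}',
    '{"event_type": "op_complete", "timestamp": 1.0, "message": "done", "operation_name": "op_a", "duration": 0.5}',
    '{"event_type": "op_complete", "timestamp": 2.0, "message": "done", "operation_name": "op_b", "duration": 1.5}',
    '{"event_type": "op_complete", "timestamp": 3.0, "message": "done", "operation_name": "op_a", "duration": 0.25}',
]
print(op_durations(sample_log))
```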

Usage Examples

CheckpointManager Usage

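from data_juicer.utils.ckpt_utils import CheckpointManager

# `cfg` is the parsed Data-Juicer pipeline config; cfg.process is the
# list of operator configs to apply.
ckpt_mgr = CheckpointManager(
    ckpt_dir='./work_dir/ckpt',
    original_process_list=cfg.process,
    num_proc=4
)

# ckpt_available is True only if a checkpoint exists and its recorded
# operators still match the current config
if ckpt_mgr.ckpt_available:
    dataset = ckpt_mgr.load_ckpt()                    # resume from saved state
    remaining_ops = ckpt_mgr.get_left_process_list()  # operators not yet applied
else:
    dataset = load_dataset()  # hypothetical placeholder: load your dataset here
    remaining_ops = cfg.process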

