Implementation: Data-Juicer EventLoggingMixin and CheckpointManager
| Property | Value |
|---|---|
| Knowledge Sources | |
| Domains | Distributed_Computing, Fault_Tolerance, Observability |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
The Data-Juicer framework provides concrete tools for structured event logging and for checkpoint-based resumable processing.
Description
EventLoggingMixin is a mixin class that executors inherit to gain structured event logging. Its module also defines an EventType enum (JOB_START, JOB_COMPLETE, OP_START, OP_COMPLETE, CHECKPOINT_SAVE, DAG_* events, etc.) and an Event dataclass that carries timestamps, durations, and metadata. Events are written to JSON log files.
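The mixin pattern can be illustrated with a much smaller, hypothetical re-implementation. The names `SimpleEventLoggingMixin`, `log_event`, `ToyExecutor`, and the `events.jsonl` file are assumptions made for this sketch, not Data-Juicer's API, and the one-JSON-object-per-line (JSON Lines) format is likewise assumed. It only shows how inheriting a mixin gives an executor a logging method that serializes Event records, with the enum stored as its string value.

```python
import json
import os
import tempfile
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Optional


class EventType(Enum):
    # Subset of the event types listed above
    JOB_START = "job_start"
    JOB_COMPLETE = "job_complete"
    OP_START = "op_start"
    OP_COMPLETE = "op_complete"


@dataclass
class Event:
    event_type: EventType
    timestamp: float
    message: str
    operation_name: Optional[str] = None
    duration: Optional[float] = None


class SimpleEventLoggingMixin:
    """Hypothetical mixin: any class that inherits it gains log_event()."""

    def _init_event_logging(self, log_dir: str) -> None:
        os.makedirs(log_dir, exist_ok=True)
        self.event_log_path = os.path.join(log_dir, "events.jsonl")

    def log_event(self, event_type: EventType, message: str, **fields) -> Event:
        event = Event(event_type=event_type, timestamp=time.time(), message=message, **fields)
        record = asdict(event)
        record["event_type"] = event.event_type.value  # Enum -> plain string for JSON
        # Append one JSON object per line so the log can be parsed incrementally
        with open(self.event_log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
        return event


class ToyExecutor(SimpleEventLoggingMixin):
    """Hypothetical executor that inherits logging from the mixin."""

    def __init__(self, log_dir: str):
        self._init_event_logging(log_dir)

    def run(self, ops):
        self.log_event(EventType.JOB_START, "job started")
        for name, fn in ops:
            start = time.time()
            self.log_event(EventType.OP_START, f"{name} started", operation_name=name)
            fn()
            self.log_event(EventType.OP_COMPLETE, f"{name} done",
                           operation_name=name, duration=time.time() - start)
        self.log_event(EventType.JOB_COMPLETE, "job finished")


log_dir = tempfile.mkdtemp()
ToyExecutor(log_dir).run([("clean_text", lambda: None), ("dedup", lambda: None)])
with open(os.path.join(log_dir, "events.jsonl"), encoding="utf-8") as f:
    events = [json.loads(line) for line in f]
print([e["event_type"] for e in events])
```

Keeping the logging logic in a mixin lets several executor classes share it without a common base executor; the real mixin covers many more event types (DAG and partition events) than this sketch.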
CheckpointManager saves dataset state to disk after each operator and records which operators have been completed (in ckpt_op.json). On restart, it compares the current process list with the recorded list to determine which operators can be skipped.
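One plausible rule for that comparison is prefix matching: if the recorded operators, including their arguments, are exactly the first entries of the current process list, those operators are skipped; otherwise the whole list reruns from scratch. The function below is a hypothetical sketch of that rule under this assumption (the name `resolve_remaining_ops` is invented here, and the op names and arguments are illustrative); check `ckpt_utils.py` for the exact logic Data-Juicer uses.

```python
from typing import Any, Dict, List, Tuple

# Each op config is a single-key dict: {op_name: {arg: value, ...}}
OpConfig = Dict[str, Dict[str, Any]]


def resolve_remaining_ops(recorded: List[OpConfig],
                          current: List[OpConfig]) -> Tuple[bool, List[OpConfig]]:
    """Return (checkpoint_usable, ops_still_to_run).

    The checkpoint is usable only if the recorded ops, including their
    arguments, equal the first len(recorded) entries of `current`.
    """
    if not recorded or len(recorded) > len(current):
        # No record, or the config now has fewer ops than were completed
        return False, list(current)
    if current[:len(recorded)] != recorded:
        # Any changed op or argument invalidates the checkpoint: rerun everything
        return False, list(current)
    return True, current[len(recorded):]


process = [
    {"whitespace_normalization_mapper": {}},
    {"text_length_filter": {"min_len": 10}},
    {"document_deduplicator": {"lowercase": True}},
]

# Two ops were completed in a previous run with identical arguments
print(resolve_remaining_ops(process[:2], process))
# -> (True, [{'document_deduplicator': {'lowercase': True}}])

# The second op's argument changed after the checkpoint was written
edited = [process[0], {"text_length_filter": {"min_len": 20}}, process[2]]
print(resolve_remaining_ops(process[:2], edited)[0])
# -> False
```

Comparing full op configs rather than op names alone means that editing any argument of an already-completed operator forces a full rerun, which is the conservative choice when earlier outputs may no longer be valid.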
Usage
Executor classes gain event logging by inheriting EventLoggingMixin. The executor instantiates CheckpointManager when a work directory is available. Both are integrated into the executor's processing loop automatically.
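How checkpointing can wrap an operator loop is sketched below, assuming a JSON-serializable dataset (a list of dicts) saved as `dataset.json` and a `ckpt_op.json` file holding the list of completed op configs. `ToyCheckpointManager`, `OPS`, and `run` are hypothetical names; the sketch mirrors the responsibilities described above (save after each operator, record completed ops, skip them on restart) rather than Data-Juicer's actual implementation. The first run crashes in the second operator; the second run resumes from the checkpoint and executes only that operator.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, List, Tuple

OpConfig = Dict[str, Dict[str, Any]]


class ToyCheckpointManager:
    """Hypothetical stand-in: saves data and the op record after every op."""

    def __init__(self, ckpt_dir: str, process_list: List[OpConfig]):
        os.makedirs(ckpt_dir, exist_ok=True)
        self.data_path = os.path.join(ckpt_dir, "dataset.json")
        self.record_path = os.path.join(ckpt_dir, "ckpt_op.json")
        self.op_record: List[OpConfig] = []
        self.remaining = list(process_list)
        if os.path.exists(self.record_path) and os.path.exists(self.data_path):
            with open(self.record_path) as f:
                recorded = json.load(f)
            # Resume only if the recorded ops are an exact prefix of the current list
            if recorded and process_list[:len(recorded)] == recorded:
                self.op_record = recorded
                self.remaining = process_list[len(recorded):]
        self.ckpt_available = bool(self.op_record)

    def load_ckpt(self) -> List[dict]:
        with open(self.data_path) as f:
            return json.load(f)

    def save_ckpt(self, op_cfg: OpConfig, dataset: List[dict]) -> None:
        self.op_record.append(op_cfg)
        with open(self.data_path, "w") as f:
            json.dump(dataset, f)
        with open(self.record_path, "w") as f:
            json.dump(self.op_record, f)


# Toy operators keyed by name; each takes (dataset, **args) and returns a new dataset
OPS: Dict[str, Callable[..., List[dict]]] = {
    "strip_mapper": lambda ds: [{"text": s["text"].strip()} for s in ds],
    "min_len_filter": lambda ds, min_len: [s for s in ds if len(s["text"]) >= min_len],
}


def run(ckpt_dir: str, process: List[OpConfig], raw: List[dict],
        fail_at: str = "") -> Tuple[List[dict], List[str]]:
    mgr = ToyCheckpointManager(ckpt_dir, process)
    dataset = mgr.load_ckpt() if mgr.ckpt_available else raw
    executed = []
    for op_cfg in mgr.remaining:
        name, args = next(iter(op_cfg.items()))  # single-key dict {op_name: args}
        if name == fail_at:
            raise RuntimeError(f"simulated crash in {name}")
        dataset = OPS[name](dataset, **args)
        mgr.save_ckpt(op_cfg, dataset)  # checkpoint after every completed op
        executed.append(name)
    return dataset, executed


ckpt_dir = tempfile.mkdtemp()
process = [{"strip_mapper": {}}, {"min_len_filter": {"min_len": 3}}]
raw = [{"text": "  hello "}, {"text": " hi "}]

try:
    run(ckpt_dir, process, raw, fail_at="min_len_filter")  # first run crashes in op 2
except RuntimeError:
    pass
result, executed = run(ckpt_dir, process, raw)  # second run resumes from the checkpoint
print(executed, result)  # ['min_len_filter'] [{'text': 'hello'}]
```

Saving after every operator trades extra disk I/O for a shorter rerun after a failure; writing the op record after the dataset file means a crash between the two writes leaves a record that never claims more progress than the saved data.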
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/core/executor/event_logging_mixin.py (EventLoggingMixin), data_juicer/utils/ckpt_utils.py (CheckpointManager)
- Lines: event_logging_mixin.py:L1-1237, ckpt_utils.py:L59-445
Signature
```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class EventType(Enum):
    JOB_START = "job_start"
    JOB_COMPLETE = "job_complete"
    JOB_FAILED = "job_failed"
    OP_START = "op_start"
    OP_COMPLETE = "op_complete"
    OP_FAILED = "op_failed"
    CHECKPOINT_SAVE = "checkpoint_save"
    CHECKPOINT_LOAD = "checkpoint_load"
    # ... DAG events, PARTITION events


@dataclass
class Event:
    event_type: EventType
    timestamp: float
    message: str
    event_id: Optional[str] = None
    job_id: Optional[str] = None
    partition_id: Optional[int] = None
    operation_name: Optional[str] = None
    duration: Optional[float] = None
    error_message: Optional[str] = None


class CheckpointManager:
    def __init__(self, ckpt_dir, original_process_list, num_proc=1):
        """
        Args:
            ckpt_dir: Path to save and load checkpoints.
            original_process_list: Process list from the config.
            num_proc: Number of worker processes used when saving the dataset.
        """
```
Import
```python
from data_juicer.core.executor.event_logging_mixin import EventLoggingMixin, EventType, Event
from data_juicer.utils.ckpt_utils import CheckpointManager
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ckpt_dir | str | Yes (CheckpointManager) | Directory for checkpoint files |
| original_process_list | list | Yes (CheckpointManager) | Process list for config change detection |
| cfg | Namespace | Yes (EventLoggingMixin) | Pipeline config for log directory |
Outputs
| Name | Type | Description |
|---|---|---|
| event_logs | JSON files | Structured event log records |
| checkpoints | Files | Serialized dataset state for resumable processing |
| ckpt_op.json | JSON file | Record of completed operators |
Usage Examples
CheckpointManager Usage
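```python
from data_juicer.config import init_configs
from data_juicer.utils.ckpt_utils import CheckpointManager

cfg = init_configs()  # parse the pipeline config (e.g. passed via --config)

ckpt_mgr = CheckpointManager(
    ckpt_dir='./work_dir/ckpt',
    original_process_list=cfg.process,
    num_proc=4,
)

# A checkpoint is usable only if one exists and the config still matches
if ckpt_mgr.ckpt_available:
    dataset = ckpt_mgr.load_ckpt()
    remaining_ops = ckpt_mgr.get_left_process_list()
else:
    dataset = load_dataset()  # hypothetical placeholder: load the raw dataset as usual
    remaining_ops = cfg.process
```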