Implementation:Datajuicer Data juicer JobUtils
| Knowledge Sources | |
|---|---|
| Domains | Job Management, Monitoring |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Shared utilities for Data-Juicer job operations: loading job summaries and event logs, tracking partition status, managing processes, and calculating overall progress.
Description
The common module in the job utilities package provides the JobUtils class, which serves as the core shared utility for all job management tools. Key capabilities include:
Job State Loading:
- load_job_summary -- Loads job_summary.json from the work directory.
- load_dataset_mapping -- Loads metadata/dataset_mapping.json containing partition information.
- load_event_logs -- Parses events_*.jsonl files (with backward compatibility for events.jsonl) into a list of event dictionaries.
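The event-log loading described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the function name load_event_logs_sketch is hypothetical, and it assumes the log files sit directly in the work directory and that malformed lines are skipped.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def load_event_logs_sketch(work_dir: str) -> List[Dict[str, Any]]:
    """Collect events from events_*.jsonl, falling back to events.jsonl."""
    root = Path(work_dir)  # assumption: logs live directly in the work dir
    # Prefer the suffixed files; fall back to the legacy single file.
    log_files = sorted(root.glob("events_*.jsonl"))
    if not log_files:
        legacy = root / "events.jsonl"
        log_files = [legacy] if legacy.exists() else []

    events: List[Dict[str, Any]] = []
    for log_file in log_files:
        with log_file.open("r", encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue  # ignore blank lines
                try:
                    events.append(json.loads(line))
                except json.JSONDecodeError:
                    # Assumed policy for this sketch: skip malformed lines.
                    continue
    return events


# Small demonstration on a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "events_20250808.jsonl").write_text(
        '{"event_type": "op_start"}\n\nnot json\n{"event_type": "op_complete"}\n',
        encoding="utf-8",
    )
    demo_events = load_event_logs_sketch(tmp)
```

Globbing for the suffixed files first means the legacy events.jsonl is only read when no newer files exist, which is one plausible reading of "backward compatibility".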
Process and Thread Management:
- extract_process_thread_ids -- Extracts unique process and thread IDs from event logs.
- find_processes_by_ids -- Uses psutil to locate running processes by PID, skipping the current process.
- find_threads_by_ids -- Placeholder for future thread management.
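A rough sketch of the ID extraction step, using only the standard library. The event field names process_id and thread_id, the result keys process_ids and thread_ids, and the function name extract_ids_sketch are assumptions made for illustration; the real event schema may differ. The psutil lookup itself is left out, and only the "skip the current process" filter is shown.

```python
import os
from typing import Any, Dict, Iterable, Set


def extract_ids_sketch(events: Iterable[Dict[str, Any]]) -> Dict[str, Set[int]]:
    """Gather the unique process and thread IDs mentioned in the events."""
    process_ids: Set[int] = set()
    thread_ids: Set[int] = set()
    for event in events:
        pid = event.get("process_id")  # assumed field name
        tid = event.get("thread_id")   # assumed field name
        if pid is not None:
            process_ids.add(int(pid))
        if tid is not None:
            thread_ids.add(int(tid))
    return {"process_ids": process_ids, "thread_ids": thread_ids}


sample_events = [
    {"event_type": "op_start", "process_id": 101, "thread_id": 7},
    {"event_type": "op_complete", "process_id": 101, "thread_id": 7},
    {"event_type": "op_start", "process_id": 102},
    {"event_type": "job_start"},  # carries no IDs at all
]
ids = extract_ids_sketch(sample_events)

# Mirror the "skipping the current process" rule before any process lookup.
candidate_pids = ids["process_ids"] - {os.getpid()}
```

Using sets keeps the IDs unique regardless of how many events each process or thread emitted.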
Progress Tracking:
- get_partition_status -- Reconstructs per-partition status from dataset mapping and event logs, tracking current operations, completed operations, and checkpoints.
- calculate_overall_progress -- Aggregates partition statuses into overall job progress metrics including completion percentage, sample counts, and estimated time remaining.
- get_operation_pipeline -- Parses the YAML config to extract the operation pipeline definition.
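The aggregation step can be illustrated with a small sketch. The result keys total_partitions, completed_partitions, and progress_percentage follow the usage example on this page; the failed_partitions key, the status values "completed" and "failed", and the function name overall_progress_sketch are assumptions. Sample counts and the time-remaining estimate are omitted.

```python
from typing import Any, Dict


def overall_progress_sketch(
    partitions: Dict[int, Dict[str, Any]]
) -> Dict[str, Any]:
    """Aggregate per-partition statuses into job-level counters."""
    total = len(partitions)
    # Assumed status values: "completed" and "failed".
    completed = sum(
        1 for p in partitions.values() if p.get("status") == "completed"
    )
    failed = sum(1 for p in partitions.values() if p.get("status") == "failed")
    # Guard against division by zero when no partitions are known yet.
    percentage = (completed / total * 100.0) if total else 0.0
    return {
        "total_partitions": total,
        "completed_partitions": completed,
        "failed_partitions": failed,  # assumed key name
        "progress_percentage": percentage,
    }


sample_partitions = {
    0: {"status": "completed"},
    1: {"status": "processing"},
    2: {"status": "failed"},
    3: {"status": "completed"},
}
progress = overall_progress_sketch(sample_partitions)
```

Here two of the four partitions are completed, so the sketch reports a progress_percentage of 50.0.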
Module-Level Function:
- list_running_jobs -- Scans a base directory for job directories, reads their summaries, and checks which processes are still running via psutil.
Usage
Use this module as the foundation for building job monitoring, stopping, and status reporting tools. It provides the common job state inspection logic that all job management tools depend on.
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/utils/job/common.py
Signature
class JobUtils:
    def __init__(self, job_id: str, work_dir: str = None,
                 base_dir: str = None): ...
    def load_job_summary(self) -> Optional[Dict[str, Any]]: ...
    def load_dataset_mapping(self) -> Dict[str, Any]: ...
    def load_event_logs(self) -> List[Dict[str, Any]]: ...
    def extract_process_thread_ids(self) -> Dict[str, Set[int]]: ...
    def find_processes_by_ids(self, process_ids: Set[int]) -> List[psutil.Process]: ...
    def get_partition_status(self) -> Dict[int, Dict[str, Any]]: ...
    def calculate_overall_progress(self) -> Dict[str, Any]: ...
    def get_operation_pipeline(self) -> List[Dict[str, Any]]: ...

def list_running_jobs(base_dir: str = "outputs/partition-checkpoint-eventlog"
                      ) -> List[Dict[str, Any]]: ...
Import
from data_juicer.utils.job.common import JobUtils, list_running_jobs
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job_id | str | Yes | The job identifier. |
| work_dir | str | No | Work directory that already includes the job_id. Preferred over base_dir. |
| base_dir | str | No | Base directory containing job outputs. Legacy parameter. |
Outputs
| Name | Type | Description |
|---|---|---|
| job_summary | Dict[str, Any] | Parsed job summary JSON with status, start_time, and configuration. |
| partition_status | Dict[int, Dict] | Per-partition status including current_op, completed_ops, checkpoints, and error messages. |
| overall_progress | Dict[str, Any] | Aggregated progress with total/completed/failed partitions, samples, and estimated remaining time. |
| running_jobs | List[Dict] | List of job entries with job_id, status, start_time, running process count, and work_dir. |
Usage Examples
from data_juicer.utils.job.common import JobUtils, list_running_jobs

# List all jobs and their status
jobs = list_running_jobs("/outputs/partition-checkpoint-eventlog")
for job in jobs:
    print(f"Job {job['job_id']}: {job['status']} "
          f"({job['processes']} running processes)")

# Inspect a specific job
utils = JobUtils(job_id="20250808_230030_501c9d",
                 work_dir="/outputs/partition-checkpoint-eventlog/20250808_230030_501c9d")

# Get overall progress
progress = utils.calculate_overall_progress()
print(f"Progress: {progress['progress_percentage']:.1f}%")
print(f"Completed: {progress['completed_partitions']}/{progress['total_partitions']}")

# Get partition-level status
partitions = utils.get_partition_status()
for pid, status in partitions.items():
    print(f"Partition {pid}: {status['status']}")
Related Pages
- Datajuicer_Data_juicer_JobSnapshot -- Advanced snapshot analysis built on top of JobUtils