

Implementation:Datajuicer Data juicer JobUtils

From Leeroopedia
Knowledge Sources
Domains: Job Management, Monitoring
Last Updated: 2026-02-14 16:00 GMT

Overview

Shared utilities for Data-Juicer job operations: functions for loading job summaries and event logs, tracking partition status, managing processes, and calculating overall progress.

Description

The common module of the job utilities package (data_juicer.utils.job.common) provides the JobUtils class, the core shared utility for all job management tools. Key capabilities include:

Job State Loading:

  • load_job_summary -- Loads job_summary.json from the work directory.
  • load_dataset_mapping -- Loads metadata/dataset_mapping.json containing partition information.
  • load_event_logs -- Parses events_*.jsonl files (with backward compatibility for events.jsonl) into a list of event dictionaries.
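The loading steps above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Data-Juicer's implementation: it assumes job_summary.json and the events*.jsonl files sit directly in the work directory and that dataset_mapping.json lives under metadata/ there. The helper names load_json and load_job_state are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Optional


def load_json(path: Path) -> Optional[Dict[str, Any]]:
    """Return the parsed JSON object at `path`, or None if the file is missing."""
    if not path.exists():
        return None
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def load_event_logs(work_dir: str) -> List[Dict[str, Any]]:
    """Parse events_*.jsonl files, falling back to a legacy events.jsonl."""
    root = Path(work_dir)  # assumption: logs live directly in the work dir
    files = sorted(root.glob("events_*.jsonl"))
    if not files and (root / "events.jsonl").exists():
        files = [root / "events.jsonl"]  # backward-compatible single log
    events: List[Dict[str, Any]] = []
    for file in files:
        with file.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # ignore blank lines
                try:
                    events.append(json.loads(line))
                except json.JSONDecodeError:
                    continue  # skip a truncated or corrupt line
    return events


def load_job_state(work_dir: str) -> Dict[str, Any]:
    """Hypothetical helper: load summary, partition mapping and events for one job."""
    root = Path(work_dir)
    return {
        "summary": load_json(root / "job_summary.json"),
        # Mirror load_dataset_mapping's non-Optional return type: default to {}.
        "mapping": load_json(root / "metadata" / "dataset_mapping.json") or {},
        "events": load_event_logs(work_dir),
    }
```

Returning None for a missing summary but an empty dict for a missing mapping follows the Optional / non-Optional return types in the signature listing.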

Process and Thread Management:

  • extract_process_thread_ids -- Extracts unique process and thread IDs from event logs.
  • find_processes_by_ids -- Uses psutil to locate running processes by PID, skipping the current process.
  • find_threads_by_ids -- Placeholder for future thread management.
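A rough sketch of the process-tracking step follows. The event field names process_id and thread_id and the returned dict keys are hypothetical. To stay stdlib-only it swaps psutil for os.kill(pid, 0), which on POSIX systems only checks that the PID exists and delivers no signal; on Windows os.kill has different semantics, so this check must not be used there.

```python
import os
from typing import Any, Dict, Iterable, List, Set


def extract_process_thread_ids(events: Iterable[Dict[str, Any]]) -> Dict[str, Set[int]]:
    """Collect the unique process and thread IDs recorded in event dicts."""
    ids: Dict[str, Set[int]] = {"process_ids": set(), "thread_ids": set()}
    for event in events:
        pid = event.get("process_id")  # hypothetical field name
        tid = event.get("thread_id")   # hypothetical field name
        if isinstance(pid, int):
            ids["process_ids"].add(pid)
        if isinstance(tid, int):
            ids["thread_ids"].add(tid)
    return ids


def find_live_pids(process_ids: Set[int]) -> List[int]:
    """Return the PIDs that still exist, skipping the current process (POSIX only)."""
    live: List[int] = []
    for pid in sorted(process_ids):
        if pid <= 0 or pid == os.getpid():
            continue  # invalid PID, or ourselves
        try:
            os.kill(pid, 0)  # signal 0: existence check, nothing is delivered
        except ProcessLookupError:
            continue  # no such process
        except PermissionError:
            pass  # process exists but belongs to another user
        live.append(pid)
    return live
```

Skipping os.getpid() mirrors the documented behaviour of find_processes_by_ids, which prevents a monitoring tool from counting (or later stopping) itself.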

Progress Tracking:

  • get_partition_status -- Reconstructs per-partition status from dataset mapping and event logs, tracking current operations, completed operations, and checkpoints.
  • calculate_overall_progress -- Aggregates partition statuses into overall job progress metrics including completion percentage, sample counts, and estimated time remaining.
  • get_operation_pipeline -- Parses the YAML config to extract the operation pipeline definition.
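The progress-tracking logic can be illustrated by replaying events per partition and then aggregating the result. Everything about the data layout here is assumed for illustration: the mapping shape ({"partitions": [{"partition_id", "sample_count"}]}), the event types (op_start, op_complete, checkpoint_save, partition_complete, partition_failed), and every output key other than progress_percentage, completed_partitions and total_partitions, which appear in the usage example. The remaining-time estimate is a simple linear extrapolation, not necessarily what Data-Juicer computes.

```python
from typing import Any, Dict, List, Optional


def get_partition_status(
    mapping: Dict[str, Any], events: List[Dict[str, Any]]
) -> Dict[int, Dict[str, Any]]:
    """Replay events in timestamp order to rebuild each partition's state."""
    status: Dict[int, Dict[str, Any]] = {}
    for part in mapping.get("partitions", []):  # assumed mapping layout
        status[part["partition_id"]] = {
            "status": "pending",
            "sample_count": part.get("sample_count", 0),
            "current_op": None,
            "completed_ops": [],
            "checkpoints": [],
            "error_message": None,
        }
    for event in sorted(events, key=lambda e: e.get("timestamp", 0)):
        entry = status.get(event.get("partition_id"))
        if entry is None:
            continue  # job-level event or a partition not in the mapping
        kind = event.get("event_type")  # assumed event vocabulary below
        if kind == "op_start":
            entry["status"] = "processing"
            entry["current_op"] = event.get("op_name")
        elif kind == "op_complete":
            entry["completed_ops"].append(event.get("op_name"))
            entry["current_op"] = None
        elif kind == "checkpoint_save":
            entry["checkpoints"].append(event.get("op_name"))
        elif kind == "partition_complete":
            entry["status"] = "completed"
        elif kind == "partition_failed":
            entry["status"] = "failed"
            entry["error_message"] = event.get("error_message")
    return status


def calculate_overall_progress(
    status: Dict[int, Dict[str, Any]], elapsed_seconds: float
) -> Dict[str, Any]:
    """Aggregate partition states; the ETA is a linear extrapolation."""
    parts = list(status.values())
    total = len(parts)
    completed = sum(1 for p in parts if p["status"] == "completed")
    failed = sum(1 for p in parts if p["status"] == "failed")
    finished = completed + failed
    eta: Optional[float] = None
    if 0 < finished < total:
        # Assume remaining partitions take as long, on average, as finished ones.
        eta = elapsed_seconds * (total - finished) / finished
    return {
        "total_partitions": total,
        "completed_partitions": completed,
        "failed_partitions": failed,
        "progress_percentage": 100.0 * completed / total if total else 0.0,
        "total_samples": sum(p["sample_count"] for p in parts),
        "completed_samples": sum(
            p["sample_count"] for p in parts if p["status"] == "completed"
        ),
        "estimated_remaining_seconds": eta,
    }
```

Here the percentage counts only successfully completed partitions, while the time estimate counts failed partitions as finished, so a job with failures can end below 100%.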

Module-Level Function:

  • list_running_jobs -- Scans a base directory for job directories, reads their summaries, and checks which processes are still running via psutil.
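A stdlib-only sketch of that directory scan is shown here; list_jobs and pid_alive are hypothetical names. It assumes each job directory holds job_summary.json plus events*.jsonl logs whose records carry a hypothetical process_id field. The result dicts use the keys from the usage example (job_id, status, processes) plus start_time and work_dir from the outputs table. pid_alive is a POSIX stand-in for the psutil check, and the is_alive parameter lets a caller inject a different check.

```python
import json
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Set


def pid_alive(pid: int) -> bool:
    """POSIX-only liveness check standing in for psutil; signal 0 is never delivered."""
    if pid <= 0 or pid == os.getpid():
        return False  # skip invalid PIDs and the current process
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # exists, but owned by another user
    return True


def list_jobs(
    base_dir: str, is_alive: Callable[[int], bool] = pid_alive
) -> List[Dict[str, Any]]:
    """Scan base_dir for job directories and count their live processes."""
    base = Path(base_dir)
    if not base.is_dir():
        return []
    jobs: List[Dict[str, Any]] = []
    for job_dir in sorted(p for p in base.iterdir() if p.is_dir()):
        summary_file = job_dir / "job_summary.json"
        if not summary_file.exists():
            continue  # not a job directory
        summary = json.loads(summary_file.read_text(encoding="utf-8"))
        pids: Set[int] = set()
        # Matches both events_*.jsonl and the legacy events.jsonl.
        for log in job_dir.glob("events*.jsonl"):
            for line in log.read_text(encoding="utf-8").splitlines():
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # blank or corrupt line
                pid = record.get("process_id") if isinstance(record, dict) else None
                if isinstance(pid, int):
                    pids.add(pid)
        jobs.append({
            "job_id": job_dir.name,
            "status": summary.get("status", "unknown"),
            "start_time": summary.get("start_time"),
            "processes": sum(1 for pid in pids if is_alive(pid)),
            "work_dir": str(job_dir),
        })
    return jobs
```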

Usage

Use this module as the foundation for building job monitoring, stopping, and status reporting tools. It provides the common job state inspection logic that all job management tools depend on.
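As an example of building a monitoring tool on these utilities, the hypothetical watch_progress helper below polls any zero-argument callable, such as a bound JobUtils.calculate_overall_progress, until every partition has finished or a timeout expires. It reads the progress keys shown in the usage example; failed_partitions is an assumed key name. The sleep and clock parameters exist only so the loop can be tested without real waiting.

```python
import time
from typing import Any, Callable, Dict, Optional


def watch_progress(
    get_progress: Callable[[], Dict[str, Any]],
    interval: float = 10.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], Any] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll a progress callable until all partitions finish or the timeout expires."""
    start = clock()
    while True:
        progress = get_progress()
        total = progress.get("total_partitions", 0)
        completed = progress.get("completed_partitions", 0)
        finished = completed + progress.get("failed_partitions", 0)  # assumed key
        print(f"Progress: {progress.get('progress_percentage', 0.0):.1f}% "
              f"({completed}/{total} partitions)")
        if total and finished >= total:
            return progress  # every partition completed or failed
        if timeout is not None and clock() - start >= timeout:
            return progress  # give up and report the last snapshot
        sleep(interval)
```

With a real job this might be called as watch_progress(utils.calculate_overall_progress, interval=30), where utils is a JobUtils instance.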

Code Reference

Source Location

Signature

from typing import Any, Dict, List, Optional, Set

import psutil

class JobUtils:
    def __init__(self, job_id: str, work_dir: str = None,
                 base_dir: str = None): ...
    def load_job_summary(self) -> Optional[Dict[str, Any]]: ...
    def load_dataset_mapping(self) -> Dict[str, Any]: ...
    def load_event_logs(self) -> List[Dict[str, Any]]: ...
    def extract_process_thread_ids(self) -> Dict[str, Set[int]]: ...
    def find_processes_by_ids(self, process_ids: Set[int]) -> List[psutil.Process]: ...
    def get_partition_status(self) -> Dict[int, Dict[str, Any]]: ...
    def calculate_overall_progress(self) -> Dict[str, Any]: ...
    def get_operation_pipeline(self) -> List[Dict[str, Any]]: ...

def list_running_jobs(base_dir: str = "outputs/partition-checkpoint-eventlog"
                      ) -> List[Dict[str, Any]]: ...

Import

from data_juicer.utils.job.common import JobUtils, list_running_jobs

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| job_id | str | Yes | The job identifier. |
| work_dir | str | No | Work directory that already includes the job_id. Preferred over base_dir. |
| base_dir | str | No | Base directory containing job outputs. Legacy parameter. |

Outputs

| Name | Type | Description |
|------|------|-------------|
| job_summary | Dict[str, Any] | Parsed job summary JSON with status, start_time, and configuration. |
| partition_status | Dict[int, Dict] | Per-partition status including current_op, completed_ops, checkpoints, and error messages. |
| overall_progress | Dict[str, Any] | Aggregated progress with total/completed/failed partitions, samples, and estimated remaining time. |
| running_jobs | List[Dict] | List of job entries with job_id, status, start_time, running process count, and work_dir. |

Usage Examples

from data_juicer.utils.job.common import JobUtils, list_running_jobs

# List all jobs and their status
jobs = list_running_jobs("/outputs/partition-checkpoint-eventlog")
for job in jobs:
    print(f"Job {job['job_id']}: {job['status']} "
          f"({job['processes']} running processes)")

# Inspect a specific job
utils = JobUtils(job_id="20250808_230030_501c9d",
                 work_dir="/outputs/partition-checkpoint-eventlog/20250808_230030_501c9d")

# Get overall progress
progress = utils.calculate_overall_progress()
print(f"Progress: {progress['progress_percentage']:.1f}%")
print(f"Completed: {progress['completed_partitions']}/{progress['total_partitions']}")

# Get partition-level status
partitions = utils.get_partition_status()
for pid, status in partitions.items():
    print(f"Partition {pid}: {status['status']}")
