Implementation:Spotify Luigi LuigiGrep Tool

From Leeroopedia


Overview

The LuigiGrep tool in luigi/tools/luigi_grep.py is a CLI utility and Python class for searching the tasks known to a running Luigi scheduler daemon. It queries the scheduler's JSON REST API to find tasks by name prefix or by status, and for each match it reports the task's name, its status, and its dependencies grouped by status. This makes it useful for debugging and monitoring, for example to list every FAILED task together with the state of its upstream dependencies.

Source Location

Property | Value
--- | ---
Source File | luigi/tools/luigi_grep.py
Lines of Code | 83
Module | luigi.tools.luigi_grep
Domain | Debugging, Search
CLI Entry Point | luigi-grep

Import Statement

from luigi.tools.luigi_grep import LuigiGrep

Class: LuigiGrep

A client for searching the Luigi scheduler's dependency graph API.

Constructor

LuigiGrep.__init__(self, host, port)
Parameter | Type | Description
--- | --- | ---
host | str | Hostname of the Luigi scheduler.
port | str or int | Port number of the Luigi scheduler.

Properties

Property | Return Type | Description
--- | --- | ---
graph_url | str | Returns the full URL to the scheduler's graph API endpoint in the format http://{host}:{port}/api/graph.
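
As a minimal sketch of the URL format described above, the standalone helper below builds the same string from a host and a port. The function name build_graph_url is hypothetical and is not part of luigi; it only illustrates why the port may be given as either a str or an int.

```python
def build_graph_url(host, port):
    """Hypothetical helper mirroring the documented graph_url format."""
    # str.format interpolates both str and int ports the same way.
    return "http://{0}:{1}/api/graph".format(host, port)


print(build_graph_url("localhost", 8082))
print(build_graph_url("scheduler.example.com", "8083"))
```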

Methods

Method | Signature | Return Type | Description
--- | --- | --- | ---
prefix_search | prefix_search(self, job_name_prefix) | generator of dict | Searches for all jobs whose name starts with job_name_prefix. Yields a result dict for each match.
status_search | status_search(self, status) | generator of dict | Searches for all jobs whose status matches the given status string (case-insensitive comparison). Yields a result dict for each match.
_fetch_json | _fetch_json(self) | dict | Internal method that fetches the JSON dependency graph from the scheduler's REST API using urllib.request.urlopen.
_build_results | _build_results(self, jobs, job) | dict | Internal method that constructs a result dictionary for a given job.
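
The two search behaviours in the table can be illustrated without a running scheduler. The sketch below applies the same matching rules (name prefix, case-insensitive status) to an in-memory graph. The helper names and the sample data are hypothetical, and the helpers yield only task names rather than full result dicts.

```python
# Illustrative graph in the shape found under "response" in the scheduler's JSON.
SAMPLE_JOBS = {
    "DailyExtract(date=2024-01-15)": {"status": "DONE", "deps": []},
    "DailyLoad(date=2024-01-15)": {
        "status": "PENDING",
        "deps": ["DailyExtract(date=2024-01-15)"],
    },
    "WeeklyReport(week=3)": {
        "status": "FAILED",
        "deps": ["DailyLoad(date=2024-01-15)"],
    },
}


def names_with_prefix(jobs, prefix):
    """Yield task names that start with prefix (hypothetical helper)."""
    for name in jobs:
        if name.startswith(prefix):
            yield name


def names_with_status(jobs, status):
    """Yield task names whose status equals status, ignoring case."""
    wanted = status.lower()
    for name, info in jobs.items():
        if info["status"].lower() == wanted:
            yield name


# Both helpers are generators, so wrap them in list() to materialise the matches.
print(list(names_with_prefix(SAMPLE_JOBS, "Daily")))
print(list(names_with_status(SAMPLE_JOBS, "failed")))
```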

Result Dictionary Format

Each result yielded by prefix_search() and status_search() has the following structure:

{
    "name": "TaskName(param=value)",
    "status": "DONE",
    "deps_by_status": {
        "DONE": ["DepTask1(param=x)", "DepTask2(param=y)"],
        "PENDING": ["DepTask3(param=z)"],
        "UNKNOWN": ["MissingTask(param=w)"]
    }
}
Field | Type | Description
--- | --- | ---
name | str | The full task name/identifier.
status | str | The task's current status (e.g., DONE, PENDING, FAILED).
deps_by_status | defaultdict(list) | The task's dependencies grouped by their status. Dependencies not found in the graph are listed under UNKNOWN.
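
One way to produce a dictionary of this shape is sketched below. It is an illustration of the documented format under stated assumptions, not a copy of the library's code: the function name build_result and the sample graph are hypothetical.

```python
from collections import defaultdict


def build_result(jobs, job):
    """Group the dependencies of job by status (sketch of the documented format)."""
    info = jobs[job]
    deps_by_status = defaultdict(list)
    for dep in info["deps"]:
        if dep in jobs:
            deps_by_status[jobs[dep]["status"]].append(dep)
        else:
            # The dependency does not appear in the graph at all.
            deps_by_status["UNKNOWN"].append(dep)
    return {"name": job, "status": info["status"], "deps_by_status": deps_by_status}


jobs = {
    "Report()": {"status": "PENDING", "deps": ["Extract()", "Missing()"]},
    "Extract()": {"status": "DONE", "deps": []},
}
result = build_result(jobs, "Report()")
print(dict(result["deps_by_status"]))
```

Because deps_by_status is a defaultdict, indexing it with a status that has no entries inserts an empty list as a side effect; reading with .get() avoids that.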

CLI Usage

# Search for tasks by name prefix
luigi-grep --prefix MyTask

# Search for tasks by status
luigi-grep --status FAILED

# Specify a custom scheduler host and port
luigi-grep --scheduler-host scheduler.example.com --scheduler-port 8082 --prefix DailyJob

# Search by status on a non-default scheduler port
luigi-grep --scheduler-port 8083 --status PENDING

CLI Arguments

Argument | Default | Description
--- | --- | ---
--scheduler-host | localhost | Hostname of the Luigi scheduler.
--scheduler-port | 8082 | Port of the Luigi scheduler.
--prefix | None | Prefix of a task name to search for. Mutually exclusive with --status in practice.
--status | None | Search for jobs with the given status (e.g., DONE, PENDING, FAILED).
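
The arguments and defaults above could be declared with argparse roughly as follows. This is a sketch reconstructed from the table, not the tool's actual source: build_parser is a hypothetical name, the help strings are paraphrased, and keeping the port as a string (no type conversion) is an assumption.

```python
import argparse


def build_parser():
    """Hypothetical parser reproducing the documented arguments and defaults."""
    parser = argparse.ArgumentParser(prog="luigi-grep")
    parser.add_argument("--scheduler-host", default="localhost",
                        help="hostname of the Luigi scheduler")
    # Assumption: no type= conversion, so the port stays a string.
    parser.add_argument("--scheduler-port", default="8082",
                        help="port of the Luigi scheduler")
    parser.add_argument("--prefix", default=None,
                        help="prefix of a task name to search for")
    parser.add_argument("--status", default=None,
                        help="search for jobs with the given status")
    return parser


args = build_parser().parse_args(["--scheduler-port", "8083", "--status", "PENDING"])
print(args.scheduler_host, args.scheduler_port, args.prefix, args.status)
```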

CLI Output Format

MyDailyTask(date=2024-01-15): DONE, Dependencies:
  status=DONE
    ExtractTask(date=2024-01-15)
    TransformTask(date=2024-01-15)
  status=PENDING
    LoadTask(date=2024-01-15)
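
The layout above can be reproduced from a result dictionary with a few lines of formatting code. The helper below is a hypothetical sketch (format_result is not a luigi function) that assumes two spaces of indentation for status lines and four for dependency names, as in the sample output.

```python
def format_result(result):
    """Render one result dict in the layout shown above (hypothetical helper)."""
    lines = ["{0}: {1}, Dependencies:".format(result["name"], result["status"])]
    for status, deps in result["deps_by_status"].items():
        lines.append("  status={0}".format(status))
        for dep in deps:
            lines.append("    {0}".format(dep))
    return "\n".join(lines)


sample = {
    "name": "MyDailyTask(date=2024-01-15)",
    "status": "DONE",
    "deps_by_status": {
        "DONE": ["ExtractTask(date=2024-01-15)", "TransformTask(date=2024-01-15)"],
        "PENDING": ["LoadTask(date=2024-01-15)"],
    },
}
print(format_result(sample))
```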

Programmatic Usage

from luigi.tools.luigi_grep import LuigiGrep

grep = LuigiGrep('scheduler.example.com', 8082)

# Find all tasks starting with "Daily"
for result in grep.prefix_search('Daily'):
    print("Task: %s, Status: %s" % (result['name'], result['status']))
    for status, deps in result['deps_by_status'].items():
        print("  %s dependencies: %s" % (status, deps))

# Find all failed tasks
for result in grep.status_search('FAILED'):
    print("Failed task: %s" % result['name'])

Scheduler API

The tool queries the Luigi scheduler's /api/graph endpoint, which returns a JSON response containing the full dependency graph. The response structure is:

{
    "response": {
        "TaskName(param=value)": {
            "status": "DONE",
            "deps": ["DepTask1(param=x)", "DepTask2(param=y)"]
        },
        ...
    }
}

The LuigiGrep class filters and enriches this data based on the search criteria.
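
To make the response structure concrete, the sketch below parses a small payload of that shape and counts tasks per status. The payload is illustrative data written for this example, not output captured from a real scheduler.

```python
import json
from collections import Counter

# Illustrative payload in the documented shape.
payload = """
{
    "response": {
        "A()": {"status": "DONE", "deps": []},
        "B()": {"status": "PENDING", "deps": ["A()"]},
        "C()": {"status": "DONE", "deps": ["B()"]}
    }
}
"""

# The task graph lives under the top-level "response" key.
jobs = json.loads(payload)["response"]
status_counts = Counter(info["status"] for info in jobs.values())
print(dict(status_counts))
```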

Dependencies

  • Python standard library: argparse, json, collections.defaultdict, urllib.request.urlopen
  • External: Requires a running Luigi scheduler daemon accessible at the specified host and port.
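
Since the tool depends on a reachable scheduler, a caller may want to check connectivity before searching. The helper below is a hedged sketch using only the standard library: scheduler_reachable and its opener parameter are hypothetical names, and the opener is injectable so the check can be exercised without a network.

```python
from urllib.error import URLError
from urllib.request import urlopen


def scheduler_reachable(graph_url, timeout=2.0, opener=urlopen):
    """Return True if graph_url answers, False on a connection-level failure.

    opener is a hypothetical injection point; it defaults to urlopen.
    """
    try:
        with opener(graph_url, timeout=timeout):
            return True
    except (URLError, OSError):
        # Covers refused connections, DNS failures, timeouts, and HTTP errors.
        return False
```

A call such as scheduler_reachable("http://localhost:8082/api/graph") would then report whether a scheduler answers on the default host and port listed under CLI Arguments.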
