Implementation:Spotify Luigi LuigiGrep Tool
Overview
The LuigiGrep tool in luigi/tools/luigi_grep.py is a command-line utility and Python class for searching the tasks known to a running Luigi scheduler daemon. It queries the scheduler's JSON API (the /api/graph endpoint), selects tasks by name prefix or by status, and reports each match together with its dependencies grouped by status. This makes it useful for debugging and monitoring, for example to find failed tasks or to see which dependencies a pending task is still waiting on.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/tools/luigi_grep.py |
| Lines of Code | 83 |
| Module | luigi.tools.luigi_grep |
| Domain | Debugging, Search |
| CLI Entry Point | luigi-grep |
Import Statement
from luigi.tools.luigi_grep import LuigiGrep
Class: LuigiGrep
A client for searching the Luigi scheduler's dependency graph API.
Constructor
LuigiGrep.__init__(self, host, port)
| Parameter | Type | Description |
|---|---|---|
| host | str | Hostname of the Luigi scheduler. |
| port | str or int | Port number of the Luigi scheduler. |
Properties
| Property | Return Type | Description |
|---|---|---|
| graph_url | str | Returns the full URL of the scheduler's graph API endpoint, in the format http://{host}:{port}/api/graph. |
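As a minimal standalone sketch (standard library only), the URL format above and a fetch of that endpoint could look like this. The function names `graph_url` and `fetch_graph` are hypothetical, and the `timeout` parameter is an assumption added for illustration; it is not part of the documented LuigiGrep interface.

```python
import json
from urllib.request import urlopen


def graph_url(host, port):
    # Same URL format as the documented graph_url property.
    # The port is only interpolated, so an int or a str both work.
    return "http://{0}:{1}/api/graph".format(host, port)


def fetch_graph(host, port, timeout=10.0):
    # Hypothetical helper: GET the graph endpoint and decode the JSON body.
    # The timeout is an assumption so that an unreachable scheduler
    # does not block the caller indefinitely.
    with urlopen(graph_url(host, port), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `graph_url("localhost", 8082)` returns `"http://localhost:8082/api/graph"`.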
Methods
| Method | Signature | Return Type | Description |
|---|---|---|---|
| prefix_search | prefix_search(self, job_name_prefix) | generator of dict | Yields a result dict for every job whose name starts with job_name_prefix (case-sensitive match). |
| status_search | status_search(self, status) | generator of dict | Yields a result dict for every job whose status equals status, compared case-insensitively. |
| _fetch_json | _fetch_json(self) | dict | Internal. Fetches graph_url with urllib.request.urlopen and returns the decoded JSON body. Each search call fetches the graph afresh. |
| _build_results | _build_results(self, jobs, job) | dict | Internal. Builds the result dict for job, grouping its dependencies by the status each one has in jobs. |
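The two matching rules can be illustrated offline with a hypothetical `jobs` dict shaped like the scheduler's `response` mapping. The names `prefix_matches` and `status_matches` are illustrative, not LuigiGrep methods, and for brevity they yield task names rather than full result dicts.

```python
def prefix_matches(jobs, prefix):
    # Case-sensitive: str.startswith compares characters exactly.
    for name in jobs:
        if name.startswith(prefix):
            yield name


def status_matches(jobs, status):
    # Case-insensitive: both sides are lower-cased before comparing.
    wanted = status.lower()
    for name, info in jobs.items():
        if info["status"].lower() == wanted:
            yield name


# Hypothetical sample data shaped like the scheduler's "response" mapping.
jobs = {
    "DailyReport(date=2024-01-15)": {"status": "PENDING", "deps": []},
    "DailyExtract(date=2024-01-15)": {"status": "DONE", "deps": []},
    "WeeklyReport(week=3)": {"status": "FAILED", "deps": []},
}

print(list(prefix_matches(jobs, "Daily")))
print(list(status_matches(jobs, "failed")))
```

Like the documented search methods, these are generators, so no work happens until the caller starts iterating; `prefix_matches(jobs, "daily")` finds nothing because the prefix comparison is case-sensitive.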
Result Dictionary Format
Each result yielded by prefix_search() and status_search() has the following structure:
{
    "name": "TaskName(param=value)",
    "status": "DONE",
    "deps_by_status": {
        "DONE": ["DepTask1(param=x)", "DepTask2(param=y)"],
        "PENDING": ["DepTask3(param=z)"],
        "UNKNOWN": ["MissingTask(param=w)"]
    }
}
| Field | Type | Description |
|---|---|---|
| name | str | The full task name/identifier. |
| status | str | The task's current status (e.g., DONE, PENDING, FAILED). |
| deps_by_status | defaultdict(list) | The task's dependencies grouped by their status. Dependencies not found in the graph are listed under UNKNOWN. |
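The grouping described for deps_by_status can be sketched as follows. `build_result` is a hypothetical stand-in for the internal `_build_results`, written from the field descriptions above rather than copied from Luigi, and the sample `jobs` dict is invented for illustration.

```python
from collections import defaultdict


def build_result(jobs, name):
    # Hypothetical stand-in for _build_results: look up each dependency's
    # status in the same graph and group dependency names by that status.
    info = jobs[name]
    deps_by_status = defaultdict(list)
    for dep in info["deps"]:
        if dep in jobs:
            deps_by_status[jobs[dep]["status"]].append(dep)
        else:
            # Not present in the graph, so its status cannot be known.
            deps_by_status["UNKNOWN"].append(dep)
    return {"name": name, "status": info["status"], "deps_by_status": deps_by_status}


jobs = {
    "Load(date=2024-01-15)": {
        "status": "PENDING",
        "deps": ["Transform(date=2024-01-15)", "Extract(date=2024-01-15)"],
    },
    "Transform(date=2024-01-15)": {"status": "DONE", "deps": []},
}

result = build_result(jobs, "Load(date=2024-01-15)")
print(dict(result["deps_by_status"]))
```

Because deps_by_status is a defaultdict(list), indexing a status that has no dependencies (for example `["FAILED"]`) returns an empty list but also inserts that key; `.get("FAILED", [])` reads it without modifying the dict.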
CLI Usage
# Search for tasks by name prefix
luigi-grep --prefix MyTask

# Search for tasks by status
luigi-grep --status FAILED

# Specify a custom scheduler host and port
luigi-grep --scheduler-host scheduler.example.com --scheduler-port 8082 --prefix DailyJob

# Search by status on a scheduler listening on a non-default port
luigi-grep --scheduler-port 8083 --status PENDING
CLI Arguments
| Argument | Default | Description |
|---|---|---|
| --scheduler-host | localhost | Hostname of the Luigi scheduler. |
| --scheduler-port | 8082 | Port of the Luigi scheduler. |
| --prefix | None | Prefix of a task name to search for. Only one search runs per invocation; if both --prefix and --status are given, --prefix is used. |
| --status | None | Search for tasks with the given status (e.g., DONE, PENDING, FAILED), compared case-insensitively. |
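The flags in the table can be modelled with argparse as a rough sketch. `make_parser` and `choose_search` are hypothetical names, and the rule that --prefix wins when both flags are given is one assumed way of making the two flags mutually exclusive in practice; neither function is copied from luigi.

```python
import argparse


def make_parser():
    # Sketch of a parser exposing the documented flags (not Luigi's own code).
    parser = argparse.ArgumentParser(prog="luigi-grep")
    parser.add_argument("--scheduler-host", default="localhost",
                        help="hostname of the Luigi scheduler")
    parser.add_argument("--scheduler-port", default="8082",
                        help="port of the Luigi scheduler")
    parser.add_argument("--prefix", default=None,
                        help="task name prefix to search for")
    parser.add_argument("--status", default=None,
                        help="task status to search for")
    return parser


def choose_search(args):
    # Assumed precedence: --prefix is used when both flags are present.
    if args.prefix:
        return ("prefix", args.prefix)
    if args.status:
        return ("status", args.status)
    return None  # neither flag given: nothing to search for


args = make_parser().parse_args(["--status", "FAILED", "--scheduler-port", "8083"])
print(args.scheduler_host, args.scheduler_port, choose_search(args))
```

argparse turns `--scheduler-host` into the attribute `scheduler_host`, and because no `type=` is given the port stays a string, which is harmless when it is only interpolated into a URL.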
CLI Output Format
MyDailyTask(date=2024-01-15): DONE, Dependencies:
  status=DONE
    ExtractTask(date=2024-01-15)
    TransformTask(date=2024-01-15)
  status=PENDING
    LoadTask(date=2024-01-15)
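A small formatter that renders a result dict in the layout shown above is sketched below. `format_result` is a hypothetical helper, and the indentation widths (2 spaces before `status=`, 4 before each dependency) are assumptions about the layout, not taken from the Luigi source.

```python
def format_result(result):
    # Hypothetical renderer for one result dict.
    # Indentation widths (2 and 4 spaces) are assumed.
    lines = ["{0}: {1}, Dependencies:".format(result["name"], result["status"])]
    for status, deps in result["deps_by_status"].items():
        lines.append("  status={0}".format(status))
        for dep in deps:
            lines.append("    {0}".format(dep))
    return "\n".join(lines)


example = {
    "name": "MyDailyTask(date=2024-01-15)",
    "status": "DONE",
    "deps_by_status": {
        "DONE": ["ExtractTask(date=2024-01-15)", "TransformTask(date=2024-01-15)"],
        "PENDING": ["LoadTask(date=2024-01-15)"],
    },
}
print(format_result(example))
```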
Programmatic Usage
from luigi.tools.luigi_grep import LuigiGrep

grep = LuigiGrep('scheduler.example.com', 8082)

# Find all tasks whose name starts with "Daily"
for result in grep.prefix_search('Daily'):
    print("Task: %s, Status: %s" % (result['name'], result['status']))
    for status, deps in result['deps_by_status'].items():
        print("  %s dependencies: %s" % (status, deps))

# Find all failed tasks
for result in grep.status_search('FAILED'):
    print("Failed task: %s" % result['name'])
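Result dicts are plain data, so follow-up analysis needs no further scheduler calls. As an illustration, the hypothetical helper `blocking_deps` below lists every dependency of a result that is not DONE (including UNKNOWN ones); the sample result is invented so the snippet runs without a scheduler.

```python
def blocking_deps(result):
    # Hypothetical helper: every dependency whose status is not DONE,
    # including dependencies the scheduler graph does not know (UNKNOWN).
    return sorted(
        dep
        for status, deps in result["deps_by_status"].items()
        if status != "DONE"
        for dep in deps
    )


# Invented result dict with the shape yielded by prefix_search()/status_search().
result = {
    "name": "DailyReport(date=2024-01-15)",
    "status": "PENDING",
    "deps_by_status": {
        "DONE": ["DailyExtract(date=2024-01-15)"],
        "FAILED": ["DailyTransform(date=2024-01-15)"],
        "UNKNOWN": ["RawDump(date=2024-01-15)"],
    },
}
print(blocking_deps(result))

# Against a live scheduler this could be combined with status_search, e.g.:
# for r in LuigiGrep('localhost', 8082).status_search('PENDING'):
#     print(r['name'], blocking_deps(r))
```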
Scheduler API
The tool queries the Luigi scheduler's /api/graph endpoint, which returns a JSON object whose response field maps each task identifier to that task's details. Abridged to the fields LuigiGrep reads, the structure is:
{
  "response": {
    "TaskName(param=value)": {
      "status": "DONE",
      "deps": ["DepTask1(param=x)", "DepTask2(param=y)"]
    },
    ...
  }
}
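LuigiGrep selects the entries of response that match the search criterion and, for each match, looks up every dependency in the same mapping so that the dependencies can be grouped by status.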
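Because the whole graph arrives in one response, simple summaries can be computed locally. The sketch below parses an invented payload that assumes only the two per-task fields shown above, tallies tasks by status, and finds dependencies absent from the graph (the ones LuigiGrep would report under UNKNOWN). The variable names are illustrative.

```python
import json
from collections import Counter

# Invented payload with the abridged structure shown above.
payload = """
{"response": {
  "A(x=1)": {"status": "DONE", "deps": []},
  "B(x=1)": {"status": "PENDING", "deps": ["A(x=1)"]},
  "C(x=1)": {"status": "FAILED", "deps": ["B(x=1)", "Z(x=1)"]}
}}
"""

jobs = json.loads(payload)["response"]

# Tally tasks by their status.
status_counts = Counter(info["status"] for info in jobs.values())

# Dependencies referenced by some task but missing from the graph.
missing = sorted({dep for info in jobs.values() for dep in info["deps"] if dep not in jobs})

print(dict(status_counts))
print(missing)
```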
Dependencies
- Python standard library: argparse, json, collections.defaultdict, urllib.request.urlopen
- External: requires a running Luigi scheduler daemon reachable at the specified host and port.
See Also
- Spotify_Luigi_Deps_Tool - Offline dependency path analysis
- Spotify_Luigi_Deps_Tree_Tool - Visual dependency tree rendering