Implementation:Spotify Luigi Deps Tool
Overview
The deps tool in luigi/tools/deps.py is a CLI utility and Python module for dependency path analysis in Luigi pipelines. It finds all tasks and their outputs on the dependency paths from a given downstream (sink) task up to a specified upstream (source) task family. This is particularly useful for debugging pipeline issues, identifying which upstream tasks need to be re-run after a bug fix, and understanding the full dependency chain between two tasks.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/tools/deps.py |
| Lines of Code | 133 |
| Module | luigi.tools.deps |
| Domain | Debugging, Dependency_Analysis |
| CLI Entry Point | luigi-deps |
Import Statement
from luigi.tools.deps import find_deps, get_task_output_description
Classes
upstream (Config)
upstream(luigi.task.Config)
Configuration class that provides the --upstream-family CLI parameter.
| Parameter | Type | Default | Description |
|---|---|---|---|
| family | OptionalParameter | None | The task family name of the upstream task to search for. If None, all upstream tasks on all dependency paths are returned. |
Functions
get_task_requires
get_task_requires(task)
| Parameter | Type | Description |
|---|---|---|
| task | luigi.Task | The task whose requirements to retrieve. |
Returns a set of all tasks returned by task.requires(), flattened from any nested structure using luigi.task.flatten().
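The flattening step can be illustrated without Luigi installed. The sketch below is a simplified, dependency-free approximation: ToyTask is a hypothetical stand-in for luigi.Task, and flatten() only mimics the behaviour of luigi.task.flatten() (None becomes an empty list, dicts contribute their values, strings stay whole, nested iterables are unrolled) instead of importing it.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: just a name and a requires() result."""

    def __init__(self, name, requires=None):
        self.name = name
        self._requires = requires

    def requires(self):
        return self._requires

    def __repr__(self):
        return self.name


def flatten(struct):
    """Simplified approximation of luigi.task.flatten()."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        # Only the dict values are tasks; the keys are labels.
        return [item for value in struct.values() for item in flatten(value)]
    if isinstance(struct, str):
        return [struct]
    try:
        iterator = iter(struct)
    except TypeError:
        return [struct]  # a single non-iterable object, such as one task
    return [item for element in iterator for item in flatten(element)]


def get_task_requires(task):
    """Collapse whatever requires() returns into a set of tasks."""
    return set(flatten(task.requires()))


a, b, c = ToyTask("A"), ToyTask("B"), ToyTask("C")
root = ToyTask("Root", requires={"left": [a, b], "right": (c, [a])})
print(sorted(get_task_requires(root), key=lambda t: t.name))  # [A, B, C]
```

Task A appears twice in the nested structure but only once in the resulting set.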
dfs_paths
dfs_paths(start_task, goal_task_family, path=None)
| Parameter | Type | Default | Description |
|---|---|---|---|
| start_task | luigi.Task | (required) | The task to start the depth-first search from. |
| goal_task_family | str or None | (required) | The task family name to search for. If None, all tasks on all paths are yielded. |
| path | list | None (defaults to [start_task]) | Internal accumulator for the current path in the DFS traversal. |
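A generator that performs a depth-first search through the task dependency graph, following the requirements returned by get_task_requires() from start_task. Whenever it reaches a task whose task_family equals goal_task_family, it yields every task on the current path, from start_task to the matching task inclusive. If goal_task_family is None, every task matches, so all reachable tasks are yielded. Tasks already on the current path are not revisited, and the same task can be yielded more than once when several paths pass through it.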
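The traversal can be sketched in plain Python. This is a minimal, dependency-free sketch that assumes a hypothetical ToyTask class in place of luigi.Task; it applies the path-yielding rule described above to the Daily -> ProcessA -> ProcessB -> Aggregate chain from the practical use case on this page.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: a family name plus direct requirements."""

    def __init__(self, task_family, *requirements):
        self.task_family = task_family
        self._requirements = list(requirements)

    def requires(self):
        return self._requirements

    def __repr__(self):
        return self.task_family


def get_task_requires(task):
    # The toy requirements are already a flat list, so no flatten() is needed.
    return set(task.requires())


def dfs_paths(start_task, goal_task_family, path=None):
    """Yield every task on each path from start_task to a goal_task_family task."""
    if path is None:
        path = [start_task]
    # A match (or no goal at all) emits the whole path walked so far.
    if goal_task_family is None or start_task.task_family == goal_task_family:
        yield from path
    # Keep descending; tasks already on the current path are skipped.
    for next_task in get_task_requires(start_task) - set(path):
        yield from dfs_paths(next_task, goal_task_family, path + [next_task])


daily = ToyTask("Daily")
process_a = ToyTask("ProcessA", daily)
process_b = ToyTask("ProcessB", process_a)
aggregate = ToyTask("Aggregate", process_b)

print(list(dfs_paths(aggregate, "Daily")))  # [Aggregate, ProcessB, ProcessA, Daily]
```

With goal_task_family=None the same call yields 10 items for this four-task chain (each of the four paths re-emits its prefix), which is why callers deduplicate the result.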
find_deps
find_deps(task, upstream_task_family)
| Parameter | Type | Description |
|---|---|---|
| task | luigi.Task | The downstream (sink) task to start from. |
| upstream_task_family | str or None | The upstream task family name. Pass None to find all upstream tasks. |
Returns a set of all task instances on all dependency paths between task and any task whose family matches upstream_task_family. Delegates to dfs_paths().
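The effect of collecting into a set, and the fact that branches which never reach the upstream family are excluded, can be shown with a small diamond-shaped graph. This is a self-contained sketch with a hypothetical ToyTask class rather than real Luigi tasks; find_deps here only reproduces the behaviour described in this section.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task: a family name plus direct requirements."""

    def __init__(self, task_family, *requirements):
        self.task_family = task_family
        self._requirements = set(requirements)

    def requires(self):
        return self._requirements

    def __repr__(self):
        return self.task_family


def dfs_paths(start_task, goal_task_family, path=None):
    """Yield every task on each path from start_task to a goal_task_family task."""
    if path is None:
        path = [start_task]
    if goal_task_family is None or start_task.task_family == goal_task_family:
        yield from path
    for next_task in start_task.requires() - set(path):
        yield from dfs_paths(next_task, goal_task_family, path + [next_task])


def find_deps(task, upstream_task_family):
    """Deduplicate the DFS output: a task on several paths appears once."""
    return set(dfs_paths(task, upstream_task_family))


# Aggregate depends on ProcessB (two routes back to Daily) and on an unrelated Lookup.
daily = ToyTask("Daily")
process_a = ToyTask("ProcessA", daily)
process_b = ToyTask("ProcessB", process_a, daily)
lookup = ToyTask("Lookup")
aggregate = ToyTask("Aggregate", process_b, lookup)

print(sorted(t.task_family for t in find_deps(aggregate, "Daily")))
# ['Aggregate', 'Daily', 'ProcessA', 'ProcessB']
print(sorted(t.task_family for t in find_deps(aggregate, None)))
# ['Aggregate', 'Daily', 'Lookup', 'ProcessA', 'ProcessB']
```

Lookup is only returned when no upstream family is given, because no path through it reaches Daily.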
find_deps_cli
find_deps_cli()
CLI wrapper that parses sys.argv[1:] using CmdlineParser, constructs the task object, reads the upstream().family parameter, and calls find_deps(). Returns the set of dependency tasks.
get_task_output_description
get_task_output_description(task_output)
| Parameter | Type | Description |
|---|---|---|
| task_output | luigi.Target | A single task output target. |
Returns a human-readable string description of the task output with a type prefix:
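| Target Type | Format | Example |
|---|---|---|
| RemoteTarget (SSH) | [SSH] {host}:{path} | [SSH] server.example.com:/data/output.csv |
| S3Target | [S3] {path} | [S3] s3://bucket/data/output.csv |
| FileSystemTarget | [FileSystem] {path} | [FileSystem] /local/data/output.csv |
| PostgresTarget | [DB] {host}:{table} | [DB] db.example.com:analytics_table |
| Any other target | the literal string "to be determined" | to be determined |

The type checks run in the order listed, so an S3Target or RemoteTarget (both subclasses of FileSystemTarget) receives its specific prefix rather than [FileSystem].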
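The type dispatch can be sketched with hypothetical stand-in classes that only mirror the inheritance relevant here: in Luigi, S3Target and RemoteTarget derive from FileSystemTarget, while PostgresTarget does not. The attribute layout, in particular the plain host attribute on RemoteTarget, is a simplification and not Luigi's actual API.

```python
class Target:
    """Hypothetical base class standing in for luigi.Target."""


class FileSystemTarget(Target):
    def __init__(self, path):
        self.path = path


class S3Target(FileSystemTarget):
    pass


class RemoteTarget(FileSystemTarget):
    def __init__(self, path, host):
        super().__init__(path)
        # Assumption: a plain attribute; the real class stores the host elsewhere.
        self.host = host


class PostgresTarget(Target):
    def __init__(self, host, table):
        self.host = host
        self.table = table


def get_task_output_description(task_output):
    """Map a target to a prefixed, human-readable description."""
    # Order matters: the FileSystemTarget subclasses must be tested first.
    if isinstance(task_output, RemoteTarget):
        return "[SSH] {0}:{1}".format(task_output.host, task_output.path)
    if isinstance(task_output, S3Target):
        return "[S3] {0}".format(task_output.path)
    if isinstance(task_output, FileSystemTarget):
        return "[FileSystem] {0}".format(task_output.path)
    if isinstance(task_output, PostgresTarget):
        return "[DB] {0}:{1}".format(task_output.host, task_output.table)
    return "to be determined"


print(get_task_output_description(S3Target("s3://bucket/data/output.csv")))
# [S3] s3://bucket/data/output.csv
```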
main
main()
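The CLI entry point function. Calls find_deps_cli(), iterates over the resulting set, and prints each task followed by descriptions of its outputs, produced by get_task_output_description(). It handles three shapes of task.output(): a dict (each value is described), any other iterable (each element is described), and a single target.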
Output format:
TASK: TaskName(param=value)
: [FileSystem] /path/to/output
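The output-shape handling in main() can be sketched as follows. describe() and LocalPath are hypothetical placeholders for get_task_output_description() and a real target class, and the indentation of the printed lines only approximates the tool's output.

```python
from collections.abc import Iterable


class LocalPath:
    """Hypothetical single-file target; deliberately not iterable."""

    def __init__(self, path):
        self.path = path


def describe(target):
    # Placeholder for get_task_output_description().
    return "[FileSystem] {0}".format(target.path)


def output_descriptions(task_output):
    """Normalize the value of task.output() into a list of descriptions."""
    if isinstance(task_output, dict):
        # Checked first: iterating a dict directly would yield its keys.
        return [describe(target) for target in task_output.values()]
    if isinstance(task_output, Iterable):
        return [describe(target) for target in task_output]
    return [describe(task_output)]


def format_task(task_repr, task_output):
    """Render one task and its outputs in the TASK / ':' line format."""
    lines = ["TASK: {0}".format(task_repr)]
    lines += ["    : {0}".format(desc) for desc in output_descriptions(task_output)]
    return "\n".join(lines)


print(format_task("ProcessA(date=2024-01-01)",
                  {"csv": LocalPath("/data/a.csv"), "log": LocalPath("/data/a.log")}))
```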
CLI Usage
# Find all upstream dependencies of MyTask
luigi-deps --module my_module MyTask --my-param value
# Find dependencies between MyTask and a specific upstream family
luigi-deps --module my_module MyTask --my-param value --upstream-family UpstreamTask
# Using PYTHONPATH for custom modules
PYTHONPATH=$PYTHONPATH:/path/to/tasks luigi-deps \
--module my.tasks MyDownstreamTask \
--downstream_task_param1 123456 \
--upstream-family MyUpstreamTask
Practical Use Case
Suppose you have a pipeline Daily -> ProcessA -> ProcessB -> Aggregate, where the arrows show data flow (so Aggregate ultimately requires Daily). You find a bug in Aggregate and fix it. To determine which task outputs need to be deleted for a clean re-run, run:
luigi-deps --module daily_module Aggregate --daily-param1 xxx --upstream-family Daily
This prints every task on the dependency paths between Aggregate and Daily, including both endpoints, together with their output locations. You can then delete those outputs and re-trigger the pipeline.
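Because the tool prints plain text, a small script can turn its output into a deletion list. The sketch below is a hypothetical helper, not part of luigi-deps: it assumes the ": [FileSystem] <path>" line format described in this page, handles only local files (S3, SSH, database outputs and directories are ignored), and defaults to a dry run.

```python
import os


def filesystem_paths(deps_output):
    """Collect the paths printed as ': [FileSystem] <path>' in luigi-deps output."""
    prefix = ": [FileSystem] "
    paths = []
    for line in deps_output.splitlines():
        line = line.strip()
        if line.startswith(prefix):
            paths.append(line[len(prefix):])
    return paths


def delete_outputs(paths, dry_run=True):
    """Remove existing local files; with dry_run=True only report them."""
    affected = []
    for path in paths:
        if os.path.isfile(path):
            if not dry_run:
                os.remove(path)
            affected.append(path)
    return affected


sample = (
    "   TASK: Daily(date=2024-01-01)\n"
    "      : [FileSystem] /data/daily.csv\n"
    "   TASK: Load()\n"
    "      : [S3] s3://bucket/load.csv\n"
)
print(filesystem_paths(sample))  # ['/data/daily.csv']
```

A typical flow is to save the command's output to a file, review delete_outputs(paths) in dry-run mode, and only then call it with dry_run=False.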
Dependencies
- Luigi core: luigi.interface, luigi.task.flatten, luigi.parameter, luigi.cmdline_parser.CmdlineParser
- Luigi contrib: luigi.contrib.ssh.RemoteTarget, luigi.contrib.postgres.PostgresTarget, luigi.contrib.s3.S3Target
- Luigi targets: luigi.target.FileSystemTarget
- Python standard library: sys, collections.abc.Iterable
See Also
- Spotify_Luigi_Deps_Tree_Tool - Visual tree representation of dependencies
- Spotify_Luigi_LuigiGrep_Tool - Search for tasks via the scheduler API