Implementation:Spotify Luigi Deps Tool

From Leeroopedia


Overview

The deps tool in luigi/tools/deps.py is a CLI utility and Python module for dependency path analysis in Luigi pipelines. It finds all tasks and their outputs on the dependency paths from a given downstream (sink) task up to a specified upstream (source) task family. This is particularly useful for debugging pipeline issues, identifying which upstream tasks need to be re-run after a bug fix, and understanding the full dependency chain between two tasks.

Source Location

Property | Value
Source File | luigi/tools/deps.py
Lines of Code | 133
Module | luigi.tools.deps
Domain | Debugging, Dependency_Analysis
CLI Entry Point | luigi-deps

Import Statement

from luigi.tools.deps import find_deps, get_task_output_description

Classes

upstream (Config)

upstream(luigi.task.Config)

Configuration class that provides the --upstream-family CLI parameter.

Parameter | Type | Default | Description
family | OptionalParameter | None | The task family name of the upstream task to search for. If None, all upstream tasks on all dependency paths are returned.

Functions

get_task_requires

get_task_requires(task)
Parameter | Type | Description
task | luigi.Task | The task whose requirements to retrieve.

Returns a set of all tasks returned by task.requires(), flattened from any nested structure using luigi.task.flatten().
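To illustrate what "flattened from any nested structure" means, here is a minimal pure-Python sketch. The names flatten_sketch and requires_as_set are hypothetical stand-ins written for this page, not Luigi's own code, and plain strings stand in for task objects; the real function relies on luigi.task.flatten().

```python
def flatten_sketch(struct):
    """Flatten nested dicts, lists, tuples and sets into a flat list of leaves.

    Hypothetical stand-in for luigi.task.flatten(), for illustration only.
    """
    if struct is None:
        return []
    if isinstance(struct, dict):
        # Only the values of a dict are kept; the keys are just labels.
        return [leaf for value in struct.values() for leaf in flatten_sketch(value)]
    if isinstance(struct, (list, tuple, set)):
        return [leaf for item in struct for leaf in flatten_sketch(item)]
    # Anything else (here: a string standing in for a task) is a leaf.
    return [struct]


def requires_as_set(requirements):
    """Mirror the described behaviour: flatten, then deduplicate into a set."""
    return set(flatten_sketch(requirements))


# A nested requires() result in which "TaskB" appears twice.
nested = {"raw": ["TaskA", "TaskB"], "lookup": ("TaskB", {"extra": "TaskC"})}
print(sorted(requires_as_set(nested)))  # ['TaskA', 'TaskB', 'TaskC']
```

Returning a set means a task required through two different branches is only reported once.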

dfs_paths

dfs_paths(start_task, goal_task_family, path=None)
Parameter | Type | Default | Description
start_task | luigi.Task | (required) | The task to start the depth-first search from.
goal_task_family | str or None | (required) | The task family name to search for. If None, all tasks on all paths are yielded.
path | list | None (defaults to [start_task]) | Internal accumulator for the current path in the DFS traversal.

A generator that performs a depth-first search through the task dependency graph. It yields every task on any dependency path that reaches a task with task_family == goal_task_family; a task that lies on several such paths can be yielded more than once. If goal_task_family is None, it yields all reachable tasks.
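The traversal described above can be sketched without Luigi installed. In this minimal example, FakeTask, dfs_paths_sketch and the SideInput task are hypothetical names introduced for illustration; only the task_family attribute and the requires() method mirror the Luigi task interface that the search depends on.

```python
class FakeTask:
    """Hypothetical stand-in for a Luigi task: a family name plus requirements."""

    def __init__(self, task_family, requires=()):
        self.task_family = task_family
        self._requires = tuple(requires)

    def requires(self):
        return self._requires

    def __repr__(self):
        return self.task_family


def dfs_paths_sketch(start_task, goal_task_family, path=None):
    """Yield every task on a path from the sink to a task of the goal family."""
    if path is None:
        path = [start_task]
    # The current path ends at a match (or no goal was given): emit the whole path.
    if goal_task_family is None or start_task.task_family == goal_task_family:
        yield from path
    # Recurse into requirements that are not already on the current path.
    for upstream_task in set(start_task.requires()) - set(path):
        yield from dfs_paths_sketch(upstream_task, goal_task_family, path + [upstream_task])


# Pipeline: Daily -> ProcessA -> ProcessB -> Aggregate, plus an unrelated input.
daily = FakeTask("Daily")
side_input = FakeTask("SideInput")
process_a = FakeTask("ProcessA", [daily])
process_b = FakeTask("ProcessB", [process_a, side_input])
aggregate = FakeTask("Aggregate", [process_b])

on_path = {task.task_family for task in dfs_paths_sketch(aggregate, "Daily")}
print(sorted(on_path))  # ['Aggregate', 'Daily', 'ProcessA', 'ProcessB']
```

SideInput is reachable from Aggregate but does not lead to Daily, so it is left out. Passing None as the goal family would include it.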

find_deps

find_deps(task, upstream_task_family)
Parameter | Type | Description
task | luigi.Task | The downstream (sink) task to start from.
upstream_task_family | str or None | The upstream task family name. Pass None to find all upstream tasks.

Returns a set of all task instances on all dependency paths between task and any task whose family matches upstream_task_family. Delegates to dfs_paths() and collects its results into a set, so each task appears only once.

find_deps_cli

find_deps_cli()

CLI wrapper that parses sys.argv[1:] using CmdlineParser, constructs the task object, reads the upstream().family parameter, and calls find_deps(). Returns the set of dependency tasks.

get_task_output_description

get_task_output_description(task_output)
Parameter | Type | Description
task_output | luigi.Target | A single task output target.

Returns a human-readable string description of the task output with a type prefix:

Target Type | Format | Example
RemoteTarget (SSH) | [SSH] {host}:{path} | [SSH] server.example.com:/data/output.csv
S3Target | [S3] {path} | [S3] s3://bucket/data/output.csv
FileSystemTarget | [FileSystem] {path} | [FileSystem] /local/data/output.csv
PostgresTarget | [DB] {host}:{table} | [DB] db.example.com:analytics_table
Other (any other target type) | the literal string "to be determined" | to be determined
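The mapping in the table can be sketched as a chain of isinstance checks. Every class below is a hypothetical stand-in defined only for this example, and the sketch assumes, as is believed to be the case in Luigi, that the SSH and S3 target classes derive from FileSystemTarget. Under that assumption the more specific types must be tested before the generic filesystem case. The stand-in SSH target stores its host directly, which is a simplification.

```python
class FakeFileSystemTarget:
    """Hypothetical stand-in for luigi.target.FileSystemTarget."""

    def __init__(self, path):
        self.path = path


class FakeS3Target(FakeFileSystemTarget):
    """Hypothetical stand-in for an S3 target (assumed filesystem subclass)."""


class FakeRemoteTarget(FakeFileSystemTarget):
    """Hypothetical stand-in for an SSH target; host is stored directly here."""

    def __init__(self, path, host):
        super().__init__(path)
        self.host = host


class FakePostgresTarget:
    """Hypothetical stand-in for a Postgres target."""

    def __init__(self, host, table):
        self.host = host
        self.table = table


def describe_output_sketch(task_output):
    """Return a prefixed description, checking specific types first."""
    if isinstance(task_output, FakeRemoteTarget):
        return "[SSH] {0}:{1}".format(task_output.host, task_output.path)
    if isinstance(task_output, FakeS3Target):
        return "[S3] {0}".format(task_output.path)
    if isinstance(task_output, FakeFileSystemTarget):
        return "[FileSystem] {0}".format(task_output.path)
    if isinstance(task_output, FakePostgresTarget):
        return "[DB] {0}:{1}".format(task_output.host, task_output.table)
    # Fallback for any type the function does not know about.
    return "to be determined"


print(describe_output_sketch(FakeS3Target("s3://bucket/data/output.csv")))
print(describe_output_sketch(object()))
```

If the filesystem check came first in this sketch, the S3 and SSH stand-ins would both be reported with the [FileSystem] prefix.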

main

main()

The CLI entry point function. Calls find_deps_cli(), iterates over the result, and prints each task and its output descriptions. Handles dict outputs, iterable outputs, and single outputs.

Output format:

   TASK: TaskName(param=value)
                       : [FileSystem] /path/to/output
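A rough sketch of the output handling described above, assuming only what this section states: a task's output may be a dict, an iterable, or a single target, and each is printed in the format shown. FakeTarget, normalize_outputs and format_report are hypothetical names, and the description strings are built by hand rather than by get_task_output_description().

```python
from collections.abc import Iterable


class FakeTarget:
    """Hypothetical stand-in for a single, non-iterable output target."""

    def __init__(self, path):
        self.path = path


def normalize_outputs(task_output):
    """Return a flat list of targets from a dict, an iterable, or a single target."""
    if isinstance(task_output, dict):
        # Dict outputs: the labels are dropped, only the targets are kept.
        return list(task_output.values())
    if isinstance(task_output, Iterable):
        return list(task_output)
    return [task_output]


def format_report(task_name, descriptions):
    """Build the report text: one TASK line, then one indented line per output."""
    lines = ["   TASK: {0}".format(task_name)]
    lines.extend(" " * 23 + ": {0}".format(desc) for desc in descriptions)
    return "\n".join(lines)


first = FakeTarget("/path/to/output")
second = FakeTarget("/path/to/other")

for shape in ({"main": first, "extra": second}, [first, second], first):
    targets = normalize_outputs(shape)
    descriptions = ["[FileSystem] {0}".format(t.path) for t in targets]
    print(format_report("TaskName(param=value)", descriptions))
```

Normalising all three shapes to a list first keeps the printing code identical for every task.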

CLI Usage

# Find all upstream dependencies of MyTask
luigi-deps --module my_module MyTask --my-param value

# Find dependencies between MyTask and a specific upstream family
luigi-deps --module my_module MyTask --my-param value --upstream-family UpstreamTask

# Using PYTHONPATH for custom modules
PYTHONPATH=$PYTHONPATH:/path/to/tasks luigi-deps \
    --module my.tasks MyDownstreamTask \
    --downstream_task_param1 123456 \
    --upstream-family MyUpstreamTask

Practical Use Case

Suppose you have a pipeline: Daily -> ProcessA -> ProcessB -> Aggregate. You find a bug in Aggregate and fix it. To determine which task outputs need to be deleted for a clean re-run:

luigi-deps --module daily_module Aggregate --daily-param1 xxx --upstream-family Daily

This prints every task on the dependency paths from Aggregate up to Daily, including both endpoints, along with their output paths. You can then delete those outputs and re-trigger the pipeline.

Dependencies

  • Luigi core: luigi.interface, luigi.task.flatten, luigi.parameter, luigi.cmdline_parser.CmdlineParser
  • Luigi contrib: luigi.contrib.ssh.RemoteTarget, luigi.contrib.postgres.PostgresTarget, luigi.contrib.s3.S3Target
  • Luigi targets: luigi.target.FileSystemTarget
  • Python standard library: sys, collections.abc.Iterable
