Implementation:Spotify Luigi HdfsTarget
Domains: Pipeline_Orchestration, Big_Data
Last Updated: 2026-02-10 00:00 GMT
Overview
Concrete Luigi target classes for declaring data sources and outputs that live on HDFS.
Description
The luigi.contrib.hdfs.target module provides two target classes for representing HDFS paths in Luigi pipelines:
- HdfsTarget: A subclass of FileSystemTarget that wraps an HDFS path with methods for reading, writing, existence checking, renaming, moving, and copying. It selects an HDFS client automatically via get_autoconfig_client(), reads and writes through Luigi's pipe-based format system (the format's pipe_reader and pipe_writer), and supports temporary targets that delete themselves when garbage collected.
- HdfsFlagTarget: A subclass of HdfsTarget for directory-based outputs. It considers the output to exist only when both the directory and a flag file inside it (default _SUCCESS) are present, matching the completion marker that Hadoop MapReduce writes on success.
Both classes raise ValueError for paths that contain a colon (illegal in HDFS filenames). Both also expose exists(), which Luigi's default Task.complete() uses to decide whether a task's output is already in place.
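The two path rules above (no colons, and a trailing slash for flag targets, as Example 2 notes) can be modelled with a small stand-alone check. This is a minimal sketch, not Luigi's code: validate_hdfs_path is a hypothetical helper, and it assumes only the path component is checked, after the scheme and authority of a full URI are split off, so a port such as :8020 is not rejected.

```python
from urllib.parse import urlsplit


def validate_hdfs_path(path: str, flag_target: bool = False) -> str:
    """Hypothetical helper modelling the constructor checks described above.

    Assumption: only the path component is checked for colons, after the
    scheme and authority (e.g. "hdfs://namenode:8020") are split off, so a
    port number in a full URI is not rejected.
    """
    if ":" in urlsplit(path).path:
        raise ValueError("colon is not allowed in hdfs filenames")
    # A flag target treats its path as a directory, so it must end with "/".
    if flag_target and not path.endswith("/"):
        raise ValueError("flag target path must end with a slash")
    return path
```

For example, "/data/raw/10:30.csv" is rejected, while "hdfs://namenode:8020/data/raw/2026-02-10.csv" passes because the colon sits in the authority, not the path.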
Usage
Use these classes when:
- You are defining the output() of a Luigi task that writes to HDFS.
- You are declaring an ExternalTask whose data already exists on HDFS.
- You need to read from or write to HDFS paths with proper format handling.
- Your MapReduce job produces a directory and you need flag-file-based existence checking.
- You need temporary HDFS targets that are cleaned up automatically.
Code Reference
Source Location
luigi/contrib/hdfs/target.py, lines 31--222.
Key Signatures
class HdfsTarget(FileSystemTarget):
    def __init__(self, path=None, format=None, is_tmp=False, fs=None):
        ...

    def open(self, mode='r'):
        """Open target for reading ('r') or writing ('w'); other modes raise ValueError."""
        ...

    def exists(self):
        """Check if the HDFS path exists (inherited from FileSystemTarget)."""
        ...

    def remove(self, skip_trash=False):
        """Delete the HDFS path."""
        ...

    def rename(self, path, raise_if_exists=False):
        """Rename (move) to a new path. Does not change self.path.
        Unlike move_dir(), this may nest directories if the destination exists."""
        ...

    def move(self, path, raise_if_exists=False):
        """Alias for rename()."""
        ...

    def move_dir(self, path):
        """Move using rename_dont_move (avoids nested directories)."""
        ...

    def copy(self, dst_dir):
        """Copy to the destination directory."""
        ...

    def glob_exists(self, expected_files):
        """Return True if the directory listing has exactly expected_files entries."""
        ...

class HdfsFlagTarget(HdfsTarget):
    def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
        """path must be a directory ending with '/'; otherwise ValueError."""
        ...

    def exists(self):
        """Returns True only if path + flag file exists on HDFS."""
        ...
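rename() and move_dir() differ when the destination already exists: as the signatures note, move_dir() goes through rename_dont_move to avoid nested directories. The sketch below reproduces the nesting effect on a local file system, using shutil.move as a stand-in for an mv-style rename. Both helper names are hypothetical, the "don't move" variant is an assumed analogue that refuses when the destination exists, and none of this touches HDFS.

```python
import os
import shutil
import tempfile


def rename_like_mv(src: str, dst: str) -> None:
    # mv-style rename: if dst is an existing directory, src ends up
    # *inside* it (dst/<basename of src>), which is the nesting risk.
    shutil.move(src, dst)


def rename_dont_move_like(src: str, dst: str) -> None:
    # Hypothetical local analogue of rename_dont_move: refuse instead of
    # silently nesting when the destination already exists.
    if os.path.exists(dst):
        raise FileExistsError(dst)
    os.rename(src, dst)


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as root:
        src = os.path.join(root, "part-output")
        dst = os.path.join(root, "final")
        os.makedirs(src)
        os.makedirs(dst)  # destination already exists, e.g. from an earlier run
        rename_like_mv(src, dst)
        nested = os.path.isdir(os.path.join(dst, "part-output"))

        src2 = os.path.join(root, "part-output-2")
        os.makedirs(src2)
        try:
            rename_dont_move_like(src2, dst)
            refused = False
        except FileExistsError:
            refused = True
        return nested, refused
```

demo() reports that the mv-style rename nested the directory and the "don't move" variant refused, which is why a directory move that must land exactly at the destination should prefer the non-nesting form.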
Import
from luigi.contrib.hdfs.target import HdfsTarget, HdfsFlagTarget
# or via the hdfs package:
import luigi.contrib.hdfs
luigi.contrib.hdfs.HdfsTarget('/data/output/2026-02-10')
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| path | str or None | HDFS path for the target. If None, is_tmp must be True and a random temporary path is generated. For HdfsFlagTarget, the path must be a directory ending in "/". |
| format | Format or None | Luigi format object used for reading and writing. Defaults to luigi.format.get_default_format() chained with hdfs_format.Plain. |
| is_tmp | bool | If True, the target is temporary and its path is deleted (if it exists) when the object is garbage collected. |
| fs | HdfsFileSystem or None | HDFS client instance. If None, a client is auto-configured from settings via get_autoconfig_client(). |
| flag (HdfsFlagTarget only) | str | Name of the flag file whose presence marks the output as existing (default: _SUCCESS). |
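The path/is_tmp rule in the table can be expressed as a tiny resolver. This is a minimal sketch: resolve_target_path and its user parameter are hypothetical, and the temporary layout /tmp/&lt;user&gt;/luigitemp-&lt;digits&gt; is an assumption modelled on the example path in Example 4; Luigi's own temporary-path generator may differ.

```python
import posixpath
import random
from typing import Optional


def resolve_target_path(path: Optional[str], is_tmp: bool = False,
                        user: str = "myuser") -> str:
    """Hypothetical model of the path/is_tmp rule from the Inputs table.

    Assumption: the /tmp/<user>/luigitemp-<digits> layout mirrors the
    example temporary path only; Luigi's real generator may differ.
    """
    if path is None:
        if not is_tmp:
            # A concrete path is required unless the target is temporary.
            raise ValueError("path may only be None when is_tmp=True")
        suffix = "luigitemp-%09d" % random.randrange(10**9)
        path = posixpath.join("/tmp", user, suffix)
    return path
```

An explicit path is returned unchanged whether or not is_tmp is set; only the path=None case needs is_tmp=True.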
Outputs
| Name | Type | Description |
|---|---|---|
| open('r') | file-like | A readable stream over the HDFS file content, decoded through the configured format. |
| open('w') | file-like | A writable stream that will upload content to the HDFS path upon closing. |
| exists() | bool | True if the HDFS path exists (or path + flag for HdfsFlagTarget). |
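The open('w') row says content reaches the HDFS path when the stream is closed. A common way to get that behaviour is to write to a temporary file and rename it into place on close, so readers never see a half-written file. The sketch below shows that write-then-publish pattern on the local file system with os.replace; AtomicWriter is a hypothetical class illustrating the idea, not Luigi's HDFS writer.

```python
import os
import tempfile


class AtomicWriter:
    """Local-filesystem sketch of write-then-publish-on-close.

    Data goes to a hidden temporary file next to the destination and is
    renamed into place only in close(). Not Luigi's HDFS code.
    """

    def __init__(self, path: str):
        self.path = path
        directory = os.path.dirname(os.path.abspath(path))
        fd, self.tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
        self._f = os.fdopen(fd, "w")

    def write(self, data: str) -> int:
        return self._f.write(data)

    def close(self) -> None:
        self._f.close()
        os.replace(self.tmp_path, self.path)  # publish in one rename

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.close()
        else:
            # On error, discard the partial temp file instead of publishing it.
            self._f.close()
            os.remove(self.tmp_path)
        return False
```

Used as a context manager, a block that raises leaves no file at the destination, while a clean exit publishes the full content at once.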
Usage Examples
Example 1: Defining a task output on HDFS
import luigi
import luigi.contrib.hdfs

class MyExternalData(luigi.ExternalTask):
    """Data produced outside Luigi; complete once the dated CSV exists on HDFS."""
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/data/raw/%Y-%m-%d.csv')
        )
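An ExternalTask such as MyExternalData has no run() method, so whether downstream tasks can proceed depends only on whether its output exists. The sketch below models that check with a hypothetical in-memory FakeTarget standing in for HdfsTarget; is_complete is a simplified version of the "every output must exist" rule, not Luigi's actual Task.complete().

```python
from typing import Iterable, Protocol


class Target(Protocol):
    def exists(self) -> bool: ...


class FakeTarget:
    """Hypothetical in-memory stand-in for an HdfsTarget."""

    def __init__(self, path: str, present: set):
        self.path = path
        self._present = present

    def exists(self) -> bool:
        return self.path in self._present


def is_complete(outputs: Iterable[Target]) -> bool:
    """Simplified completeness rule: every declared output must exist.

    This sketch treats a task with no outputs as incomplete.
    """
    outputs = list(outputs)
    return bool(outputs) and all(t.exists() for t in outputs)
```

If the dated CSV for a given day is missing, is_complete returns False, which corresponds to Luigi reporting the external dependency as unfulfilled rather than trying to run it.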
Example 2: Using HdfsFlagTarget for MapReduce output
import luigi.contrib.hdfs
# The path must end with a slash for HdfsFlagTarget
target = luigi.contrib.hdfs.HdfsFlagTarget('/data/output/wordcount/', flag='_SUCCESS')
# exists() returns True only if /data/output/wordcount/_SUCCESS is present
if target.exists():
    print("Job completed successfully")
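The flag-file rule can be checked without a cluster by swapping in a fake file system. In this minimal sketch, FakeHdfs and FlagTargetSketch are hypothetical classes; FlagTargetSketch mirrors the documented behaviour of HdfsFlagTarget.exists() (check path + flag) rather than reproducing Luigi's implementation.

```python
class FakeHdfs:
    """Hypothetical in-memory file system: a set of existing paths."""

    def __init__(self, paths):
        self.paths = set(paths)

    def exists(self, path: str) -> bool:
        return path in self.paths


class FlagTargetSketch:
    """Models HdfsFlagTarget.exists(): only path + flag counts."""

    def __init__(self, path: str, fs: FakeHdfs, flag: str = "_SUCCESS"):
        if not path.endswith("/"):
            raise ValueError("path must be a directory ending with '/'")
        self.path, self.fs, self.flag = path, fs, flag

    def exists(self) -> bool:
        # The directory and its part files are not enough on their own;
        # the flag file written at job completion must be present.
        return self.fs.exists(self.path + self.flag)
```

A job that crashed after writing some part files leaves the directory in place but no _SUCCESS file, so the target still reports that it does not exist and the task is rerun.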
Example 3: Reading and writing HDFS data
import luigi.contrib.hdfs

target = luigi.contrib.hdfs.HdfsTarget('/data/output/results.txt')

# Write data; it reaches the HDFS path once the with-block closes the stream
with target.open('w') as f:
    f.write("hello world\n")

# Read data back line by line
with target.open('r') as f:
    for line in f:
        print(line.strip())
Example 4: Temporary HDFS target
import luigi.contrib.hdfs
# Creates a random temp path; auto-deleted when object is garbage collected
tmp_target = luigi.contrib.hdfs.HdfsTarget(is_tmp=True)
with tmp_target.open('w') as f:
    f.write("intermediate data\n")
print(tmp_target.path)  # e.g., /tmp/myuser/luigitemp-482937152
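The self-deleting behaviour of is_tmp targets amounts to a finalizer that removes the path if it still exists. The sketch below models that on the local file system; TempFileTarget is a hypothetical class, not Luigi code, and it relies on CPython's reference counting to run __del__ promptly (other interpreters may delay finalization).

```python
import os
import tempfile


class TempFileTarget:
    """Local sketch of an is_tmp target: the file is removed in __del__."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp(prefix="luigitemp-")
        os.close(fd)

    def exists(self) -> bool:
        return os.path.exists(self.path)

    def __del__(self):
        # Delete only if the file is still at this object's own path.
        if self.exists():
            os.remove(self.path)
```

Because the check runs against the object's own path, data that has been moved elsewhere before the object is collected survives in this sketch; data left at the temporary path disappears as soon as the last reference goes away.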