
Implementation:Spotify Luigi HdfsTarget

From Leeroopedia



Domains: Pipeline_Orchestration, Big_Data

Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete tool provided by Luigi for declaring data sources and task outputs that live on HDFS.

Description

The luigi.contrib.hdfs.target module provides two target classes for representing HDFS paths in Luigi pipelines:

  • HdfsTarget -- A subclass of FileSystemTarget that wraps an HDFS path with methods for reading, writing, existence checking, renaming, moving, and copying. It automatically selects an HDFS client via get_autoconfig_client(), negotiates data formats through the pipe-format system, and supports temporary targets that self-delete when garbage collected.
  • HdfsFlagTarget -- A subclass of HdfsTarget for directory-based outputs. It considers the output to exist only when both the directory and a flag file (default _SUCCESS) are present, matching Hadoop MapReduce's completion protocol.

Both classes validate that paths do not contain colons (illegal in HDFS filenames) and provide a consistent interface for the Luigi scheduler to determine task completion.
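The completion protocol described above can be illustrated with a simplified local-filesystem stand-in. This is a sketch only: the function name flag_target_exists is hypothetical, and the real classes delegate both checks to the configured HDFS client rather than to os.path.

```python
import os


def flag_target_exists(path, flag="_SUCCESS"):
    """Sketch of HdfsFlagTarget's existence rule: the output counts as
    complete only when the directory AND its flag file are both present."""
    if ":" in path:
        # Mirrors the colon validation: ':' is illegal in HDFS filenames
        raise ValueError("colon is not allowed in HDFS path: %r" % path)
    return os.path.isdir(path) and os.path.exists(os.path.join(path, flag))
```

A half-written MapReduce output directory therefore reads as "does not exist" until the job commits its _SUCCESS marker, which is what lets the Luigi scheduler treat the task as incomplete.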

Usage

Use these classes when:

  • Defining the output() of any Luigi task that writes to HDFS.
  • Declaring an ExternalTask whose data already exists on HDFS.
  • Reading from or writing to HDFS paths with proper format handling.
  • Checking flag-file-based existence for a directory produced by a MapReduce job.
  • Creating temporary HDFS targets that are cleaned up automatically.

Code Reference

Source Location

luigi/contrib/hdfs/target.py, lines 31--222.

Key Signatures

class HdfsTarget(FileSystemTarget):
    def __init__(self, path=None, format=None, is_tmp=False, fs=None):
        ...

    def open(self, mode='r'):
        """Open target for reading ('r') or writing ('w')."""
        ...

    def exists(self):
        """Check if the HDFS path exists (inherited from FileSystemTarget)."""
        ...

    def remove(self, skip_trash=False):
        """Delete the HDFS path."""
        ...

    def rename(self, path, raise_if_exists=False):
        """Rename (move) to a new path. Does not change self.path."""
        ...

    def move(self, path, raise_if_exists=False):
        """Alias for rename()."""
        ...

    def move_dir(self, path):
        """Move using rename_dont_move (avoids nested directories)."""
        ...

    def glob_exists(self, expected_files):
        """Check if directory contains exactly expected_files entries."""
        ...


class HdfsFlagTarget(HdfsTarget):
    def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
        ...

    def exists(self):
        """Returns True only if path + flag file exists on HDFS."""
        ...

Import

from luigi.contrib.hdfs.target import HdfsTarget, HdfsFlagTarget
# or via the hdfs package:
import luigi.contrib.hdfs
luigi.contrib.hdfs.HdfsTarget('/data/output/2026-02-10')

I/O Contract

Inputs

  • path (str or None) -- HDFS path for the target. If None, is_tmp must be True and a random temp path is generated.
  • format (Format or None) -- Luigi format object for reading/writing. Defaults to the default format piped through hdfs_format.Plain.
  • is_tmp (bool) -- If True, the target is temporary and is deleted when the object is garbage collected.
  • fs (HdfsFileSystem or None) -- HDFS client instance. If None, auto-configured from settings.
  • flag (str, HdfsFlagTarget only) -- Name of the flag file checked for existence (default: _SUCCESS).

Outputs

  • open('r') (file-like) -- A readable stream over the HDFS file content, decoded through the configured format.
  • open('w') (file-like) -- A writable stream that will upload content to the HDFS path upon closing.
  • exists() (bool) -- True if the HDFS path exists (or path + flag for HdfsFlagTarget).
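The write-on-close contract above (content becomes visible at the target path only when the writable stream is closed) can be sketched with a local write-then-rename stand-in. AtomicWriteFile is a hypothetical name for illustration; luigi's real pipe-format machinery is more involved than this.

```python
import os
import tempfile


class AtomicWriteFile:
    """Sketch of a target-style writer: buffer into a temp file, then
    move it into place on close, so readers never observe a partially
    written output."""

    def __init__(self, path):
        self.path = path
        fd, self._tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
        self._f = os.fdopen(fd, "w")

    def write(self, data):
        self._f.write(data)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self._f.close()
        if not any(exc):
            os.replace(self._tmp, self.path)  # publish atomically on success
        else:
            os.remove(self._tmp)  # discard the partial write on error
```

The same design explains why exists() is a safe completion signal: the target path only appears once the writer has finished and closed the stream.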

Usage Examples

Example 1: Defining a task output on HDFS

import luigi
import luigi.contrib.hdfs


class MyExternalData(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/data/raw/%Y-%m-%d.csv')
        )

Example 2: Using HdfsFlagTarget for MapReduce output

import luigi.contrib.hdfs

# The path must end with a slash for HdfsFlagTarget
target = luigi.contrib.hdfs.HdfsFlagTarget('/data/output/wordcount/', flag='_SUCCESS')

# exists() returns True only if /data/output/wordcount/_SUCCESS is present
if target.exists():
    print("Job completed successfully")

Example 3: Reading and writing HDFS data

import luigi.contrib.hdfs

target = luigi.contrib.hdfs.HdfsTarget('/data/output/results.txt')

# Write data
with target.open('w') as f:
    f.write("hello world\n")

# Read data
with target.open('r') as f:
    for line in f:
        print(line.strip())

Example 4: Temporary HDFS target

import luigi.contrib.hdfs

# Creates a random temp path; auto-deleted when object is garbage collected
tmp_target = luigi.contrib.hdfs.HdfsTarget(is_tmp=True)
with tmp_target.open('w') as f:
    f.write("intermediate data\n")
print(tmp_target.path)  # e.g., /tmp/myuser/luigitemp-482937152
