
Heuristic:Spotify Luigi Atomic File Writes

From Leeroopedia
Revision as of 10:55, 16 February 2026 by Admin (talk | contribs) (Auto-imported from heuristics/Spotify_Luigi_Atomic_File_Writes.md)



Knowledge Sources
Domains: Data_Integrity, Pipeline_Framework
Last Updated: 2026-02-10 07:00 GMT

Overview

Data integrity pattern using temporary files and atomic rename to prevent partial/corrupt output files from failed tasks.

Description

Luigi's atomic file write pattern ensures that task outputs are never left in a partially written state. Instead of writing directly to the target path, data is first written to a temporary file whose name is the target path plus `-luigi-tmp-` and a random zero-padded number. Only when the write completes without an exception is the temporary file atomically moved to its final destination using `os.replace()`. If an exception occurs during writing, the move is skipped and the target path remains untouched; per the `atomic_file` docstring, the temporary file is cleaned up when `close()` is never invoked.
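
The sequence described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Luigi's implementation: `atomic_write_text` is a hypothetical helper, and the suffix is only modeled on the one Luigi uses.

```python
import os
import random


def atomic_write_text(path, text):
    """Write text to path via a temporary sibling file (hypothetical helper)."""
    # The temp file sits next to the target so the final rename stays on one filesystem.
    tmp_path = path + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)
    try:
        with open(tmp_path, 'w') as f:
            f.write(text)
    except BaseException:
        # Failed write: discard the temp file; the target path was never touched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    # Successful write: publish the file under its final name in one step.
    os.replace(tmp_path, path)
```

Keeping the temporary file in the same directory as the target is a deliberate choice in this sketch: a rename across filesystems is generally not atomic.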

This pattern is fundamental to Luigi's idempotency guarantees: a task's default `complete()` method checks only whether its output exists, so a partially written file would incorrectly signal completion.

Usage

Always use this pattern when writing task output files. Luigi provides three mechanisms:

  • `self.output().open('w')` on a `LocalTarget` (uses `atomic_file` automatically)
  • `with self.output().temporary_path() as tmp_path:` for non-stream writes
  • Direct use of `atomic_file(path)` context manager

The Insight (Rule of Thumb)

  • Action: Never write directly to the output path. Always use `LocalTarget.open('w')` or `temporary_path()` context manager.
  • Value: Guarantees output file integrity; partial writes are invisible to downstream tasks.
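  • Trade-off: When an existing output is being replaced, the old file and the temp file both occupy disk until the rename, so roughly 2x the output's size is needed temporarily. The temp file name gets a random zero-padded numeric suffix (up to 10 digits) to avoid collisions.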

Reasoning

Luigi determines task completion by checking whether the output target exists (via `Target.exists()`). If a task writes directly to its output path and fails mid-write, the partially written file causes `exists()` to return `True`, so the task appears complete. Downstream tasks then consume corrupt data. The atomic write pattern prevents this by ensuring the output file only appears at its final path after a successful write.
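
The failure mode is easy to reproduce without Luigi. The sketch below uses only the standard library; `direct_write` and `failing_rows` are hypothetical names, and `os.path.exists` stands in for an existence-based completeness check.

```python
import os
import tempfile


def direct_write(path, rows):
    """Non-atomic write: the target path exists as soon as open() succeeds."""
    with open(path, 'w') as f:
        for row in rows:
            f.write(row + '\n')


def failing_rows():
    """Hypothetical data source that breaks after the first row."""
    yield 'row-1'
    raise RuntimeError('upstream failure')


out_path = os.path.join(tempfile.mkdtemp(), 'report.txt')
try:
    direct_write(out_path, failing_rows())
except RuntimeError:
    pass

# An existence-based completeness check is fooled by the partial file.
looks_complete = os.path.exists(out_path)
with open(out_path) as f:
    partial_content = f.read()

print(looks_complete)         # True
print(repr(partial_content))  # 'row-1\n'
```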

The `__exit__` method in `AtomicLocalFile` explicitly does not move the file if an exception occurred:

def __exit__(self, exc_type, exc, traceback):
    " Close/commit the file if there are no exception "
    if exc_type:
        return
    return super(AtomicLocalFile, self).__exit__(exc_type, exc, traceback)
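
The effect of skipping the commit on failure can be seen in a stripped-down analog. This is a sketch, not Luigi's class: `CommitOnSuccessFile` is hypothetical, it wraps a plain text file instead of subclassing `io.BufferedWriter`, it uses a fixed `.tmp` suffix for brevity, and it removes the temp file explicitly in `__exit__`.

```python
import os
import tempfile


class CommitOnSuccessFile:
    """Hypothetical analog of the __exit__ logic above (not Luigi's class)."""

    def __init__(self, path):
        self.path = path
        self.tmp_path = path + '.tmp'  # fixed suffix for brevity
        self._file = open(self.tmp_path, 'w')

    def __enter__(self):
        return self._file

    def __exit__(self, exc_type, exc, traceback):
        self._file.close()
        if exc_type:
            # Body raised: drop the temp file and never touch self.path.
            os.remove(self.tmp_path)
            return False  # let the exception propagate
        os.replace(self.tmp_path, self.path)  # commit
        return False


target = os.path.join(tempfile.mkdtemp(), 'out.txt')

try:
    with CommitOnSuccessFile(target) as f:
        f.write('half of the da')
        raise ValueError('task failed mid-write')
except ValueError:
    pass
exists_after_failure = os.path.exists(target)

with CommitOnSuccessFile(target) as f:
    f.write('all of the data')
exists_after_success = os.path.exists(target)

print(exists_after_failure, exists_after_success)  # False True
```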

Code Evidence

atomic_file implementation from `luigi/local_target.py:34-43`:

class atomic_file(AtomicLocalFile):
    """Simple class that writes to a temp file and moves it on close()
    Also cleans up the temp file if close is not invoked
    """

    def move_to_final_destination(self):
        os.replace(self.tmp_path, self.path)

    def generate_tmp_path(self, path):
        return path + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)
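
The `%09d` specifier used here and the `{:010}` specifier in the `temporary_path` excerpt further down pad to different minimum widths, which is worth knowing when matching leftover temp files by name. A quick check in plain Python (the `out.csv` file name is only an illustration):

```python
# '%09d' zero-pads to at least 9 digits; larger values are not truncated.
small = 'out.csv' + '-luigi-tmp-%09d' % 42
large = 'out.csv' + '-luigi-tmp-%09d' % 9_999_999_999

# '{:010}' zero-pads to at least 10 digits.
padded = '{}-luigi-tmp-{:010}'.format('out.csv', 42)

print(small)   # out.csv-luigi-tmp-000000042
print(large)   # out.csv-luigi-tmp-9999999999
print(padded)  # out.csv-luigi-tmp-0000000042
```

Because `randrange(0, 10_000_000_000)` can return a 10-digit value, the `%09d` suffix is 9 or 10 digits long, while the `{:010}` suffix is always 10.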

Base class from `luigi/target.py:315-351` (abridged):

class AtomicLocalFile(io.BufferedWriter):
    """Abstract class to create a Target that creates
    a temporary file in the local filesystem before
    moving it to its final destination.
    """

    def __init__(self, path):
        self.__tmp_path = self.generate_tmp_path(path)
        self.path = path
        super(AtomicLocalFile, self).__init__(io.FileIO(self.__tmp_path, 'w'))

    def close(self):
        super(AtomicLocalFile, self).close()
        self.move_to_final_destination()

    def __exit__(self, exc_type, exc, traceback):
        " Close/commit the file if there are no exception "
        if exc_type:
            return
        return super(AtomicLocalFile, self).__exit__(exc_type, exc, traceback)

temporary_path context manager from `luigi/target.py:264-303` (abridged):

@contextmanager
def temporary_path(self):
    num = random.randrange(0, 10_000_000_000)
    slashless_path = self.path.rstrip('/').rstrip("\\")
    _temp_path = '{}-luigi-tmp-{:010}{}'.format(
        slashless_path, num, self._trailing_slash())
    tmp_dir = os.path.dirname(slashless_path)
    if tmp_dir:
        self.fs.mkdir(tmp_dir, parents=True, raise_if_exists=False)
    yield _temp_path
    # We won't reach here if there was a user exception.
    self.fs.rename_dont_move(_temp_path, self.path)
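
The same yield-then-rename shape can be reproduced for local files with the standard library. This is a sketch under stated assumptions: the free function `temporary_path` and the `export_report` helper are hypothetical, the parent-directory creation is omitted, and `os.replace` stands in for the target filesystem's `rename_dont_move`.

```python
import os
import random
import tempfile
from contextlib import contextmanager


@contextmanager
def temporary_path(final_path):
    """Hypothetical stand-alone analog of the method above, local files only."""
    tmp = '{}-luigi-tmp-{:010}'.format(
        final_path, random.randrange(0, 10_000_000_000))
    yield tmp
    # Reached only if the with-body finished without raising.
    os.replace(tmp, final_path)


def export_report(path):
    """Stand-in for a tool that insists on writing to a path it is given."""
    with open(path, 'w') as f:
        f.write('id,value\n1,42\n')


final_path = os.path.join(tempfile.mkdtemp(), 'report.csv')
with temporary_path(final_path) as tmp:
    export_report(tmp)  # the tool only ever sees the temp path
    visible_during_write = os.path.exists(final_path)
visible_after_commit = os.path.exists(final_path)

print(visible_during_write, visible_after_commit)  # False True
```

This form suits writers that take a file name rather than a file object, such as external tools or libraries that open the path themselves.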
