
Implementation:Spotify Luigi FTPTarget

From Leeroopedia


Knowledge Sources
Domains: File_Transfer, Networking
Last Updated: 2026-02-10 08:00 GMT

Overview

Luigi contrib module that adds FTP, FTPS, and SFTP file transfer support through three classes: the RemoteFileSystem filesystem, the RemoteTarget target, and the AtomicFtpFile helper for atomic writes.

Description

The ftp module wraps Python's ftplib and the third-party pysftp library to implement Luigi's FileSystem and FileSystemTarget abstractions for remote file servers accessed via FTP, FTPS (FTP over TLS), or SFTP.

Core Classes:

  • RemoteFileSystem (extends FileSystem): Manages connections and filesystem operations against a remote FTP/SFTP server. It supports:
    • FTP (plain): Uses Python's ftplib.FTP.
    • FTPS (FTP with TLS): Uses ftplib.FTP_TLS with prot_p() for data channel encryption.
    • SFTP: Uses pysftp.Connection (the pysftp package must be installed separately).
    • Operations: exists (with an optional mtime check), remove (recursive by default), put (upload to a temporary remote name, then rename), get (download to a local temporary file, then rename), and listdir.
    • Default ports: 21 for FTP/FTPS, 22 for SFTP.
  • RemoteTarget (extends FileSystemTarget): Represents a file on a remote FTP/SFTP server as a Luigi target. In read mode, it downloads the file to a local temporary location and returns a buffered reader passed through the target's format. In write mode, it returns an AtomicFtpFile (also passed through the format), so uploads are atomic. exists() itself takes no arguments; it applies the mtime given to the RemoteTarget constructor as a freshness check.
  • AtomicFtpFile (extends AtomicLocalFile): Writes to a local temporary file and uploads it to the remote server via RemoteFileSystem.put() when the file is closed. Writes are therefore atomic: the remote file is only created once the write completes successfully.
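The FTP/FTPS connection logic described above can be approximated with the standard library's ftplib alone. This is a minimal sketch under stated assumptions, not Luigi's code: `default_port`, `make_ftp_client`, and `connect_and_login` are hypothetical helpers, and the SFTP branch is left out because it needs the third-party pysftp package.

```python
import ftplib
from typing import Optional


def default_port(sftp: bool, port: Optional[int] = None) -> int:
    """Return the explicit port if given, else 22 for SFTP and 21 for FTP/FTPS."""
    if port is not None:
        return port
    return 22 if sftp else 21


def make_ftp_client(tls: bool) -> ftplib.FTP:
    """Create an unconnected FTP or FTPS client object."""
    # FTP_TLS subclasses FTP, so both branches satisfy the return type.
    return ftplib.FTP_TLS() if tls else ftplib.FTP()


def connect_and_login(host: str, username: Optional[str] = None,
                      password: Optional[str] = None, port: Optional[int] = None,
                      tls: bool = False, timeout: int = 60) -> ftplib.FTP:
    """Open the control connection, log in, and protect the data channel for FTPS."""
    conn = make_ftp_client(tls)
    conn.connect(host, default_port(False, port), timeout=timeout)
    # ftplib falls back to anonymous login when the user name is empty.
    conn.login(username or '', password or '')
    if tls:
        # prot_p() sends "PROT P" so file transfers are encrypted as well.
        conn.prot_p()
    return conn
```

connect_and_login needs a reachable server, so only the port defaults and client selection can be checked offline.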

Atomic Upload Behavior: RemoteFileSystem.put() (through its protocol-specific helpers _ftp_put() and _sftp_put()) uploads to a temporary file whose name starts with luigi-tmp- in the destination directory on the remote server, then renames it to the final path; passing atomic=False uploads directly to the final path instead.
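The temporary-name-then-rename pattern can be sketched independently of any server. In this minimal sketch, `atomic_put` and `InMemoryConn` are hypothetical: the connection is assumed to expose ftplib-style `storbinary(cmd, fp)` and `rename(src, dst)` methods, and the remote directory creation that the real module performs is omitted.

```python
import posixpath
import random


def atomic_put(conn, local_path: str, remote_path: str) -> str:
    """Upload local_path under a temporary name, then rename it to remote_path.

    Returns the temporary remote name that was used.
    """
    normpath = posixpath.normpath(remote_path)
    folder = posixpath.dirname(normpath)
    # Random temporary name in the destination directory (luigi-tmp- prefix).
    tmp_path = posixpath.join(folder, 'luigi-tmp-%09d' % random.randrange(0, 10**9))
    with open(local_path, 'rb') as fp:
        conn.storbinary('STOR ' + tmp_path, fp)
    # Readers only see the final name after the whole file has arrived.
    conn.rename(tmp_path, normpath)
    return tmp_path


class InMemoryConn:
    """Hypothetical stand-in for an ftplib.FTP connection, storing files in a dict."""

    def __init__(self):
        self.files = {}

    def storbinary(self, cmd: str, fp) -> None:
        self.files[cmd[len('STOR '):]] = fp.read()

    def rename(self, src: str, dst: str) -> None:
        self.files[dst] = self.files.pop(src)
```

With a real ftplib.FTP object in place of InMemoryConn, the same two calls would perform the upload and the rename on the server.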

Usage

Use this module when your Luigi pipeline needs to read from or write to remote FTP/SFTP servers. Common use cases include fetching data from legacy systems that expose files via FTP, pushing processed results to file servers, or integrating with partners that use FTP-based data exchange.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/ftp.py
  • Lines: 1-444

Signature

class RemoteFileSystem(luigi.target.FileSystem):
    def __init__(self, host, username=None, password=None, port=None,
                 tls=False, timeout=60, sftp=False, pysftp_conn_kwargs=None):
        ...

class AtomicFtpFile(luigi.target.AtomicLocalFile):
    def __init__(self, fs, path):
        ...

class RemoteTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, host, format=None, username=None,
                 password=None, port=None, mtime=None, tls=False,
                 timeout=60, sftp=False, pysftp_conn_kwargs=None):
        ...

Import

from luigi.contrib.ftp import RemoteFileSystem, RemoteTarget

I/O Contract

Inputs

  • host (str, required): Hostname or IP address of the remote FTP/SFTP server.
  • path (str, required for RemoteTarget): Path to the file on the remote server.
  • username (str, optional): Username for FTP/SFTP authentication.
  • password (str, optional): Password for FTP/SFTP authentication.
  • port (int, optional): Server port number; defaults to 21 for FTP/FTPS and 22 for SFTP.
  • tls (bool, optional): Enable FTP over TLS (FTPS); defaults to False.
  • timeout (int, optional): Connection timeout in seconds; defaults to 60.
  • sftp (bool, optional): Use SFTP (via pysftp) instead of FTP; defaults to False.
  • pysftp_conn_kwargs (dict, optional): Additional keyword arguments passed to pysftp.Connection.
  • format (luigi.format.Format, optional, RemoteTarget only): Luigi format used for encoding/decoding; defaults to luigi.format.get_default_format().
  • mtime (datetime, optional, RemoteTarget only): If set, exists() returns True only if the remote file was modified after this time.

Outputs

  • open('r') (read mode): Returns a file-like reader. The remote file is first downloaded to a local temporary file, which is opened as an io.BufferedReader and passed through the target's format.
  • open('w') (write mode): Returns an AtomicFtpFile passed through the target's format. Data is written locally and uploaded atomically on close.
  • exists() (bool): True if the file exists on the remote server (and, when mtime was given, was modified after it).
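The read path summarized above can be sketched without a server. In this minimal sketch, `open_remote_for_read` is a hypothetical helper and `download(remote_path, local_path)` is an assumed callable standing in for RemoteFileSystem.get(); the format wrapping that Luigi applies on top of the reader is omitted.

```python
import io
import os
import posixpath
import tempfile
from typing import Callable


def open_remote_for_read(download: Callable[[str, str], None],
                         remote_path: str) -> io.BufferedReader:
    """Download remote_path to a local temporary file and open it for reading."""
    fd, local_path = tempfile.mkstemp(
        prefix='luigi-tmp-', suffix='-' + posixpath.basename(remote_path))
    os.close(fd)  # only the path is needed; download() writes the content
    download(remote_path, local_path)
    # Raw file handle wrapped in a buffered reader, as described above.
    return io.BufferedReader(io.FileIO(local_path, 'r'))
```

Because the whole file is fetched before the reader is returned, reading is local I/O; the trade-off is that large files need enough local disk space.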

Usage Examples

Basic Usage

import luigi
from luigi.contrib.ftp import RemoteTarget

class FetchFromFTP(luigi.Task):
    def output(self):
        return luigi.LocalTarget('/tmp/downloaded_data.csv')

    def run(self):
        remote = RemoteTarget(
            path='/data/exports/daily.csv',
            host='ftp.example.com',
            username='user',
            password='pass'
        )
        # Download the remote file to this task's local output path
        remote.get(self.output().path)

Writing to an FTP Server

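import luigi
from luigi.contrib.ftp import RemoteTarget

class UploadToFTP(luigi.Task):
    def requires(self):
        # ProcessDataTask is a placeholder for your own upstream task
        return ProcessDataTask()

    def output(self):
        return RemoteTarget(
            path='/uploads/processed_data.csv',
            host='ftp.example.com',
            username='user',
            password='pass'
        )

    def run(self):
        # Data goes to a local temp file and is uploaded when the block exits cleanly
        with self.output().open('w') as f:
            f.write('processed,data,here\n')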
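The write-locally-then-upload-on-close behavior that AtomicFtpFile provides can be illustrated with a stdlib-only stand-in. This is a sketch, not Luigi's class: `UploadOnClose` is hypothetical, and its `upload(tmp_path)` callback is an assumption playing the role of RemoteFileSystem.put(). It also deletes the temporary file immediately, which the real class may handle differently.

```python
import os
import tempfile
from typing import Callable


class UploadOnClose:
    """Write to a local temp file; call upload(tmp_path) only on a clean exit."""

    def __init__(self, upload: Callable[[str], None]):
        self._upload = upload
        fd, self.tmp_path = tempfile.mkstemp(prefix='luigi-tmp-')
        self._fh = os.fdopen(fd, 'w')

    def write(self, data: str) -> int:
        return self._fh.write(data)

    def __enter__(self) -> 'UploadOnClose':
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self._fh.close()
        try:
            if exc_type is None:
                # Only a block that finished without error reaches the server.
                self._upload(self.tmp_path)
        finally:
            os.remove(self.tmp_path)
        return False  # never swallow the caller's exception
```

If the with block raises, nothing is uploaded, so a half-written file never appears on the remote side.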

Using SFTP with Key-Based Authentication

from luigi.contrib.ftp import RemoteTarget

# pysftp_conn_kwargs is forwarded to pysftp.Connection; here it supplies
# a private key for authentication instead of a password
target = RemoteTarget(
    path='/secure/data/output.csv',
    host='sftp.example.com',
    username='deploy',
    sftp=True,
    pysftp_conn_kwargs={
        'private_key': '/home/user/.ssh/id_rsa'
    }
)
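As we read the module source, RemoteFileSystem passes host, username, password, and port to pysftp.Connection explicitly and merges pysftp_conn_kwargs into that same call. The sketch below mimics that merge with a hypothetical `open_sftp` helper and an assumed `connect` callable in place of pysftp.Connection. It shows why connection settings such as the port belong in RemoteTarget's own parameters rather than in pysftp_conn_kwargs.

```python
from typing import Any, Callable, Dict, Optional


def open_sftp(connect: Callable[..., Any], host: str, username: Optional[str],
              password: Optional[str], port: int = 22,
              pysftp_conn_kwargs: Optional[Dict[str, Any]] = None) -> Any:
    """Call connect() with fixed credentials plus any extra keyword arguments."""
    extra = pysftp_conn_kwargs or {}
    # Repeating username/password/port inside `extra` makes Python raise
    # TypeError ("got multiple values for keyword argument ...").
    return connect(host, username=username, password=password, port=port, **extra)
```

In practice, pass extra options such as private_key through pysftp_conn_kwargs and set the port through the port parameter.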

Checking File Freshness with mtime

import datetime
from luigi.contrib.ftp import RemoteTarget

target = RemoteTarget(
    path='/data/daily_feed.csv',
    host='ftp.example.com',
    username='user',
    password='pass',
    mtime=datetime.datetime(2024, 1, 1, 0, 0, 0)
)

# Returns True only if the file exists AND was modified after 2024-01-01
if target.exists():
    print("Fresh file available")
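For plain FTP, the freshness check relies on the server's reply to the MDTM command. Below is a minimal sketch of that comparison, assuming a reply of the form `213 YYYYMMDDHHMMSS` as defined in RFC 3659, which specifies UTC. `parse_mdtm` and `is_fresh` are hypothetical helpers, not part of luigi.contrib.ftp. Because the parsed value is a naive datetime, the mtime you pass is likely best given as a naive datetime expressed in UTC.

```python
import datetime


def parse_mdtm(response: str) -> datetime.datetime:
    """Parse an MDTM reply such as '213 20240102030405' into a naive (UTC) datetime."""
    code, _, stamp = response.partition(' ')
    if code != '213':
        raise ValueError('unexpected MDTM reply: %r' % response)
    # RFC 3659 allows optional fractional seconds ("...05.123"); ignore them.
    return datetime.datetime.strptime(stamp[:14], '%Y%m%d%H%M%S')


def is_fresh(response: str, mtime: datetime.datetime) -> bool:
    """True if the remote modification time is strictly later than mtime."""
    return parse_mdtm(response) > mtime
```

With ftplib, the reply string would come from a call such as conn.sendcmd('MDTM ' + path).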
