Implementation:Spotify Luigi FTPTarget
| Knowledge Sources | |
|---|---|
| Domains | File_Transfer, Networking |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module that integrates FTP, FTPS, and SFTP file transfer through three classes: RemoteFileSystem (filesystem operations), RemoteTarget (Luigi target), and AtomicFtpFile (atomic writes).
Description
The ftp module is a wrapper around Python's ftplib and the pysftp library, providing Luigi's FileSystem and FileSystemTarget abstractions for remote file servers accessed via FTP, FTPS (FTP over TLS), or SFTP protocols.
Core Classes:
- RemoteFileSystem (extends FileSystem): Manages connections and filesystem operations against a remote FTP/SFTP server. It supports:
  - FTP (plain): Uses Python's ftplib.FTP.
  - FTPS (FTP with TLS): Uses ftplib.FTP_TLS with prot_p() for data channel encryption.
  - SFTP: Uses pysftp.Connection (requires separate pysftp installation).
  - Operations: exists (with optional mtime check), remove (recursive), put (with atomic rename), get (download with atomic rename), and listdir.
  - Default ports: 21 for FTP/FTPS, 22 for SFTP.
- RemoteTarget (extends FileSystemTarget): Represents a file on a remote FTP/SFTP server as a Luigi target. In read mode, it downloads the file to a local temporary location and returns a buffered reader wrapped through Luigi's format system. In write mode, it uses AtomicFtpFile for atomic uploads. The exists() method supports an optional mtime parameter for freshness checks.
- AtomicFtpFile (extends AtomicLocalFile): Writes to a local temp file and uploads it to the remote server via RemoteFileSystem.put() when closed. This ensures atomic writes: the remote file is only created when the write completes successfully.
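The write-locally-then-upload-on-close behavior described for AtomicFtpFile can be sketched without a server. This is a minimal illustration, not Luigi's implementation: UploadOnClose and the put callback are hypothetical names, with put standing in for RemoteFileSystem.put().

```python
import os
import tempfile


class UploadOnClose:
    """Buffer writes in a local temp file; hand it to `put` only on clean exit.

    Hypothetical sketch: `put(local_path, remote_path)` stands in for
    RemoteFileSystem.put(). This is not Luigi's AtomicFtpFile.
    """

    def __init__(self, put, remote_path):
        self._put = put
        self._remote_path = remote_path
        fd, self._tmp_path = tempfile.mkstemp(prefix="luigi-tmp-")
        self._file = os.fdopen(fd, "w")

    def write(self, data):
        return self._file.write(data)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self._file.close()
        try:
            if exc_type is None:
                # Upload only when the body finished without raising.
                self._put(self._tmp_path, self._remote_path)
        finally:
            # The local buffer is removed in both the success and failure case.
            os.remove(self._tmp_path)
        return False  # never swallow exceptions


if __name__ == "__main__":
    uploaded = {}

    def fake_put(local_path, remote_path):
        # Stand-in for a real upload: record what would be sent.
        with open(local_path) as f:
            uploaded[remote_path] = f.read()

    with UploadOnClose(fake_put, "/uploads/out.csv") as f:
        f.write("a,b\n")
    print(uploaded)
```

If the body of the with block raises, put is never called, so a failed task leaves nothing behind at the remote path.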
Atomic Upload Behavior:
RemoteFileSystem.put() and the protocol-specific helpers _sftp_put() and _ftp_put() support atomic uploads: they write to a temporary file (prefixed with luigi-tmp-) on the remote server and then rename it to the final path.
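The temp-file-then-rename pattern can be tried on a local filesystem. The sketch below is a local analogue under stated assumptions, not the module's code: atomic_copy is a hypothetical helper, and the exact temp-name format (a luigi-tmp- prefix followed by random digits) is assumed for illustration.

```python
import os
import random
import shutil
import tempfile


def atomic_copy(src, dst):
    """Copy src to dst so that dst never appears half-written.

    Hypothetical local analogue of the remote upload pattern.
    """
    directory = os.path.dirname(os.path.normpath(dst)) or "."
    os.makedirs(directory, exist_ok=True)
    # Assumed naming scheme: 'luigi-tmp-' plus nine random digits.
    tmp = os.path.join(
        directory, "luigi-tmp-{:09d}".format(random.randrange(10**9))
    )
    shutil.copyfile(src, tmp)  # slow step: only the temp name is visible
    os.replace(tmp, dst)       # fast step: rename within the same directory
    return dst


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    source = os.path.join(workdir, "source.csv")
    with open(source, "w") as f:
        f.write("a,b\n1,2\n")
    print(atomic_copy(source, os.path.join(workdir, "out", "final.csv")))
```

Placing the temporary file in the destination directory keeps the rename on a single filesystem. On a remote server, whether the rename is truly atomic depends on the server implementation.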
Usage
Use this module when your Luigi pipeline needs to read from or write to remote FTP/SFTP servers. Common use cases include fetching data from legacy systems that expose files via FTP, pushing processed results to file servers, or integrating with partners that use FTP-based data exchange.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/ftp.py
- Lines: 1-444
Signature
class RemoteFileSystem(luigi.target.FileSystem):
    def __init__(self, host, username=None, password=None, port=None,
                 tls=False, timeout=60, sftp=False, pysftp_conn_kwargs=None):
        ...

class AtomicFtpFile(luigi.target.AtomicLocalFile):
    def __init__(self, fs, path):
        ...

class RemoteTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, host, format=None, username=None,
                 password=None, port=None, mtime=None, tls=False,
                 timeout=60, sftp=False, pysftp_conn_kwargs=None):
        ...
Import
from luigi.contrib.ftp import RemoteFileSystem, RemoteTarget
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| host | str | Yes | Hostname or IP address of the remote FTP/SFTP server |
| path | str | Yes (Target) | Path to the file on the remote server |
| username | str | No | Username for FTP/SFTP authentication |
| password | str | No | Password for FTP/SFTP authentication |
| port | int | No | Server port number; defaults to 21 (FTP) or 22 (SFTP) |
| tls | bool | No | Enable FTP over TLS (FTPS); defaults to False |
| timeout | int | No | Connection timeout in seconds; defaults to 60 |
| sftp | bool | No | Use SFTP (via pysftp) instead of FTP; defaults to False |
| pysftp_conn_kwargs | dict | No | Additional keyword arguments passed to pysftp.Connection |
| format | luigi.format.Format | No | Luigi format for encoding/decoding; defaults to get_default_format() |
| mtime | datetime | No | If set, exists() returns False if the remote file is older than this time |
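How the port, tls, and sftp inputs combine can be sketched as two small helpers. Both function names are hypothetical, and the precedence of sftp over tls when both flags are set is an assumption made for this illustration; only the default port numbers come from the table above.

```python
def resolve_port(port=None, sftp=False):
    """Return an explicit port unchanged, else the documented default.

    Defaults: 22 for SFTP, 21 for FTP and FTPS.
    """
    if port is not None:
        return port
    return 22 if sftp else 21


def describe_protocol(tls=False, sftp=False):
    """Name the protocol selected by the two boolean flags.

    Assumption: sftp takes precedence if both flags are set.
    """
    if sftp:
        return "SFTP"
    return "FTPS" if tls else "FTP"


if __name__ == "__main__":
    for flags in ({}, {"tls": True}, {"sftp": True}):
        port = resolve_port(sftp=flags.get("sftp", False))
        print(describe_protocol(**flags), port)
```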
Outputs
| Name | Type | Description |
|---|---|---|
| file-like (read) | BufferedReader | Returned when opening target in read mode ('r'); file is downloaded to local temp then read |
| AtomicFtpFile (write) | file-like | Returned when opening target in write mode ('w'); writes locally then uploads atomically |
| bool (exists) | bool | exists() returns True if the file exists on the remote server (optionally checking mtime) |
Usage Examples
Basic Usage
import luigi
from luigi.contrib.ftp import RemoteTarget

class FetchFromFTP(luigi.Task):
    def output(self):
        return luigi.LocalTarget('/tmp/downloaded_data.csv')

    def run(self):
        remote = RemoteTarget(
            path='/data/exports/daily.csv',
            host='ftp.example.com',
            username='user',
            password='pass'
        )
        # Download the remote file to the local output path
        remote.get(self.output().path)
Writing to an FTP Server
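import luigi
from luigi.contrib.ftp import RemoteTarget

class UploadToFTP(luigi.Task):
    def requires(self):
        # ProcessDataTask is an upstream task assumed to be defined elsewhere
        return ProcessDataTask()

    def output(self):
        return RemoteTarget(
            path='/uploads/processed_data.csv',
            host='ftp.example.com',
            username='user',
            password='pass'
        )

    def run(self):
        # Data is written locally and uploaded when the file is closed
        with self.output().open('w') as f:
            f.write('processed,data,here\n')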
Using SFTP with Key-Based Authentication
from luigi.contrib.ftp import RemoteFileSystem, RemoteTarget

# Using pysftp connection kwargs for private key authentication
target = RemoteTarget(
    path='/secure/data/output.csv',
    host='sftp.example.com',
    username='deploy',
    sftp=True,
    pysftp_conn_kwargs={
        'private_key': '/home/user/.ssh/id_rsa'
    }
)
Checking File Freshness with mtime
import datetime
from luigi.contrib.ftp import RemoteTarget

target = RemoteTarget(
    path='/data/daily_feed.csv',
    host='ftp.example.com',
    username='user',
    password='pass',
    mtime=datetime.datetime(2024, 1, 1, 0, 0, 0)
)

# Returns True only if file exists AND was modified after 2024-01-01
if target.exists():
    print("Fresh file available")
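The freshness comparison behind the mtime check can be sketched for plain FTP. The helper below is hypothetical and rests on one assumption: the server answers the MDTM command with a reply of the form 213 YYYYMMDDHHMMSS, as specified in RFC 3659. It is an illustration of the comparison, not a copy of the module's code.

```python
import datetime


def is_fresh(mdtm_reply, mtime):
    """Return True if the timestamp in an MDTM reply is later than mtime.

    Assumes a reply such as '213 20240102030405'. Fractional seconds,
    which RFC 3659 also permits, are not handled in this sketch.
    """
    stamp = mdtm_reply[4:].strip()  # drop the '213 ' status prefix
    modified = datetime.datetime.strptime(stamp, "%Y%m%d%H%M%S")
    return modified > mtime


if __name__ == "__main__":
    cutoff = datetime.datetime(2024, 1, 1, 0, 0, 0)
    print(is_fresh("213 20240102030405", cutoff))
    print(is_fresh("213 20231231235959", cutoff))
```

The comparison is strict, so a file modified exactly at the cutoff is treated as stale in this sketch.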