Implementation:Spotify Luigi RemoteTarget SSH
| Knowledge Sources | |
|---|---|
| Domains | Remote_Execution, SSH |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
RemoteTarget and RemoteContext provide SSH-based remote file operations and command execution for Luigi, enabling tasks to read, write, and manage files on remote machines transparently through the Luigi Target and FileSystem abstractions.
Description
The SSH contrib module is a lightweight remote execution library that wraps the ssh and scp commands to provide Luigi-compatible remote file access and command execution. It contains four main classes:
- RemoteContext -- Provides functionality similar to the standard library subprocess module, but for remote command execution via SSH. It supports configurable connection parameters including username, key file, port, connection timeout, host key checking, sshpass integration, and TTY allocation. Key methods include Popen() for launching remote processes, check_output() for capturing remote command output, and a tunnel() context manager for creating SSH tunnels between local and remote ports. Each remote command is wrapped in an ssh invocation with ControlMaster=no and, unless sshpass is used, BatchMode=yes (so ssh never waits for interactive input). The connection parameters above add further ssh options.
- RemoteFileSystem (extends luigi.target.FileSystem) -- Implements the Luigi FileSystem interface for remote hosts. All filesystem operations (exists(), listdir(), isdir(), remove(), mkdir()) are executed remotely via RemoteContext.check_output(). File transfers use SCP with atomic write semantics. Each file is first copied to a temporary path with a random suffix and then renamed to its final name, so a partially transferred file never appears under the destination path. put() uploads a local file and renames it with a remote mv. get() downloads a remote file and renames it locally.
- RemoteTarget (extends luigi.target.FileSystemTarget) -- A Luigi target for files on remote machines. It supports reading (mode='r') by running cat on the remote host and streaming its output back over SSH. It supports writing (mode='w') through AtomicRemoteFileWriter. The target integrates with Luigi's format system for transparent compression and encoding support.
- AtomicRemoteFileWriter (extends luigi.format.OutputPipeProcessWrapper) -- Ensures atomic writes to remote files. It streams data over an SSH pipe into a temporary file, then moves that file into place on close(). If the writer is destroyed before that move happens, its cleanup logic removes the leftover temporary file.
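The RemoteFileSystem entry says that filesystem checks are answered by commands run through check_output(). The following is a minimal local sketch of that idea. It assumes an existence check implemented as `test -e PATH`, where exit status 1 means "does not exist" and any other non-zero status is a real error. `path_exists` is a hypothetical name, and it runs `test` locally with subprocess rather than over SSH.

```python
import subprocess


def path_exists(path: str) -> bool:
    """Hypothetical helper: answer 'does path exist?' from a command's exit status.

    Runs `test -e PATH` locally; a remote variant would send the same
    command to the remote host instead.
    """
    try:
        subprocess.check_output(["test", "-e", path])
    except subprocess.CalledProcessError as err:
        # `test` exits with status 1 when its condition is false,
        # i.e. the path does not exist.
        if err.returncode == 1:
            return False
        # Any other non-zero status signals a real failure; propagate it.
        raise
    return True
```

Treating only exit status 1 as "missing" keeps real failures, such as a broken connection in the remote case, from being silently reported as absent files.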
The module also defines RemoteCalledProcessError for better error reporting that includes the remote host in error messages.
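To illustrate what "includes the remote host" means, here is a sketch of a CalledProcessError subclass that stores an extra host attribute and mentions it in its message. The class name `HostCalledProcessError` and the message wording are hypothetical. The real RemoteCalledProcessError may format its message differently.

```python
import subprocess


class HostCalledProcessError(subprocess.CalledProcessError):
    """Hypothetical sketch: a CalledProcessError that remembers the remote host."""

    def __init__(self, returncode, command, host, output=None):
        super().__init__(returncode, command, output=output)
        self.host = host

    def __str__(self):
        # Prefix the standard CalledProcessError message with the host name.
        return "Command on {}: {}".format(self.host, super().__str__())
```

Because it subclasses subprocess.CalledProcessError, existing `except subprocess.CalledProcessError` handlers still catch it. The added host just makes the failure easier to trace.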
Usage
Use RemoteTarget when your Luigi tasks need to read or write files on remote machines accessible via SSH, such as data stored on remote servers, NFS mounts, or edge nodes. Use RemoteContext when you need to execute commands on a remote host as part of a task's run() method. Use RemoteFileSystem when you need to check existence, list, or manage files on a remote host. The SSH tunnel feature is particularly useful for securely accessing services on remote machines through firewalls or when non-secure protocols need to be tunneled over SSH.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/ssh.py
- Lines: 1-355
Signature
```python
class RemoteCalledProcessError(subprocess.CalledProcessError):
    def __init__(self, returncode, command, host, output=None): ...


class RemoteContext:
    def __init__(self, host, **kwargs): ...
    # kwargs: username, key_file, connect_timeout, port,
    #         no_host_key_check, sshpass, tty
    def Popen(self, cmd, **kwargs): ...
    def check_output(self, cmd): ...
    def tunnel(self, local_port, remote_port=None,
               remote_host="localhost"): ...  # context manager


class RemoteFileSystem(luigi.target.FileSystem):
    def __init__(self, host, **kwargs): ...
    def exists(self, path): ...
    def listdir(self, path): ...
    def isdir(self, path): ...
    def remove(self, path, recursive=True): ...
    def mkdir(self, path, parents=True, raise_if_exists=False): ...
    def put(self, local_path, path): ...
    def get(self, path, local_path): ...


class AtomicRemoteFileWriter(luigi.format.OutputPipeProcessWrapper):
    def __init__(self, fs, path): ...
    def close(self): ...
    @property
    def tmp_path(self): ...
    @property
    def fs(self): ...


class RemoteTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, host, format=None, **kwargs): ...
    def open(self, mode='r'): ...
    def put(self, local_path): ...
    def get(self, local_path): ...
    @property
    def fs(self): ...
```
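RemoteFileSystem.put() and AtomicRemoteFileWriter both follow the write-to-a-temporary-path-then-rename pattern described in the Description section. Below is a local-filesystem sketch of that pattern, assuming a random numeric suffix on the temporary name. `atomic_copy` is a hypothetical helper. os.replace stands in for the remote `mv`, and the rename is atomic only when both paths are on the same filesystem.

```python
import os
import random
import shutil


def atomic_copy(src: str, dst: str) -> None:
    """Hypothetical helper: copy src to dst so readers never see a partial dst.

    Writes to a uniquely named temporary path next to the destination,
    then renames it into place in a single step.
    """
    tmp = "{}-tmp-{:010d}".format(dst, random.randrange(10**10))
    try:
        shutil.copyfile(src, tmp)  # slow, interruptible part
        os.replace(tmp, dst)       # atomic on POSIX when on the same filesystem
    except BaseException:
        # Remove the in-flight temporary file if anything went wrong,
        # similar in spirit to AtomicRemoteFileWriter's cleanup logic.
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
```

Placing the temporary file next to the destination, rather than in a system temp directory, keeps both paths on the same filesystem so that the final rename stays atomic.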
Import
```python
from luigi.contrib.ssh import RemoteContext, RemoteFileSystem, RemoteTarget
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| host | str | Yes | Hostname or IP address of the remote machine |
| path | str | Yes | Absolute path to the file or directory on the remote machine |
| username | str | No | SSH username for authentication |
| key_file | str | No | Path to the SSH private key file |
| port | str | No | SSH port number (default: system SSH default, typically 22) |
| connect_timeout | int | No | SSH connection timeout in seconds |
| no_host_key_check | bool | No | Disable strict host key checking; default: False |
| sshpass | bool | No | Use sshpass for password-based authentication; default: False |
| tty | bool | No | Allocate a TTY for the SSH connection; default: False |
| format | Format | No | Luigi format for encoding/compression (e.g., luigi.format.Nop) |
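The connection parameters above end up as options on an ssh command line. The sketch below shows one plausible mapping onto standard OpenSSH client options: -p, -i, -t, and the -o settings ControlMaster, BatchMode, ConnectTimeout, StrictHostKeyChecking and UserKnownHostsFile. It also shows `sshpass -e`, which reads the password from the SSHPASS environment variable. `build_ssh_command` is a hypothetical helper for illustration. luigi.contrib.ssh builds its own command internally and may order or combine these flags differently.

```python
from typing import List, Optional


def build_ssh_command(
    host: str,
    cmd: List[str],
    username: Optional[str] = None,
    key_file: Optional[str] = None,
    port: Optional[str] = None,
    connect_timeout: Optional[int] = None,
    no_host_key_check: bool = False,
    sshpass: bool = False,
    tty: bool = False,
) -> List[str]:
    """Hypothetical sketch: map RemoteContext-style kwargs to an ssh argv list."""
    argv = ["ssh", "-o", "ControlMaster=no"]  # do not act as a connection-sharing master
    if not sshpass:
        # Disable password and host-key prompts so a missing key fails
        # fast instead of hanging the calling process.
        argv += ["-o", "BatchMode=yes"]
    if port:
        argv += ["-p", str(port)]
    if connect_timeout is not None:
        argv += ["-o", "ConnectTimeout={}".format(connect_timeout)]
    if no_host_key_check:
        argv += ["-o", "UserKnownHostsFile=/dev/null",
                 "-o", "StrictHostKeyChecking=no"]
    if key_file:
        argv += ["-i", key_file]
    if tty:
        argv.append("-t")  # force pseudo-terminal allocation
    target = "{}@{}".format(username, host) if username else host
    # sshpass -e reads the password from the SSHPASS environment variable.
    prefix = ["sshpass", "-e"] if sshpass else []
    # The remote command comes last; ssh executes it on the remote host.
    return prefix + argv + [target] + list(cmd)
```

For example, `build_ssh_command('worker-node-01', ['ls', '/data'], username='luigi', port='2222')` returns an argv list ending in `'luigi@worker-node-01', 'ls', '/data'`, which could then be passed to subprocess.run().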
Outputs
| Name | Type | Description |
|---|---|---|
| RemoteTarget | FileSystemTarget | A Luigi target representing a file on a remote host, supporting read and write operations |
| check_output result | bytes | Raw output from remote command execution |
| tunnel | context manager | An SSH tunnel between local_port and remote_host:remote_port via the SSH host |
Usage Examples
Basic Usage
```python
import luigi
from luigi.contrib.ssh import RemoteTarget


class ProcessRemoteFile(luigi.Task):
    def output(self):
        return RemoteTarget(
            '/data/results/output.csv',
            'worker-node-01',
            username='luigi',
            key_file='/home/luigi/.ssh/id_rsa',
        )

    def run(self):
        # Writes go through AtomicRemoteFileWriter: the file is moved into
        # place on the remote host only when the with-block closes it.
        with self.output().open('w') as f:
            f.write('col1,col2\n')
            f.write('val1,val2\n')


class ReadRemoteFile(luigi.Task):
    def requires(self):
        return ProcessRemoteFile()

    def run(self):
        # Reads stream the remote file's contents back over SSH.
        with self.input().open('r') as f:
            for line in f:
                print(line.strip())
```
Remote Command Execution
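```python
import subprocess

from luigi.contrib.ssh import RemoteContext

ctx = RemoteContext(
    'compute-node',
    username='luigi',
    key_file='/home/luigi/.ssh/id_rsa',
    connect_timeout=10,
)

# Execute a command remotely and capture its output (returned as bytes)
output = ctx.check_output(['ls', '-la', '/data/'])
print(output.decode('utf-8'))

# Use Popen to stream output from a long-running remote command
proc = ctx.Popen(['tail', '-f', '/var/log/app.log'],
                 stdout=subprocess.PIPE)
try:
    for _ in range(10):  # read the first 10 lines, then stop
        print(proc.stdout.readline().decode('utf-8'), end='')
finally:
    proc.terminate()  # `tail -f` never exits on its own; stop the local ssh client
    proc.wait()
```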
SSH Tunnel
```python
import requests

from luigi.contrib.ssh import RemoteContext

ctx = RemoteContext('bastion-host', username='luigi',
                    key_file='/home/luigi/.ssh/id_rsa')

with ctx.tunnel(local_port=9200, remote_port=9200,
                remote_host='elasticsearch-internal'):
    # Access elasticsearch-internal:9200 via localhost:9200
    response = requests.get('http://localhost:9200/_cluster/health')
    print(response.json())
```
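tunnel() is based on OpenSSH local port forwarding (`ssh -L local_port:remote_host:remote_port`). That is why, in the example above, requests to localhost:9200 reach elasticsearch-internal:9200 by way of the bastion host. The sketch below builds that forwarding argument and adds a small TCP check for confirming that something is listening on the local end while debugging. Both `local_forward_args` and `port_is_open` are hypothetical helpers, not part of luigi.contrib.ssh, and the check assumes the tunnel listens on 127.0.0.1.

```python
import socket
from typing import List


def local_forward_args(local_port: int, remote_host: str, remote_port: int) -> List[str]:
    """Hypothetical helper: the ssh option that forwards a local port.

    Connections to localhost:local_port travel over the SSH session and are
    opened from the SSH host to remote_host:remote_port.
    """
    return ["-L", "{}:{}:{}".format(local_port, remote_host, remote_port)]


def port_is_open(port: int, host: str = "127.0.0.1", timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Note that remote_host is resolved from the SSH host's point of view, so an internal name such as elasticsearch-internal only needs to be reachable from the bastion, not from the machine running Luigi.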
File Transfer
```python
from luigi.contrib.ssh import RemoteFileSystem

rfs = RemoteFileSystem('storage-node', username='luigi',
                       key_file='/home/luigi/.ssh/id_rsa')

# Upload a file
rfs.put('/local/data/input.csv', '/remote/data/input.csv')

# Download a file
rfs.get('/remote/data/output.csv', '/local/data/output.csv')

# Check existence
if rfs.exists('/remote/data/output.csv'):
    print('File exists on remote host')
```