
Implementation:Spotify Luigi RemoteTarget SSH

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_RemoteTarget_SSH.md)


Knowledge Sources
Domains Remote_Execution, SSH
Last Updated 2026-02-10 08:00 GMT

Overview

RemoteTarget and RemoteContext provide SSH-based remote file operations and command execution for Luigi, enabling tasks to read, write, and manage files on remote machines transparently through the Luigi Target and FileSystem abstractions.

Description

The SSH contrib module is a lightweight remote execution library that wraps the ssh and scp commands to provide Luigi-compatible remote file access and command execution. It contains four main classes:

  • RemoteContext -- Provides functionality similar to the standard library subprocess module, but for remote command execution via SSH. It supports configurable connection parameters including username, key file, port, connection timeout, host key checking, sshpass integration, and TTY allocation. Key methods include Popen() for launching remote processes, check_output() for capturing remote command output, and a tunnel() context manager for creating SSH tunnels between local and remote ports. All SSH commands are assembled with appropriate flags (BatchMode=yes, ControlMaster=no) and security options.
  • RemoteFileSystem (extends luigi.target.FileSystem) -- Implements the Luigi FileSystem interface for remote hosts. All filesystem operations (exists(), listdir(), isdir(), remove(), mkdir()) are executed remotely via RemoteContext.check_output(). File transfers use SCP with atomic write semantics -- files are first copied to a temporary path with a random suffix, then atomically moved to the final destination via mv. The put() and get() methods handle both directions of file transfer.
  • RemoteTarget (extends luigi.target.FileSystemTarget) -- A Luigi target for files on remote machines. It supports reading (mode='r') via cat piped through SSH and writing (mode='w') via AtomicRemoteFileWriter. The target integrates with Luigi's format system for transparent compression and encoding support.
  • AtomicRemoteFileWriter (extends luigi.format.OutputPipeProcessWrapper) -- Ensures atomic writes to remote files by streaming data to a temporary file via SSH pipe and then atomically moving it into place on close(). Includes cleanup logic to remove in-flight temporary files on deletion.
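
The command assembly described for RemoteContext can be illustrated with a minimal sketch. This is not the module's code: `build_ssh_argv` and its parameter handling are hypothetical, and the only things taken from the text above are the connection parameters and the BatchMode=yes and ControlMaster=no options. The remaining flags are standard OpenSSH client options.

```python
from typing import List, Optional


def build_ssh_argv(
    host: str,
    remote_cmd: List[str],
    username: Optional[str] = None,
    key_file: Optional[str] = None,
    port: Optional[str] = None,
    connect_timeout: Optional[int] = None,
    no_host_key_check: bool = False,
    tty: bool = False,
) -> List[str]:
    """Hypothetical helper: build a local argv that runs remote_cmd on host via ssh."""
    # Never reuse a shared control socket and never prompt interactively.
    argv = ["ssh", "-o", "ControlMaster=no", "-o", "BatchMode=yes"]
    if port:
        argv += ["-p", str(port)]
    if connect_timeout is not None:
        argv += ["-o", "ConnectTimeout=%d" % connect_timeout]
    if no_host_key_check:
        # Skips host key verification; only sensible on trusted networks.
        argv += ["-o", "UserKnownHostsFile=/dev/null",
                 "-o", "StrictHostKeyChecking=no"]
    if key_file:
        argv += ["-i", key_file]
    if tty:
        argv.append("-t")
    target = "%s@%s" % (username, host) if username else host
    return argv + [target] + list(remote_cmd)


if __name__ == "__main__":
    print(build_ssh_argv("compute-node", ["ls", "/data"],
                         username="luigi", port="2222"))
```

The resulting list can be handed to subprocess.Popen unchanged, which is the same shape of call the description attributes to RemoteContext.Popen().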

The module also defines RemoteCalledProcessError for better error reporting that includes the remote host in error messages.
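
As a rough illustration of what "includes the remote host in error messages" can look like, the sketch below subclasses subprocess.CalledProcessError. The class name `HostAwareProcessError`, the helper `describe_failure`, and the exact message wording are assumptions made for this example, not the module's implementation.

```python
import subprocess


class HostAwareProcessError(subprocess.CalledProcessError):
    """Hypothetical stand-in: a CalledProcessError that also records the host."""

    def __init__(self, returncode, command, host, output=None):
        super().__init__(returncode, command, output=output)
        self.host = host

    def __str__(self):
        # Assumed wording; the point is that the host appears in the message.
        return "Command '%s' on host %s returned non-zero exit status %d" % (
            self.cmd, self.host, self.returncode)


def describe_failure(host, command, returncode, output=b""):
    """Raise and catch the error to show it still behaves like its base class."""
    try:
        raise HostAwareProcessError(returncode, command, host, output=output)
    except subprocess.CalledProcessError as exc:
        return str(exc)


if __name__ == "__main__":
    print(describe_failure("worker-node-01", ["ls", "/missing"], 2))
```

Because the class derives from subprocess.CalledProcessError, existing `except subprocess.CalledProcessError` handlers keep working while log output gains the host name.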

Usage

Use RemoteTarget when your Luigi tasks need to read or write files on remote machines accessible via SSH, such as data stored on remote servers, NFS mounts, or edge nodes. Use RemoteContext when you need to execute commands on a remote host as part of a task's run() method. Use RemoteFileSystem when you need to check existence, list, or manage files on a remote host. The SSH tunnel feature is particularly useful for securely accessing services on remote machines through firewalls or when non-secure protocols need to be tunneled over SSH.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/ssh.py
  • Lines: 1-355

Signature

class RemoteCalledProcessError(subprocess.CalledProcessError):
    def __init__(self, returncode, command, host, output=None): ...

class RemoteContext:
    # kwargs: username, key_file, connect_timeout, port,
    #         no_host_key_check, sshpass, tty
    def __init__(self, host, **kwargs): ...
    def Popen(self, cmd, **kwargs): ...
    def check_output(self, cmd): ...
    def tunnel(self, local_port, remote_port=None,
               remote_host="localhost"): ...  # context manager

class RemoteFileSystem(luigi.target.FileSystem):
    def __init__(self, host, **kwargs): ...
    def exists(self, path): ...
    def listdir(self, path): ...
    def isdir(self, path): ...
    def remove(self, path, recursive=True): ...
    def mkdir(self, path, parents=True, raise_if_exists=False): ...
    def put(self, local_path, path): ...
    def get(self, path, local_path): ...

class AtomicRemoteFileWriter(luigi.format.OutputPipeProcessWrapper):
    def __init__(self, fs, path): ...
    def close(self): ...
    @property
    def tmp_path(self): ...
    @property
    def fs(self): ...

class RemoteTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, host, format=None, **kwargs): ...
    def open(self, mode='r'): ...
    def put(self, local_path): ...
    def get(self, local_path): ...
    @property
    def fs(self): ...

Import

from luigi.contrib.ssh import RemoteContext, RemoteFileSystem, RemoteTarget

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| host | str | Yes | Hostname or IP address of the remote machine |
| path | str | Yes | Absolute path to the file or directory on the remote machine |
| username | str | No | SSH username for authentication |
| key_file | str | No | Path to the SSH private key file |
| port | str | No | SSH port number (default: system SSH default, typically 22) |
| connect_timeout | int | No | SSH connection timeout in seconds |
| no_host_key_check | bool | No | Disable strict host key checking; default: False |
| sshpass | bool | No | Use sshpass for password-based authentication; default: False |
| tty | bool | No | Allocate a TTY for the SSH connection; default: False |
| format | Format | No | Luigi format for encoding/compression (e.g., luigi.format.Nop) |

Outputs

| Name | Type | Description |
|------|------|-------------|
| RemoteTarget | FileSystemTarget | A Luigi target representing a file on a remote host, supporting read and write operations |
| check_output result | bytes | Raw output from remote command execution |
| tunnel | context manager | An SSH tunnel between local_port and remote_host:remote_port via the SSH host |
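
Since check_output returns raw bytes, callers usually decode the result before using it. The helper below is a minimal sketch of that step; `decode_lines` is a hypothetical name, and UTF-8 is an assumed encoding that should match the remote host's locale.

```python
from typing import List


def decode_lines(raw: bytes, encoding: str = "utf-8") -> List[str]:
    """Hypothetical helper: split raw command output into decoded, non-empty lines."""
    return [line.decode(encoding) for line in raw.splitlines() if line.strip()]


if __name__ == "__main__":
    # Stand-in for bytes that a remote listing command might return.
    sample = b"/data/a.csv\n/data/b.csv\n"
    for path in decode_lines(sample):
        print(path)
```

With a real context, the input would be something like `ctx.check_output(['ls', '/data/'])` in place of the sample bytes.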

Usage Examples

Basic Usage

import luigi
from luigi.contrib.ssh import RemoteTarget, RemoteContext

class ProcessRemoteFile(luigi.Task):

    def output(self):
        return RemoteTarget(
            '/data/results/output.csv',
            'worker-node-01',
            username='luigi',
            key_file='/home/luigi/.ssh/id_rsa'
        )

    def run(self):
        with self.output().open('w') as f:
            f.write('col1,col2\n')
            f.write('val1,val2\n')


class ReadRemoteFile(luigi.Task):

    def requires(self):
        return ProcessRemoteFile()

    def run(self):
        with self.input().open('r') as f:
            for line in f:
                print(line.strip())

Remote Command Execution

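import subprocess

from luigi.contrib.ssh import RemoteContext

ctx = RemoteContext(
    'compute-node',
    username='luigi',
    key_file='/home/luigi/.ssh/id_rsa',
    connect_timeout=10
)

# Execute a command and capture its output (returned as bytes)
output = ctx.check_output(['ls', '-la', '/data/'])
print(output.decode('utf-8'))

# Use Popen to stream output line by line from a long-running command
proc = ctx.Popen(['tail', '-f', '/var/log/app.log'],
                 stdout=subprocess.PIPE)
try:
    for raw_line in proc.stdout:
        print(raw_line.decode('utf-8').rstrip())
finally:
    proc.terminate()  # tail -f never exits on its own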

SSH Tunnel

import requests  # third-party HTTP client, used only to exercise the tunnel

from luigi.contrib.ssh import RemoteContext

ctx = RemoteContext('bastion-host', username='luigi',
                    key_file='/home/luigi/.ssh/id_rsa')

# Forward localhost:9200 to elasticsearch-internal:9200 via bastion-host
with ctx.tunnel(local_port=9200, remote_port=9200,
                remote_host='elasticsearch-internal'):
    # Inside the block, the remote service is reachable on localhost:9200
    response = requests.get('http://localhost:9200/_cluster/health')
    print(response.json())
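
The tunnel in this example corresponds to OpenSSH local port forwarding, whose `-L` option takes a `port:host:hostport` argument. The sketch below shows how such an argument could be formed and validated; `forward_spec` is a hypothetical helper written for illustration, and how the module itself builds the argument is not shown in this page.

```python
def forward_spec(local_port: int, remote_port: int,
                 remote_host: str = "localhost") -> str:
    """Hypothetical helper: build the argument for ssh's -L option."""
    for name, port in (("local_port", local_port), ("remote_port", remote_port)):
        # Reject missing or out-of-range ports before ssh is ever started.
        if not isinstance(port, int) or not 0 < port < 65536:
            raise ValueError("%s must be an integer in 1..65535, got %r"
                             % (name, port))
    return "%d:%s:%d" % (local_port, remote_host, remote_port)


if __name__ == "__main__":
    spec = forward_spec(9200, 9200, "elasticsearch-internal")
    print(["ssh", "-L", spec, "bastion-host"])
```

Validating both ports up front is a design choice of this sketch: it turns an omitted remote_port into an immediate error instead of a malformed forwarding argument.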

File Transfer

from luigi.contrib.ssh import RemoteFileSystem

rfs = RemoteFileSystem('storage-node', username='luigi',
                       key_file='/home/luigi/.ssh/id_rsa')

# Upload a file
rfs.put('/local/data/input.csv', '/remote/data/input.csv')

# Download a file
rfs.get('/remote/data/output.csv', '/local/data/output.csv')

# Check existence
if rfs.exists('/remote/data/output.csv'):
    print('File exists on remote host')
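
The atomic upload described earlier (copy to a temporary path with a random suffix, then mv into place) can be sketched as a plan of two commands. `plan_atomic_upload` and the `-tmp-` suffix format are assumptions made for this example; the function only builds the argument lists and does not contact any host.

```python
import random
from typing import List, Tuple


def plan_atomic_upload(host: str, local_path: str,
                       remote_path: str) -> Tuple[str, List[List[str]]]:
    """Hypothetical helper: return the temp path and the commands for an atomic upload."""
    # The temp file sits next to the destination, so the final mv is a rename
    # within one directory and readers never observe a partially copied file.
    tmp_path = "%s-tmp-%09d" % (remote_path, random.randrange(10 ** 9))
    commands = [
        ["scp", local_path, "%s:%s" % (host, tmp_path)],  # 1. copy to temp path
        ["ssh", host, "mv", tmp_path, remote_path],       # 2. move into place
    ]
    return tmp_path, commands


if __name__ == "__main__":
    tmp, cmds = plan_atomic_upload("storage-node",
                                   "/local/data/input.csv",
                                   "/remote/data/input.csv")
    for cmd in cmds:
        print(" ".join(cmd))
```

If the copy step fails, only the temporary file is affected, so a later existence check on the destination path keeps reporting the previous state.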
