

Implementation:Spotify Luigi S3Target

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (auto-imported from implementations/Spotify_Luigi_S3Target.md)



Overview

Concrete tool for representing Amazon S3 file objects as pipeline targets -- supporting existence checking, atomic writes, and streaming reads -- provided by Luigi.

Description

The S3Target class (luigi.contrib.s3.S3Target) is a subclass of FileSystemTarget that wraps an s3:// path and delegates all file-system operations to an S3Client instance. The client is built on the boto3 library and provides methods for existence checking (exists), uploading (put, put_multipart), downloading (get), listing (list, listdir), copying (copy), moving (move), and removing (remove) objects.
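The delegation described above can be sketched without touching AWS. InMemoryS3Client and SketchS3Target below are hypothetical stand-ins (a dict plays the role of the bucket). They only mirror the shape of the real classes, where S3Target keeps the path and forwards calls such as exists() and remove() to its client.

```python
from typing import Dict, Optional


class InMemoryS3Client:
    """Hypothetical stand-in for S3Client: a dict keyed by full s3:// path."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def exists(self, path: str) -> bool:
        return path in self.objects

    def put_string(self, content: str, path: str) -> None:
        self.objects[path] = content.encode("utf-8")

    def remove(self, path: str) -> bool:
        return self.objects.pop(path, None) is not None


class SketchS3Target:
    """Hypothetical target: holds a path and delegates every operation to a client."""

    def __init__(self, path: str, client: Optional[InMemoryS3Client] = None) -> None:
        self.path = path
        # Same fallback idea as `self.fs = client or S3Client()` in S3Target.
        self.fs = client or InMemoryS3Client()

    def exists(self) -> bool:
        # The target keeps no state of its own; the client answers for it.
        return self.fs.exists(self.path)

    def remove(self) -> bool:
        return self.fs.remove(self.path)


client = InMemoryS3Client()
target = SketchS3Target("s3://my-bucket/path/to/file", client=client)
print(target.exists())  # False
client.put_string("hello", "s3://my-bucket/path/to/file")
print(target.exists())  # True
```

Because the target holds only a path, passing one configured client to many targets (as Example 3 does with a real S3Client) lets them share the same credentials and settings.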

Key behaviours:

  • Atomic writes -- When opened in write mode ('w'), the target returns the format's pipe-writer wrapped around an AtomicS3File. Data is written to a local temporary file first; only when the file is closed does move_to_final_destination() upload it to S3 (via put_multipart), so the object never appears at its destination half-written.
  • Streaming reads -- When opened in read mode ('r'), the target returns a pipe-reader wrapping a ReadableS3File that streams content from S3 line by line.
  • Authentication -- S3Client supports explicit credentials, STS role assumption, and boto3's default credential chain.
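  • Multi-part upload -- Uploads go through boto3's upload_fileobj with a boto3.s3.transfer.TransferConfig whose part size defaults to 8 MB (DEFAULT_PART_SIZE, 8,388,608 bytes); files larger than that are sent as S3 multi-part uploads.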
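The atomic-write behaviour follows a general write-locally-then-publish pattern. The sketch below is hypothetical (AtomicUploadFile, fake_upload and the dict store are illustrative names, not Luigi APIs): it writes to a temporary file and calls an upload function only when the with block exits cleanly, discarding the temporary file if an exception escapes.

```python
import os
import tempfile
from typing import Callable, Dict


class AtomicUploadFile:
    """Hypothetical sketch of the AtomicS3File idea: write locally, publish on clean close."""

    def __init__(self, path: str, upload: Callable[[str, str], None]) -> None:
        self.path = path          # final destination, e.g. an s3:// URI
        self._upload = upload     # called as upload(local_tmp_path, destination)
        fd, self.tmp_path = tempfile.mkstemp(suffix=".tmp")
        self._fh = os.fdopen(fd, "w")

    def write(self, data: str) -> int:
        return self._fh.write(data)

    def close(self) -> None:
        # Flush the local file, then "move" it to its final destination.
        self._fh.close()
        try:
            self._upload(self.tmp_path, self.path)
        finally:
            os.remove(self.tmp_path)

    def abort(self) -> None:
        # Drop the local file; nothing is published.
        self._fh.close()
        os.remove(self.tmp_path)

    def __enter__(self) -> "AtomicUploadFile":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if exc_type is None:
            self.close()
        else:
            self.abort()


# In-memory stand-in for the bucket, used only for demonstration.
store: Dict[str, str] = {}


def fake_upload(local_path: str, destination: str) -> None:
    with open(local_path) as f:
        store[destination] = f.read()


with AtomicUploadFile("s3://bucket/report.csv", fake_upload) as out:
    out.write("date,metric,value\n")
    # Nothing is visible at the destination while writing is in progress.
    assert "s3://bucket/report.csv" not in store
```

In Luigi the publish step is move_to_final_destination(); uploading only a finished file means a task that fails mid-write leaves no partial object behind.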

Usage

Use S3Target as the return value of a task's output() method. Downstream tasks that list that task in requires() receive the same target through self.input().
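This is why exists() matters: by default a Luigi task counts as complete when every target returned by output() exists, and the scheduler skips complete tasks. The sketch below restates that rule in plain Python; outputs_complete, HasExists and FakeTarget are hypothetical names, and the real check lives in luigi.Task.complete().

```python
from typing import List, Protocol


class HasExists(Protocol):
    def exists(self) -> bool: ...


def outputs_complete(outputs: List[HasExists]) -> bool:
    """Sketch of the default completeness rule: complete iff every output exists."""
    if not outputs:
        # Luigi treats a task with no outputs as never complete (and warns).
        return False
    return all(target.exists() for target in outputs)


class FakeTarget:
    """Hypothetical target whose existence is fixed at construction time."""

    def __init__(self, present: bool) -> None:
        self.present = present

    def exists(self) -> bool:
        return self.present
```

With S3Target as the output, a rerun of a task whose object is already in the bucket is therefore a no-op.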

Code Reference

Source Location

luigi/contrib/s3.py:

  • S3Client: lines 68--551
  • AtomicS3File: lines 554--568
  • ReadableS3File: lines 571--640
  • S3Target: lines 643--675

Class Signatures

class S3Client(FileSystem):
    DEFAULT_PART_SIZE = 8388608
    DEFAULT_THREADS = 100

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 aws_session_token=None, **kwargs):
        ...


class S3Target(FileSystemTarget):
    fs = None

    def __init__(self, path, format=None, client=None, **kwargs):
        super(S3Target, self).__init__(path)
        if format is None:
            format = get_default_format()
        self.path = path
        self.format = format
        self.fs = client or S3Client()
        self.s3_options = kwargs

    def open(self, mode='r'):
        ...
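DEFAULT_PART_SIZE is 8,388,608 bytes (8 MiB). The arithmetic below shows how many parts a file splits into at that size. It is only an illustration: part_count is a hypothetical helper, and boto3's transfer manager, not Luigi, decides whether to use a multi-part upload at all (per its multipart_threshold setting) and may adjust the chunk size.

```python
DEFAULT_PART_SIZE = 8388608  # bytes, same value as S3Client.DEFAULT_PART_SIZE (8 MiB)


def part_count(size_bytes: int, part_size: int = DEFAULT_PART_SIZE) -> int:
    """Number of fixed-size parts needed to cover size_bytes (ceiling division)."""
    if size_bytes < 0 or part_size <= 0:
        raise ValueError("size must be >= 0 and part_size must be > 0")
    return (size_bytes + part_size - 1) // part_size


# 8 MiB fits in one part, one extra byte needs a second, 100 MiB needs 13.
for size in (8 * 1024 * 1024, 8 * 1024 * 1024 + 1, 100 * 1024 * 1024):
    print(size, part_count(size))
```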

Import

from luigi.contrib.s3 import S3Target, S3Client

I/O Contract

Inputs

Name | Type | Description
path | str | An S3 URI (e.g., "s3://my-bucket/path/to/file").
format | luigi.format.Format or None | Optional format wrapper (e.g., Gzip). Defaults to get_default_format().
client | S3Client or None | Optional pre-configured S3 client. A new S3Client() is created if None.
**kwargs | dict | Extra options stored as s3_options; on write they are passed to S3Client.put_multipart(), which forwards them to boto3's upload_fileobj as ExtraArgs (e.g., ServerSideEncryption).
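The path argument is a URI whose host part is the bucket and whose remainder is the object key. A minimal sketch of that split with the standard library follows; split_s3_path is a hypothetical helper (S3Client performs a comparable split internally, but that method is private). Fragment parsing is disabled so a # inside a key is kept, and any ?query part is re-attached to the key.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_path(path: str) -> Tuple[str, str]:
    """Hypothetical helper: split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    # allow_fragments=False keeps a literal '#' as part of the key.
    parts = urlsplit(path, allow_fragments=False)
    if not parts.netloc:
        raise ValueError("no bucket in {!r}".format(path))
    # With a netloc present, the path is empty or starts with a single '/'.
    key = parts.path[1:]
    if parts.query:
        # A '?' inside a key is parsed as a query string; put it back.
        key += "?" + parts.query
    return parts.netloc, key


print(split_s3_path("s3://my-bucket/path/to/file"))  # ('my-bucket', 'path/to/file')
```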

Outputs

Name | Type | Description
open('r') | file-like (ReadableS3File wrapped by the format's pipe-reader) | Streaming reader for the S3 object's contents.
open('w') | file-like (AtomicS3File wrapped by the format's pipe-writer) | Atomic writer: data goes to a local temp file, then is uploaded to S3 on close.
exists() | bool | Whether an S3 object exists at path.
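ReadableS3File reads the object body incrementally instead of downloading it first. The general technique, turning fixed-size chunk reads into complete lines, is sketched below; iter_lines is a hypothetical helper and io.BytesIO stands in for the S3 response stream.

```python
import io
from typing import BinaryIO, Iterator


def iter_lines(stream: BinaryIO, chunk_size: int = 8192) -> Iterator[bytes]:
    """Yield newline-terminated lines from a byte stream read in fixed-size chunks."""
    buffer = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buffer += chunk
        lines = buffer.split(b"\n")
        # The last element is an incomplete line (or b"" if the chunk ended on "\n").
        buffer = lines.pop()
        for line in lines:
            yield line + b"\n"
    if buffer:
        yield buffer  # final line without a trailing newline


body = io.BytesIO(b"date,metric,value\n2024-01-15,page_views,12345\n")
for line in iter_lines(body, chunk_size=4):
    print(line)
```

Only one chunk plus the current partial line is held in memory at a time, which is what makes streaming reads of large objects practical.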

Usage Examples

Example 1: S3Target as Task Output in a PySpark Word Count

from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask


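class InlinePySparkWordCount(PySparkTask):
    driver_memory = '2g'
    executor_memory = '3g'

    # input() is overridden directly rather than declaring requires(), so the
    # scheduler tracks no upstream task for this file.
    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    # saveAsTextFile() creates a directory of part files at this path.
    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    # PySparkTask calls main() in the Spark driver with a SparkContext (sc).
    def main(self, sc, *args):
        # Spark reads and writes S3 itself; only the targets' .path strings are used.
        sc.textFile(self.input().path) \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFile(self.output().path)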
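To check expected results on a small sample before running on a cluster, the Spark chain above can be mirrored in plain Python. local_word_count is a hypothetical helper. Note that saveAsTextFile writes a directory of part files containing the string form of each (word, count) pair, not a single object.

```python
from collections import Counter
from typing import Dict, Iterable


def local_word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Single-process equivalent of flatMap(split) -> map((w, 1)) -> reduceByKey(add)."""
    counts: Counter = Counter()
    for line in lines:               # textFile() yields one record per line
        counts.update(line.split())  # each word contributes 1 to its key
    return dict(counts)


print(local_word_count(["to be or", "not to be"]))
```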

Example 2: Reading and Writing with S3Target Directly

import luigi
from luigi.contrib.s3 import S3Target


class UploadReport(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return S3Target("s3://reports-bucket/daily/{}.csv".format(self.date))

    def run(self):
        with self.output().open('w') as f:
            f.write("date,metric,value\n")
            f.write("{},page_views,12345\n".format(self.date))
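luigi.DateParameter yields a datetime.date, and str.format renders it as YYYY-MM-DD, so each run of UploadReport owns one object. The hypothetical helpers below reproduce that naming to show how a backfill over a date range maps to independent S3 keys, and therefore to independent tasks that Luigi can skip once their file exists.

```python
import datetime
from typing import List


def daily_report_path(day: datetime.date) -> str:
    """Same key layout as UploadReport.output()."""
    return "s3://reports-bucket/daily/{}.csv".format(day)


def backfill_paths(start: datetime.date, end: datetime.date) -> List[str]:
    """One output path per day, start and end inclusive."""
    paths = []
    day = start
    while day <= end:
        paths.append(daily_report_path(day))
        day += datetime.timedelta(days=1)
    return paths


print(daily_report_path(datetime.date(2024, 1, 15)))  # s3://reports-bucket/daily/2024-01-15.csv
```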

Example 3: Using a Custom S3Client with Role Assumption

from luigi.contrib.s3 import S3Target, S3Client


client = S3Client(
    aws_role_arn="arn:aws:iam::123456789012:role/DataPipelineRole",
    aws_role_session_name="luigi-session",
    region_name="us-west-2"
)

target = S3Target("s3://secure-bucket/output/results.parquet", client=client)
print(target.exists())  # role is assumed lazily on this first S3 call; False until an object is written here
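When aws_role_arn and aws_role_session_name are set, S3Client first calls STS AssumeRole and then builds its boto3 resource from the temporary credentials in the response. The sketch below shows only that mapping, using the documented AssumeRole response fields; credentials_from_assume_role is a hypothetical helper and the response is a hard-coded fake, so no AWS call is made.

```python
from typing import Any, Dict, Mapping


def credentials_from_assume_role(response: Mapping[str, Any]) -> Dict[str, str]:
    """Map an STS AssumeRole response to boto3 credential keyword arguments."""
    creds = response["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        # Temporary credentials are only valid together with their session token.
        "aws_session_token": creds["SessionToken"],
    }


# Fake response with placeholder values; a real one would come from
# boto3.client("sts").assume_role(RoleArn=..., RoleSessionName=...).
fake_response = {
    "Credentials": {
        "AccessKeyId": "ASIAEXAMPLE",
        "SecretAccessKey": "example-secret",
        "SessionToken": "example-token",
    }
}
print(credentials_from_assume_role(fake_response)["aws_access_key_id"])  # ASIAEXAMPLE
```

These three keyword names are the ones boto3.resource('s3', ...) accepts for explicit credentials.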
