Implementation:Spotify Luigi S3Target
Overview
A concrete tool, provided by Luigi, for representing Amazon S3 objects as pipeline targets, with support for existence checks, atomic writes, and streaming reads.
Description
The S3Target class (luigi.contrib.s3.S3Target) is a subclass of FileSystemTarget that wraps an s3:// path and delegates all file-system operations to an S3Client instance. The client is built on the boto3 library and provides methods for existence checking (exists), uploading (put, put_multipart), downloading (get), listing (list, listdir), copying (copy), moving (move), and removing (remove) objects.
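To make the path handling concrete, the following is a minimal sketch of how an s3:// URI can be split into a bucket and a key using only the standard library. The helper name split_s3_path is hypothetical and is not part of Luigi's public API, and accepting the s3n and s3a schemes is an assumption made for illustration; the client has to perform an equivalent split before it can call boto3.

```python
from urllib.parse import urlsplit


def split_s3_path(path):
    """Split an S3 URI into (bucket, key). Hypothetical helper for illustration."""
    parts = urlsplit(path, allow_fragments=False)
    # Assumption: treat the Hadoop-style s3n/s3a schemes like plain s3.
    if parts.scheme not in ("s3", "s3n", "s3a"):
        raise ValueError("not an S3 URI: {!r}".format(path))
    # The bucket is the network location; the key is the path minus its leading slash.
    key = parts.path[1:]
    if parts.query:
        # A '?' inside a key is parsed as a query string, so put it back.
        key += "?" + parts.query
    return parts.netloc, key


bucket, key = split_s3_path("s3://my-bucket/path/to/file")
print(bucket, key)
```

The same pair of values is what boto3 calls expect as their Bucket and Key arguments.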
Key behaviours:
- Atomic writes -- When opened in write mode ('w'), the target returns a pipe-writer wrapping an AtomicS3File. Data is written to a local temporary file first, and only uploaded to S3 when the file is closed via move_to_final_destination().
- Streaming reads -- When opened in read mode ('r'), the target returns a pipe-reader wrapping a ReadableS3File that streams content from S3 line by line.
- Authentication -- S3Client supports explicit credentials, STS role assumption, and boto3's default credential chain.
- Multi-part upload -- Files larger than 8 MB are uploaded using S3 multi-part upload via boto3.s3.transfer.TransferConfig.
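The atomic-write behaviour can be illustrated with a small standard-library sketch of the same pattern: buffer to a local temporary file, then publish only on a clean close. This is not Luigi's AtomicS3File; the class name AtomicUpload and the publish callable are hypothetical, and a local file copy stands in for the S3 upload.

```python
import os
import shutil
import tempfile


class AtomicUpload:
    """Write to a local temp file; publish only on a clean close.

    `publish` is a hypothetical callable(tmp_path, destination) that stands
    in for the S3 upload step.
    """

    def __init__(self, destination, publish):
        self.destination = destination
        self.publish = publish
        fd, self.tmp_path = tempfile.mkstemp(prefix="atomic-upload-")
        self._file = os.fdopen(fd, "w")

    def write(self, data):
        return self._file.write(data)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self._file.close()
        try:
            if exc_type is None:
                # Only a successful write reaches the final destination.
                self.publish(self.tmp_path, self.destination)
        finally:
            # The temp file is removed whether or not the publish happened.
            if os.path.exists(self.tmp_path):
                os.remove(self.tmp_path)
        return False  # never swallow exceptions


# Usage: a local copy stands in for the upload.
out_dir = tempfile.mkdtemp()
final_path = os.path.join(out_dir, "report.csv")

with AtomicUpload(final_path, publish=shutil.copyfile) as f:
    f.write("date,metric,value\n")
    visible_mid_write = os.path.exists(final_path)

print(visible_mid_write, os.path.exists(final_path))
```

The point of the pattern is that a task which crashes mid-write leaves nothing at the destination, so an existence check cannot mistake a partial file for a finished output.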
Usage
Import S3Target and return it from a task's output() method. Downstream tasks receive the same target through self.input(), which resolves to the output() of the tasks they depend on.
Code Reference
Source Location
luigi/contrib/s3.py:
- S3Client: lines 68--551
- AtomicS3File: lines 554--568
- ReadableS3File: lines 571--640
- S3Target: lines 643--675
Class Signatures
class S3Client(FileSystem):
    DEFAULT_PART_SIZE = 8388608
    DEFAULT_THREADS = 100

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 aws_session_token=None, **kwargs):
        ...

class S3Target(FileSystemTarget):
    fs = None

    def __init__(self, path, format=None, client=None, **kwargs):
        super(S3Target, self).__init__(path)
        if format is None:
            format = get_default_format()
        self.path = path
        self.format = format
        self.fs = client or S3Client()
        self.s3_options = kwargs

    def open(self, mode='r'):
        ...
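As a worked example of what DEFAULT_PART_SIZE implies, the sketch below checks that 8388608 bytes is exactly 8 * 1024 * 1024 and estimates how many parts a file of a given size would be split into. The helper part_count is hypothetical and only does the arithmetic; the actual chunking is handled by boto3's transfer machinery.

```python
DEFAULT_PART_SIZE = 8388608  # same value as S3Client.DEFAULT_PART_SIZE


def part_count(file_size, part_size=DEFAULT_PART_SIZE):
    """Number of parts needed to cover file_size bytes (hypothetical helper)."""
    if file_size < 0 or part_size <= 0:
        raise ValueError("file_size must be >= 0 and part_size must be > 0")
    # Ceiling division in integer arithmetic; an empty file still needs one upload.
    return max(1, (file_size + part_size - 1) // part_size)


print(DEFAULT_PART_SIZE == 8 * 1024 * 1024)
print(part_count(100 * 1024 * 1024))  # a 100 MiB file
```

A 100 MiB file works out to 12 full parts plus one half-sized final part.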
Import
from luigi.contrib.s3 import S3Target, S3Client
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| path | str | An S3 URI (e.g., "s3://my-bucket/path/to/file"). |
| format | luigi.format.Format or None | Optional format wrapper (e.g., Gzip). Defaults to get_default_format(). |
| client | S3Client or None | Optional pre-configured S3 client. A new one is created if None. |
| **kwargs | dict | Extra arguments forwarded to boto3's put_object / upload_fileobj (e.g., ServerSideEncryption). |
Outputs
| Name | Type | Description |
|---|---|---|
| open('r') | file-like (ReadableS3File wrapped by format pipe-reader) | Streaming reader for the S3 object's contents. |
| open('w') | file-like (AtomicS3File wrapped by format pipe-writer) | Atomic writer: data goes to a local temp file, then is uploaded to S3 on close. |
| exists() | bool | Whether the S3 object at path exists. |
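The streaming reader returned by open('r') can be pictured as a loop that pulls fixed-size chunks and emits complete lines as they become available. The sketch below shows that general technique with the standard library only; it is not the code of ReadableS3File, the function name iter_lines is hypothetical, and an io.BytesIO object stands in for the S3 response body.

```python
import io


def iter_lines(stream, chunk_size=8192):
    """Yield complete lines from a binary stream read in fixed-size chunks."""
    pending = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        # Everything before the last newline is complete; the rest waits for more data.
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield line + b"\n"
    if pending:
        yield pending  # final line without a trailing newline


# A tiny chunk size forces lines to span several reads.
body = io.BytesIO(b"date,metric,value\n2024-01-01,page_views,12345\nlast")
for line in iter_lines(body, chunk_size=7):
    print(line)
```

Because only the current chunk and one partial line are held in memory, the object never has to be downloaded in full before processing starts.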
Usage Examples
Example 1: S3Target as Task Output in a PySpark Word Count
from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask

class InlinePySparkWordCount(PySparkTask):
    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    def main(self, sc, *args):
        sc.textFile(self.input().path) \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFile(self.output().path)
Example 2: Reading and Writing with S3Target Directly
from luigi.contrib.s3 import S3Target
import luigi

class UploadReport(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return S3Target("s3://reports-bucket/daily/{}.csv".format(self.date))

    def run(self):
        with self.output().open('w') as f:
            f.write("date,metric,value\n")
            f.write("{},page_views,12345\n".format(self.date))
Example 3: Using a Custom S3Client with Role Assumption
from luigi.contrib.s3 import S3Target, S3Client

client = S3Client(
    aws_role_arn="arn:aws:iam::123456789012:role/DataPipelineRole",
    aws_role_session_name="luigi-session",
    region_name="us-west-2"
)
target = S3Target("s3://secure-bucket/output/results.parquet", client=client)
print(target.exists())  # False until the task writes to this path