Implementation:Spotify Luigi GCSTarget
| Knowledge Sources | |
|---|---|
| Domains | Cloud_Storage, Google_Cloud |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module providing Google Cloud Storage integration through the GCSClient filesystem, GCSTarget target, and GCSFlagTarget directory-based target classes.
Description
The gcs module implements Luigi's FileSystem and FileSystemTarget abstractions for Google Cloud Storage (GCS). It uses the google-api-python-client library and includes built-in retry logic via tenacity for handling transient 5xx HTTP errors and network issues. The module also implements eventual consistency wait loops for create/delete operations.
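The retry behaviour can be pictured with a minimal, library-free sketch. It does not use tenacity or the Google API client: `TransientError`, `retry_transient`, `make_flaky`, and the backoff parameters are hypothetical names chosen for illustration, and the actual retry policy in luigi/contrib/gcs.py may differ.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a 5xx HTTP error or a dropped connection."""

    def __init__(self, status):
        super().__init__("transient failure, status %d" % status)
        self.status = status


def retry_transient(func, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on TransientError with exponential backoff.

    Other exceptions propagate immediately. The last transient error is
    re-raised once all attempts are used up.
    """
    for attempt in range(attempts):
        try:
            return func()
        except TransientError:
            if attempt == attempts - 1:
                raise
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))


def make_flaky(failures):
    """Return a callable that raises a 503 `failures` times, then succeeds."""
    state = {"calls": 0}

    def call():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientError(503)
        return "ok after %d calls" % state["calls"]

    return call
```

Passing `sleep` as a parameter keeps the sketch easy to exercise without real delays.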
Core Classes:
- GCSClient (extends FileSystem): Full-featured GCS filesystem client. By default it uses application default credentials, but also accepts explicit OAuth credentials. Operations include:
  - exists(path): Checks whether an object or directory-like prefix exists.
  - isdir(path): Checks for directory existence by looking for objects with a trailing slash or objects sharing the prefix.
  - remove(path, recursive): Deletes objects or entire directories (using batch HTTP requests for efficiency).
  - put(filename, dest_path, mimetype, chunksize): Uploads a local file with resumable uploads for non-empty files.
  - put_multiple(filepaths, remote_directory, ...): Uploads multiple files, optionally using multiprocessing.
  - put_string(contents, dest_path): Uploads string/bytes content directly.
  - mkdir(path): Creates a directory marker object.
  - copy(source_path, destination_path): Copies objects or entire directories.
  - move(source_path, destination_path): Moves objects (copy + remove).
  - listdir(path): Lists directory contents as an iterable.
  - list_wildcard(wildcard_path): Lists objects matching a simple glob pattern with a single '*'.
  - download(path, chunksize, chunk_callback): Downloads objects to a temporary local file.
- GCSTarget (extends FileSystemTarget): Represents a single GCS object as a Luigi target. In read mode, it downloads the object via GCSClient.download() and wraps it through Luigi's format system. In write mode, it uses AtomicGCSFile for atomic uploads.
- GCSFlagTarget (extends GCSTarget): A directory-based target that checks for a flag file (defaults to _SUCCESS) to signify job completion. This pattern is common for Hadoop-style outputs where a directory of files is produced and the presence of a success marker file indicates the job finished correctly. The path must end with '/'.
- AtomicGCSFile (extends AtomicLocalFile): Writes to a local temp file and uploads to GCS via GCSClient.put() on close.
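The single-'*' restriction on list_wildcard can be illustrated with a small sketch that filters an in-memory list of object paths instead of calling GCS. The function name `match_single_star` and the sample paths are hypothetical; this shows one plausible way to implement such matching and is not taken from the module itself.

```python
def match_single_star(wildcard_path, candidates):
    """Return the candidates matching a pattern with exactly one '*'.

    The '*' may only appear in the last path component, so the pattern
    splits into a fixed prefix and a fixed suffix.
    """
    directory, pattern = wildcard_path.rsplit("/", 1)
    if "*" in directory:
        raise ValueError("'*' is only supported after the last '/'")
    parts = pattern.split("*")
    if len(parts) != 2:
        raise ValueError("exactly one '*' is supported")

    prefix = directory + "/" + parts[0]
    suffix = parts[1]
    # The length check stops prefix and suffix from overlapping in a match.
    min_length = len(prefix) + len(suffix)
    return [
        name
        for name in candidates
        if name.startswith(prefix)
        and name.endswith(suffix)
        and len(name) >= min_length
    ]


objects = [
    "gs://my-bucket/logs/part-0001.csv",
    "gs://my-bucket/logs/part-0002.csv",
    "gs://my-bucket/logs/part-0001.json",
]
csv_parts = match_single_star("gs://my-bucket/logs/part-*.csv", objects)
```

Here `csv_parts` holds the two `.csv` paths and leaves out the `.json` object.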
Constants:
- CHUNKSIZE: 10 MB default chunk size for uploads/downloads.
- DEFAULT_MIMETYPE: 'application/octet-stream', used when the MIME type cannot be guessed.
- GCS_BATCH_URI: 'https://storage.googleapis.com/batch/storage/v1', used for batch delete operations.
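To make the effect of the chunk size concrete, the following sketch computes the byte ranges a transfer of a given size would be split into. It is plain arithmetic, not the module's transfer code: `chunk_ranges` is a hypothetical helper, and reading "10 MB" as 10 * 1024 * 1024 bytes is an assumption.

```python
# Assumption: "10 MB" is interpreted as 10 * 1024 * 1024 bytes.
CHUNKSIZE = 10 * 1024 * 1024


def chunk_ranges(total_size, chunksize=CHUNKSIZE):
    """Return half-open (start, end) byte ranges covering total_size bytes."""
    if chunksize <= 0:
        raise ValueError("chunksize must be positive")
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + chunksize, total_size)
        ranges.append((start, end))
        start = end
    return ranges


# A 25 MiB object needs three chunks: two full ones and a 5 MiB remainder.
ranges_25_mib = chunk_ranges(25 * 1024 * 1024)
```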
Usage
Use this module when your Luigi pipeline needs to read from or write to Google Cloud Storage. It is a natural storage backend for Luigi pipelines running on Google Cloud Platform and is often used alongside Luigi's BigQuery and Dataflow contrib tasks.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/gcs.py
- Lines: 1-514
Signature
class GCSClient(luigi.target.FileSystem):
    def __init__(self, oauth_credentials=None, descriptor='', http_=None,
                 chunksize=CHUNKSIZE, **discovery_build_kwargs):
        ...

class GCSTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, format=None, client=None):
        ...

class GCSFlagTarget(GCSTarget):
    def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
        ...

class AtomicGCSFile(luigi.target.AtomicLocalFile):
    def __init__(self, path, gcs_client):
        ...
Import
from luigi.contrib.gcs import GCSClient, GCSTarget, GCSFlagTarget
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| path | str | Yes | GCS path in gs://bucket/object format |
| oauth_credentials | google.auth.credentials.Credentials | No | OAuth credentials for authentication; defaults to application default credentials |
| descriptor | str | No | JSON descriptor for service discovery; if empty, uses automated discovery |
| http_ | httplib2.Http | No | Custom HTTP transport object |
| chunksize | int | No | Bytes per upload/download chunk; defaults to 10 MB |
| format | luigi.format.Format | No | Luigi format for encoding/decoding; defaults to get_default_format() |
| client | GCSClient | No | Pre-configured GCS client; auto-created if not provided |
| flag | str | No | Flag file name for GCSFlagTarget; defaults to '_SUCCESS' |
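The gs://bucket/object path format can be split into its bucket and object name with the standard library. The sketch below is illustrative only: `split_gcs_path` is a hypothetical helper, not a documented part of the module's public API, and it assumes object names contain no '?' or '#' characters, which `urlsplit` would treat as query and fragment separators.

```python
from urllib.parse import urlsplit


def split_gcs_path(path):
    """Split 'gs://bucket/object' into (bucket, object_name)."""
    parts = urlsplit(path)
    if parts.scheme != "gs" or not parts.netloc:
        raise ValueError("expected a path of the form gs://bucket/object")
    # Drop only the single leading '/' that separates bucket and object.
    return parts.netloc, parts.path[1:]


bucket, obj = split_gcs_path("gs://my-bucket/output/results.csv")
```

A trailing slash is preserved in the object name, which matters for directory-style paths such as those required by GCSFlagTarget.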
Outputs
| Name | Type | Description |
|---|---|---|
| file-like (read) | BufferedReader | Returned when opening target in read mode ('r'); object is downloaded to a temp file |
| AtomicGCSFile (write) | file-like | Returned when opening target in write mode ('w'); writes locally then uploads to GCS |
| bool (exists) | bool | exists() returns True if the GCS object or prefix exists |
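The write-mode contract (write locally, upload on close) can be sketched without any GCS dependency. `AtomicUploadFile` and `make_fake_store` are hypothetical stand-ins: the upload callable plays the role of GCSClient.put(), and a dict plays the role of the bucket. The real AtomicGCSFile builds on Luigi's AtomicLocalFile and may differ in detail.

```python
import os
import tempfile


class AtomicUploadFile:
    """Write to a local temp file; upload only if the block exits cleanly."""

    def __init__(self, dest_path, upload):
        self.dest_path = dest_path
        self._upload = upload  # callable(local_path, dest_path)
        fd, self._tmp_path = tempfile.mkstemp()
        self._file = os.fdopen(fd, "w")

    def write(self, text):
        return self._file.write(text)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self._file.close()
        try:
            # Nothing reaches the destination if the writer raised.
            if exc_type is None:
                self._upload(self._tmp_path, self.dest_path)
        finally:
            os.remove(self._tmp_path)
        return False  # never swallow the caller's exception


def make_fake_store():
    """Return (store, upload), where upload copies file contents into store."""
    store = {}

    def upload(local_path, dest_path):
        with open(local_path) as f:
            store[dest_path] = f.read()

    return store, upload
```

Because the destination is only touched after the writer finishes, a reader never observes a half-written object, which is the property Luigi relies on when it checks whether a target exists.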
Usage Examples
Basic Usage
import luigi
from luigi.contrib.gcs import GCSTarget

class WriteToGCS(luigi.Task):
    def output(self):
        return GCSTarget('gs://my-bucket/output/results.csv')

    def run(self):
        with self.output().open('w') as f:
            f.write('col1,col2\n')
            f.write('value1,value2\n')
Reading from GCS
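import luigi

# WriteToGCS is the task defined under Basic Usage.
class ReadFromGCS(luigi.Task):
    def requires(self):
        return WriteToGCS()

    def output(self):
        return luigi.LocalTarget('/tmp/processed.txt')

    def run(self):
        with self.input().open('r') as f:
            data = f.read()
        # Process the data, then write the output so Luigi sees the task as complete.
        with self.output().open('w') as out:
            out.write(data)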
Using GCSFlagTarget for Hadoop-Style Output
import luigi
from luigi.contrib.gcs import GCSFlagTarget

class HadoopStyleOutput(luigi.Task):
    def output(self):
        # Path must end with '/'
        return GCSFlagTarget('gs://my-bucket/output/daily_run/')

    def run(self):
        # Write multiple output files to the directory...
        # Then write the _SUCCESS flag as the last step
        client = self.output().fs
        client.put_string('', 'gs://my-bucket/output/daily_run/_SUCCESS')
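The completion check behind this pattern is simple: the directory counts as done only once the flag object exists. A minimal sketch, using a plain set of object paths in place of GCS and a hypothetical helper name `flag_target_exists`:

```python
def flag_target_exists(existing_objects, directory, flag="_SUCCESS"):
    """Return True if the flag object is present under directory."""
    if not directory.endswith("/"):
        raise ValueError("directory path must end with '/'")
    return (directory + flag) in existing_objects


run_dir = "gs://my-bucket/output/daily_run/"
objects = {run_dir + "part-00000", run_dir + "part-00001"}

# Data files alone do not mark the run as complete.
done_before_flag = flag_target_exists(objects, run_dir)

# Writing the flag last makes the whole directory appear atomically.
objects.add(run_dir + "_SUCCESS")
done_after_flag = flag_target_exists(objects, run_dir)
```

This is why the flag should be written only after every data file has been uploaded.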
Using Custom Credentials
import google.auth
from luigi.contrib.gcs import GCSClient, GCSTarget

# Load application default credentials, requesting an explicit
# read/write storage scope, and pass them to the client.
credentials, project = google.auth.default(
    scopes=['https://www.googleapis.com/auth/devstorage.read_write']
)
client = GCSClient(oauth_credentials=credentials)
target = GCSTarget('gs://my-bucket/data/output.json', client=client)