Implementation:Spotify Luigi GCSTarget

From Leeroopedia


Knowledge Sources
Domains Cloud_Storage, Google_Cloud
Last Updated 2026-02-10 08:00 GMT

Overview

Luigi contrib module providing Google Cloud Storage integration through the GCSClient filesystem, GCSTarget target, and GCSFlagTarget directory-based target classes.

Description

The gcs module implements Luigi's FileSystem and FileSystemTarget abstractions for Google Cloud Storage (GCS). It uses the google-api-python-client library and includes built-in retry logic via tenacity for handling transient 5xx HTTP errors and network issues. The module also implements eventual consistency wait loops for create/delete operations.
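The retry behaviour described above can be illustrated with a small standard-library sketch. It is not the module's tenacity-based code: `TransientError`, `call_with_retry`, and `flaky_request` are hypothetical names, and the backoff parameters are assumptions chosen only for the demonstration.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a 5xx HTTP error or a dropped connection."""


def call_with_retry(func, attempts=5, base_delay=0.01, sleep=time.sleep):
    """Call func(), retrying on TransientError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # wait longer after each failure


# Example: a fake request that fails twice, then succeeds.
calls = {"count": 0}


def flaky_request():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("503 Service Unavailable")
    return "ok"


# The sleep is stubbed out so the example runs instantly.
result = call_with_retry(flaky_request, sleep=lambda seconds: None)
```

Only errors classified as transient are retried here; anything else propagates immediately, which is the usual reason for separating 5xx responses from client errors.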

Core Classes:

  • GCSClient (extends FileSystem): Full-featured GCS filesystem client. By default it uses application default credentials, but also accepts explicit OAuth credentials. Operations include:
    • exists(path): Checks whether an object or directory-like prefix exists.
    • isdir(path): Checks for directory existence by looking for objects with a trailing slash or objects sharing the prefix.
    • remove(path, recursive): Deletes objects or entire directories (using batch HTTP requests for efficiency).
    • put(filename, dest_path, mimetype, chunksize): Uploads a local file, using a resumable upload when the file is non-empty.
    • put_multiple(filepaths, remote_directory, ...): Uploads multiple files, optionally using multiprocessing.
    • put_string(contents, dest_path): Uploads string/bytes content directly.
    • mkdir(path): Creates a directory marker object.
    • copy(source_path, destination_path): Copies objects or entire directories.
    • move(source_path, destination_path): Moves objects (copy + remove).
    • listdir(path): Lists directory contents as an iterable.
    • list_wildcard(wildcard_path): Lists objects matching a simple glob pattern with a single '*'.
    • download(path, chunksize, chunk_callback): Downloads objects to a temporary local file.
  • GCSTarget (extends FileSystemTarget): Represents a single GCS object as a Luigi target. In read mode, it downloads the object via GCSClient.download() and wraps it through Luigi's format system. In write mode, it uses AtomicGCSFile for atomic uploads.
  • GCSFlagTarget (extends GCSTarget): A directory-based target that checks for a flag file (defaults to _SUCCESS) to signify job completion. This pattern is common for Hadoop-style outputs where a directory of files is produced and the presence of a success marker file indicates the job finished correctly. The path must end with '/'.
  • AtomicGCSFile (extends AtomicLocalFile): Writes to a local temp file and uploads to GCS via GCSClient.put() on close.
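To make the "single '*'" restriction of list_wildcard concrete, the following sketch filters a list of object paths against such a pattern. It is an illustration under stated assumptions, not the module's code: `match_wildcard` is a hypothetical helper, and it assumes the wildcard may only appear after the last '/'.

```python
def match_wildcard(wildcard_path, object_paths):
    """Return the paths matching a pattern with one '*' in its last segment."""
    directory, pattern = wildcard_path.rsplit("/", 1)
    if "*" in directory:
        raise ValueError("'*' is only supported after the last '/'")
    parts = pattern.split("*")
    if len(parts) != 2:
        raise ValueError("exactly one '*' is supported")
    prefix = directory + "/" + parts[0]
    suffix = parts[1]
    return [
        p for p in object_paths
        if p.startswith(prefix)
        and p.endswith(suffix)
        and len(p) >= len(prefix) + len(suffix)  # prefix and suffix must not overlap
    ]


objects = [
    "gs://my-bucket/logs/part-0001.csv",
    "gs://my-bucket/logs/part-0002.csv",
    "gs://my-bucket/logs/part-0003.json",
    "gs://my-bucket/logs/summary.csv",
]
matches = match_wildcard("gs://my-bucket/logs/part-*.csv", objects)
```

With a real client, the list of candidate paths would come from listdir() on the directory part of the pattern.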

Constants:

  • CHUNKSIZE: 10 MB default chunk size for uploads/downloads.
  • DEFAULT_MIMETYPE: 'application/octet-stream' used when the MIME type cannot be guessed.
  • GCS_BATCH_URI: 'https://storage.googleapis.com/batch/storage/v1' for batch delete operations.
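As a sketch of how DEFAULT_MIMETYPE can act as a fallback, the snippet below resolves a MIME type from the destination path using the standard library. `resolve_mimetype` is a hypothetical helper, and the precedence shown (explicit value, then guess, then default) is an assumption about how such a fallback would typically be ordered.

```python
import mimetypes

DEFAULT_MIMETYPE = "application/octet-stream"


def resolve_mimetype(dest_path, mimetype=None):
    """Prefer an explicit MIME type, then a guess from the path, then the default."""
    guessed, _encoding = mimetypes.guess_type(dest_path)
    return mimetype or guessed or DEFAULT_MIMETYPE


text_type = resolve_mimetype("gs://my-bucket/notes.txt")
unknown_type = resolve_mimetype("gs://my-bucket/data/blob")
explicit_type = resolve_mimetype("gs://my-bucket/data/blob", mimetype="text/csv")
```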

Usage

Use this module when a Luigi pipeline needs to read from or write to Google Cloud Storage. It is a natural storage backend for pipelines running on Google Cloud Platform and is often used alongside Luigi's BigQuery and Dataflow tasks.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/gcs.py
  • Lines: 1-514

Signature

class GCSClient(luigi.target.FileSystem):
    def __init__(self, oauth_credentials=None, descriptor='', http_=None,
                 chunksize=CHUNKSIZE, **discovery_build_kwargs):
        ...

class GCSTarget(luigi.target.FileSystemTarget):
    def __init__(self, path, format=None, client=None):
        ...

class GCSFlagTarget(GCSTarget):
    def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
        ...

class AtomicGCSFile(luigi.target.AtomicLocalFile):
    def __init__(self, path, gcs_client):
        ...

Import

from luigi.contrib.gcs import GCSClient, GCSTarget, GCSFlagTarget

I/O Contract

Inputs

Name | Type | Required | Description
path | str | Yes | GCS path in gs://bucket/object format
oauth_credentials | google.auth.credentials.Credentials | No | OAuth credentials for authentication; defaults to application default credentials
descriptor | str | No | JSON descriptor for service discovery; if empty, uses automated discovery
http_ | httplib2.Http | No | Custom HTTP transport object
chunksize | int | No | Bytes per upload/download chunk; defaults to 10 MB
format | luigi.format.Format | No | Luigi format for encoding/decoding; defaults to get_default_format()
client | GCSClient | No | Pre-configured GCS client; auto-created if not provided
flag | str | No | Flag file name for GCSFlagTarget; defaults to '_SUCCESS'
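The gs://bucket/object path format can be split into a bucket and an object key with the standard library. This is a minimal sketch; `split_gcs_path` is a hypothetical helper name and is not claimed to be the module's own function.

```python
from urllib.parse import urlsplit


def split_gcs_path(path):
    """Split 'gs://bucket/some/object' into ('bucket', 'some/object')."""
    parts = urlsplit(path)
    if parts.scheme != "gs":
        raise ValueError(f"expected a gs:// path, got {path!r}")
    # Drop the single leading '/' that separates the bucket from the key.
    return parts.netloc, parts.path[1:]


bucket, key = split_gcs_path("gs://my-bucket/output/results.csv")
dir_bucket, dir_key = split_gcs_path("gs://my-bucket/output/daily_run/")
```

A trailing slash is preserved in the key, which matters for directory-style paths such as those used by GCSFlagTarget.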

Outputs

Name | Type | Description
file-like (read) | BufferedReader | Returned when opening target in read mode ('r'); object is downloaded to a temp file
AtomicGCSFile (write) | file-like | Returned when opening target in write mode ('w'); writes locally then uploads to GCS
bool (exists) | bool | exists() returns True if the GCS object or prefix exists
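The write-mode contract (write locally, then upload) can be sketched as a context manager that only uploads when the block finishes without an exception. `AtomicUpload` and `fake_upload` are hypothetical names, and the upload is simulated with an in-memory dict rather than a real GCS call.

```python
import os
import tempfile


class AtomicUpload:
    """Buffer writes in a local temp file; hand it to `upload` on clean exit."""

    def __init__(self, dest_path, upload):
        self.dest_path = dest_path
        self.upload = upload  # hypothetical callable: upload(local_path, dest_path)

    def __enter__(self):
        self._tmp = tempfile.NamedTemporaryFile("w", delete=False)
        return self._tmp

    def __exit__(self, exc_type, exc, tb):
        self._tmp.close()
        try:
            if exc_type is None:  # upload only if the block succeeded
                self.upload(self._tmp.name, self.dest_path)
        finally:
            os.remove(self._tmp.name)  # always clean up the temp file
        return False  # never swallow exceptions


store = {}  # in-memory stand-in for a bucket


def fake_upload(local_path, dest_path):
    with open(local_path) as f:
        store[dest_path] = f.read()


with AtomicUpload("gs://my-bucket/output/results.csv", fake_upload) as f:
    f.write("col1,col2\n")

try:
    with AtomicUpload("gs://my-bucket/output/broken.csv", fake_upload) as f:
        f.write("partial")
        raise RuntimeError("task failed midway")
except RuntimeError:
    pass
```

Because nothing is uploaded until the write completes, a task that fails midway leaves no partial object at the destination, so a later exists() check does not report a false success.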

Usage Examples

Basic Usage

import luigi
from luigi.contrib.gcs import GCSTarget

class WriteToGCS(luigi.Task):
    def output(self):
        return GCSTarget('gs://my-bucket/output/results.csv')

    def run(self):
        with self.output().open('w') as f:
            f.write('col1,col2\n')
            f.write('value1,value2\n')

Reading from GCS

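# Continues from the Basic Usage example (uses luigi and WriteToGCS)
class ReadFromGCS(luigi.Task):
    def requires(self):
        return WriteToGCS()

    def output(self):
        return luigi.LocalTarget('/tmp/processed.txt')

    def run(self):
        # Opening in read mode downloads the object to a temp file first
        with self.input().open('r') as f:
            data = f.read()

        # Process data...
        # Write the result so the declared output exists and the task completes
        with self.output().open('w') as out:
            out.write(data)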

Using GCSFlagTarget for Hadoop-Style Output

from luigi.contrib.gcs import GCSFlagTarget

class HadoopStyleOutput(luigi.Task):
    def output(self):
        # Path must end with '/'
        return GCSFlagTarget('gs://my-bucket/output/daily_run/')

    def run(self):
        # Write multiple output files to the directory...
        # Then write the _SUCCESS flag
        client = self.output().fs
        client.put_string('', 'gs://my-bucket/output/daily_run/_SUCCESS')
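The flag-file convention can be sketched without touching GCS. The classes below are hypothetical stand-ins (`FakeClient` for GCSClient, `FlagCheck` for the target), and the sketch assumes the flag object's path is the directory path with the flag name appended.

```python
class FakeClient:
    """In-memory stand-in for GCSClient (hypothetical, for illustration only)."""

    def __init__(self):
        self.objects = {}

    def put_string(self, contents, dest_path):
        self.objects[dest_path] = contents

    def exists(self, path):
        return path in self.objects


class FlagCheck:
    """Treat a directory as complete only once its flag object exists."""

    def __init__(self, path, client, flag="_SUCCESS"):
        if not path.endswith("/"):
            raise ValueError("path must end with '/'")
        self.path, self.fs, self.flag = path, client, flag

    def exists(self):
        return self.fs.exists(self.path + self.flag)


client = FakeClient()
target = FlagCheck("gs://my-bucket/output/daily_run/", client)

client.put_string("a,b\n", target.path + "part-0000.csv")
before_flag = target.exists()  # data is present, but the job is not marked done

client.put_string("", target.path + target.flag)
after_flag = target.exists()
```

Writing the flag last is what makes the pattern safe: data files alone never make the target look complete.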

Using Custom Credentials

import google.auth

from luigi.contrib.gcs import GCSClient, GCSTarget

# Load application default credentials, restricted to the read/write storage scope
credentials, project = google.auth.default(
    scopes=['https://www.googleapis.com/auth/devstorage.read_write']
)
client = GCSClient(oauth_credentials=credentials)

# Pass the client explicitly so the target does not create its own
target = GCSTarget('gs://my-bucket/data/output.json', client=client)
