Implementation:Spotify Luigi BigQueryTarget

From Leeroopedia
Revision as of 16:46, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_BigQueryTarget.md)


Knowledge Sources
Domains: Data_Warehouse, Google_Cloud
Last Updated: 2026-02-10 08:00 GMT

Overview

luigi.contrib.bigquery is the Luigi contrib module for Google BigQuery integration. It provides a client, a target, and several task types for loading, querying, extracting, and managing BigQuery tables and views.

Description

The bigquery module provides a complete set of classes for interacting with Google BigQuery. It is built on the google-api-python-client library and uses the tenacity library to retry calls that fail with transient 5xx errors or network issues.

Core Data Types:

  • BQDataset: A named tuple of (project_id, dataset_id, location) representing a BigQuery dataset.
  • BQTable: A named tuple of (project_id, dataset_id, table_id, location) representing a BigQuery table, with a dataset property and a uri property that returns bq://project/dataset/table.
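The two data types can be illustrated with a small standalone sketch. It re-creates the tuples from the description above rather than importing them, so it is an approximation of the shape, not the module's actual source; the `defaults=(None,)` argument assumes that location is the only optional field.

```python
import collections

# Illustrative re-creation of the named tuples described above.
# The real definitions live in luigi.contrib.bigquery.
BQDataset = collections.namedtuple(
    "BQDataset", "project_id dataset_id location", defaults=(None,)
)


class BQTable(
    collections.namedtuple(
        "BQTable", "project_id dataset_id table_id location", defaults=(None,)
    )
):
    @property
    def dataset(self):
        # The dataset that owns this table, sharing project and location.
        return BQDataset(self.project_id, self.dataset_id, self.location)

    @property
    def uri(self):
        # Rendered as bq://project/dataset/table.
        return "bq://{}/{}/{}".format(self.project_id, self.dataset_id, self.table_id)


table = BQTable("my-project", "my_dataset", "events")
print(table.uri)
print(table.dataset)
```

Because named tuples are hashable and compare by value, a dataset or table can be used as a dictionary key or set member, which is convenient when grouping many targets by dataset.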

Client:

  • BigQueryClient: Full-featured client wrapping the BigQuery REST API. Supports dataset_exists, table_exists, make_dataset, delete_dataset, delete_table, list_datasets, list_tables, get_view, update_view, run_job, and copy. The client automatically reinitializes its connection after retryable errors. Authentication uses the GCP helper module (luigi.contrib.gcp).
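The retry-then-reconnect behavior can be sketched with the standard library alone. This is not the module's code (which uses tenacity); `RetryableError`, `call_with_retry`, and the `reconnect` callback are hypothetical names chosen for illustration, and the backoff schedule is an assumption.

```python
import time


class RetryableError(Exception):
    """Stand-in for a transient 5xx or network failure (hypothetical name)."""


def call_with_retry(operation, reconnect, attempts=3, base_delay=0.0):
    """Run operation(); on RetryableError, reconnect and retry with backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except RetryableError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            reconnect()  # rebuild the connection before the next try
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


# Usage: an operation that fails twice, then succeeds.
calls = {"operation": 0, "reconnect": 0}


def flaky_table_exists():
    calls["operation"] += 1
    if calls["operation"] < 3:
        raise RetryableError("503 Service Unavailable")
    return True


def reconnect():
    calls["reconnect"] += 1


result = call_with_retry(flaky_table_exists, reconnect)
print(result, calls)
```

Only errors classified as retryable trigger another attempt; any other exception propagates immediately, so permanent failures such as a missing table are not masked.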

Target:

  • BigQueryTarget (extends luigi.target.Target): Represents a BigQuery table as a Luigi target. Constructed with project_id, dataset_id, table_id, and optional location. The exists() method checks if the table exists in BigQuery.

Task Types:

  • BigQueryLoadTask: Loads data from Google Cloud Storage into BigQuery. Supports configurable source format (Avro, CSV, newline-delimited JSON, Parquet), encoding, write disposition, schema (auto-detected or explicit), CSV-specific options (field delimiter, leading rows to skip, jagged rows, quoted newlines), and custom job configuration.
  • BigQueryRunQueryTask: Runs a SQL query and stores results in a BigQuery table. Supports write/create disposition, query mode (INTERACTIVE/BATCH), UDF resources, legacy SQL toggle, and flatten results option.
  • BigQueryCreateViewTask: Creates or updates a BigQuery view. The complete() method compares the existing view SQL with the desired SQL to detect changes.
  • BigQueryExtractTask: Extracts (unloads) data from a BigQuery table to Google Cloud Storage. Supports configurable destination format (Avro, CSV, newline-delimited JSON), compression, header printing, and field delimiter.
  • ExternalBigQueryTask: Represents an externally managed BigQuery table for use as a dependency.
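To make the load options above concrete, here is a minimal sketch of how they might map onto a BigQuery REST API load-job body. The field names (destinationTable, sourceUris, sourceFormat, writeDisposition, skipLeadingRows, schema, autodetect) come from the REST API; `build_load_job` is a hypothetical helper, and the exact dictionary that BigQueryLoadTask builds may differ.

```python
def build_load_job(project_id, dataset_id, table_id, source_uris,
                   source_format, write_disposition,
                   schema=None, skip_leading_rows=0):
    """Build a load-job request body (illustrative helper, not Luigi's code)."""
    load = {
        "destinationTable": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        "sourceUris": list(source_uris),
        "sourceFormat": source_format,
        "writeDisposition": write_disposition,
    }
    if source_format == "CSV":
        # CSV-only option: number of header rows to skip.
        load["skipLeadingRows"] = skip_leading_rows
    if schema:
        load["schema"] = {"fields": schema}  # explicit schema
    else:
        load["autodetect"] = True  # let BigQuery infer the schema
    return {"configuration": {"load": load}}


job = build_load_job(
    "my-project", "my_dataset", "events",
    ["gs://my-bucket/events.csv"],  # placeholder URI
    source_format="CSV",
    write_disposition="WRITE_TRUNCATE",
    schema=[{"name": "user_id", "type": "STRING"}],
    skip_leading_rows=1,
)
print(job)
```

Keeping the body as a plain dictionary is what makes a "custom job configuration" hook practical: a subclass can return its own dictionary instead of the one assembled from individual properties.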

Mixin:

  • MixinBigQueryBulkComplete: Enables efficient bulk completion checking for ranges of BigQuery targets, supporting Luigi's range scheduling tools.
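The idea behind bulk completion can be sketched as follows: list the tables of each dataset once, then test every candidate by set membership instead of issuing one existence check per target. This is a standalone approximation under that assumption; `table_for`, `dataset_exists`, and `list_tables` are hypothetical callables standing in for the task's output and the client.

```python
def bulk_complete(parameter_tuples, table_for, dataset_exists, list_tables):
    """Yield the parameters whose output table already exists.

    table_for maps a parameter to a (dataset, table_id) pair. dataset_exists
    and list_tables stand in for client calls with the same names.
    """
    wanted = [(p, table_for(p)) for p in parameter_tuples]
    datasets = {dataset for _, (dataset, _) in wanted}
    # One listing per dataset instead of one existence check per table.
    available = {d: set(list_tables(d)) for d in datasets if dataset_exists(d)}
    for p, (dataset, table_id) in wanted:
        if table_id in available.get(dataset, set()):
            yield p


# Usage with an in-memory stand-in for BigQuery.
fake_bigquery = {"my_dataset": ["counts_20240101", "counts_20240103"]}
list_calls = []


def list_tables(dataset):
    list_calls.append(dataset)
    return fake_bigquery[dataset]


dates = ["20240101", "20240102", "20240103"]
done = list(bulk_complete(
    dates,
    table_for=lambda d: ("my_dataset", "counts_" + d),
    dataset_exists=lambda d: d in fake_bigquery,
    list_tables=list_tables,
))
print(done, list_calls)
```

For a range of N dates in one dataset this costs a single listing call rather than N lookups, which is what makes scheduling long date ranges practical.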

Enums and Constants:

  • CreateDisposition, WriteDisposition, QueryMode, SourceFormat, FieldDelimiter, PrintHeader, DestinationFormat, Compression, Encoding

Usage

Use this module when your Luigi pipeline needs to interact with Google BigQuery for data warehousing operations, including loading data from GCS, running analytical queries, managing views, or extracting data from BigQuery to GCS.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/bigquery.py
  • Lines: 1-875

Signature

class BigQueryClient:
    def __init__(self, oauth_credentials=None, descriptor='', http_=None):
        ...

class BigQueryTarget(luigi.target.Target):
    def __init__(self, project_id, dataset_id, table_id, client=None, location=None):
        ...

class MixinBigQueryBulkComplete:
    @classmethod
    def bulk_complete(cls, parameter_tuples):
        ...

class BigQueryLoadTask(MixinBigQueryBulkComplete, luigi.Task):
    ...

class BigQueryRunQueryTask(MixinBigQueryBulkComplete, luigi.Task):
    ...

class BigQueryCreateViewTask(luigi.Task):
    ...

class BigQueryExtractTask(luigi.Task):
    ...

class ExternalBigQueryTask(MixinBigQueryBulkComplete, luigi.ExternalTask):
    ...

Import

from luigi.contrib.bigquery import (
    BigQueryClient,
    BigQueryTarget,
    BigQueryLoadTask,
    BigQueryRunQueryTask,
    BigQueryCreateViewTask,
    BigQueryExtractTask,
    MixinBigQueryBulkComplete,
    ExternalBigQueryTask,
    BQTable,
    BQDataset,
)

I/O Contract

Inputs

Name | Type | Required | Description
project_id | str | Yes | Google Cloud project ID
dataset_id | str | Yes | BigQuery dataset ID
table_id | str | Yes | BigQuery table ID
client | BigQueryClient | No | Pre-configured BigQuery client; auto-created if not provided
location | str | No | Regional location for the dataset (e.g., 'US', 'EU')
oauth_credentials | google.auth.credentials.Credentials | No | OAuth credentials for authentication; defaults to application default credentials
descriptor | str | No | JSON descriptor for service discovery; if empty, uses automated discovery
source_format | SourceFormat | No | Format of source data for load tasks (default: NEWLINE_DELIMITED_JSON)
write_disposition | WriteDisposition | No | Behavior when the table exists (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
query | str | Yes (query tasks) | SQL query text to execute

Outputs

Name | Type | Description
BigQueryTarget | luigi.target.Target | Represents a BigQuery table; exists() checks table existence
exists() | bool | Whether the specified BigQuery table exists
job_id | str | The BigQuery job ID returned after running a job
GCS files | GCSTarget | Extracted data files written to Google Cloud Storage (for extract tasks)

Usage Examples

Basic Usage

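import luigi
from luigi.contrib.bigquery import BigQueryTarget, BigQueryRunQueryTask

class MyQueryTask(BigQueryRunQueryTask):
    date = luigi.DateParameter()

    @property
    def query(self):
        # self.date is a datetime.date, so it formats as YYYY-MM-DD.
        return """
            SELECT user_id, COUNT(*) as cnt
            FROM `my-project.my_dataset.events`
            WHERE DATE(timestamp) = '{date}'
            GROUP BY user_id
        """.format(date=self.date)

    @property
    def use_legacy_sql(self):
        # The backtick-quoted table reference above is standard SQL syntax.
        return False

    @property
    def write_disposition(self):
        # Replace the destination table's contents on each run.
        return 'WRITE_TRUNCATE'

    def output(self):
        # One destination table per day, e.g. daily_user_counts_20240101.
        return BigQueryTarget(
            project_id='my-project',
            dataset_id='my_dataset',
            table_id='daily_user_counts_{}'.format(self.date.strftime('%Y%m%d'))
        )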

Loading Data from GCS

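import luigi
from luigi.contrib.bigquery import BigQueryLoadTask, BigQueryTarget, SourceFormat
from luigi.contrib.gcs import GCSTarget

class SomeGCSTask(luigi.ExternalTask):
    # Hypothetical upstream task; the bucket and object name are placeholders.
    def output(self):
        return GCSTarget('gs://my-bucket/events.csv')

class LoadCSVTask(BigQueryLoadTask):

    @property
    def source_format(self):
        return SourceFormat.CSV

    @property
    def skip_leading_rows(self):
        return 1  # skip header row

    @property
    def schema(self):
        # Explicit schema; each dict describes one column.
        return [
            {'name': 'user_id', 'type': 'STRING'},
            {'name': 'event', 'type': 'STRING'},
            {'name': 'timestamp', 'type': 'TIMESTAMP'},
        ]

    def requires(self):
        # The GCS files produced upstream are the source of the load.
        return SomeGCSTask()

    def output(self):
        return BigQueryTarget('my-project', 'my_dataset', 'events')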

Creating a View

from luigi.contrib.bigquery import BigQueryCreateViewTask, BigQueryTarget

class CreateUserView(BigQueryCreateViewTask):

    @property
    def view(self):
        # SQL text that defines the view.
        return """
            SELECT user_id, MAX(login_time) as last_login
            FROM `my-project.my_dataset.logins`
            GROUP BY user_id
        """

    def output(self):
        # The view is addressed like a table.
        return BigQueryTarget('my-project', 'my_dataset', 'active_users_view')
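The completeness check that BigQueryCreateViewTask performs can be sketched as below, assuming the comparison between stored and desired SQL is a plain string equality. `view_is_complete` and its callable arguments are hypothetical stand-ins for the target's exists() and the client's get_view.

```python
def view_is_complete(table_exists, get_view, desired_sql):
    """Return True only if the view exists and its SQL equals desired_sql."""
    if not table_exists():
        return False  # nothing deployed yet, so the task must run
    return get_view() == desired_sql


stored_sql = "SELECT user_id FROM logins"

same = view_is_complete(lambda: True, lambda: stored_sql,
                        "SELECT user_id FROM logins")
reformatted = view_is_complete(lambda: True, lambda: stored_sql,
                               "SELECT user_id\nFROM logins")
missing = view_is_complete(lambda: False, lambda: stored_sql, stored_sql)
print(same, reformatted, missing)
```

Under that assumption, even a whitespace-only change to the SQL makes the task incomplete and causes the view to be updated on the next run.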
