Implementation:Spotify Luigi BigQueryTarget
| Knowledge Sources | |
|---|---|
| Domains | Data_Warehouse, Google_Cloud |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Comprehensive Luigi contrib module for Google BigQuery integration, providing a client, target, and multiple task types for loading, querying, extracting, and managing BigQuery tables and views.
Description
The bigquery module is one of the most feature-rich contrib modules in Luigi, providing a complete set of classes for interacting with Google BigQuery. It uses the google-api-python-client library and includes built-in retry logic via the tenacity library for handling transient 5xx errors and network issues.
Core Data Types:
- BQDataset: A named tuple of
(project_id, dataset_id, location)representing a BigQuery dataset. - BQTable: A named tuple of
(project_id, dataset_id, table_id, location)representing a BigQuery table, with adatasetproperty and auriproperty that returnsbq://project/dataset/table.
Client:
- BigQueryClient: Full-featured client wrapping the BigQuery REST API. Supports
dataset_exists,table_exists,make_dataset,delete_dataset,delete_table,list_datasets,list_tables,get_view,update_view,run_job, andcopy. The client automatically reinitializes its connection after retryable errors. Authentication uses the GCP helper module (luigi.contrib.gcp).
Target:
- BigQueryTarget (extends
luigi.target.Target): Represents a BigQuery table as a Luigi target. Constructed withproject_id,dataset_id,table_id, and optionallocation. Theexists()method checks if the table exists in BigQuery.
Task Types:
- BigQueryLoadTask: Loads data from Google Cloud Storage into BigQuery. Supports configurable source format (AVRO, CSV, JSON, Parquet), encoding, write disposition, schema (auto-detect or explicit), CSV-specific options (field delimiter, skip rows, jagged rows, quoted newlines), and custom job configuration.
- BigQueryRunQueryTask: Runs a SQL query and stores results in a BigQuery table. Supports write/create disposition, query mode (INTERACTIVE/BATCH), UDF resources, legacy SQL toggle, and flatten results option.
- BigQueryCreateViewTask: Creates or updates a BigQuery view. The
complete()method compares the existing view SQL with the desired SQL to detect changes. - BigQueryExtractTask: Extracts (unloads) data from a BigQuery table to Google Cloud Storage. Supports configurable destination format (AVRO, CSV, JSON), compression, print header, and field delimiter.
- ExternalBigQueryTask: Represents an externally managed BigQuery table for use as a dependency.
Mixin:
- MixinBigQueryBulkComplete: Enables efficient bulk completion checking for ranges of BigQuery targets, supporting Luigi's range scheduling tools.
Enums and Constants:
CreateDisposition,WriteDisposition,QueryMode,SourceFormat,FieldDelimiter,PrintHeader,DestinationFormat,Compression,Encoding
Usage
Use this module when your Luigi pipeline needs to interact with Google BigQuery for data warehousing operations, including loading data from GCS, running analytical queries, managing views, or extracting data from BigQuery to GCS.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File:
luigi/contrib/bigquery.py - Lines: 1-875
Signature
class BigQueryClient:
def __init__(self, oauth_credentials=None, descriptor='', http_=None):
...
class BigQueryTarget(luigi.target.Target):
def __init__(self, project_id, dataset_id, table_id, client=None, location=None):
...
class BigQueryLoadTask(MixinBigQueryBulkComplete, luigi.Task):
...
class BigQueryRunQueryTask(MixinBigQueryBulkComplete, luigi.Task):
...
class BigQueryCreateViewTask(luigi.Task):
...
class BigQueryExtractTask(luigi.Task):
...
class ExternalBigQueryTask(MixinBigQueryBulkComplete, luigi.ExternalTask):
...
class MixinBigQueryBulkComplete:
@classmethod
def bulk_complete(cls, parameter_tuples):
...
Import
from luigi.contrib.bigquery import (
BigQueryClient,
BigQueryTarget,
BigQueryLoadTask,
BigQueryRunQueryTask,
BigQueryCreateViewTask,
BigQueryExtractTask,
MixinBigQueryBulkComplete,
ExternalBigQueryTask,
BQTable,
BQDataset,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| project_id | str | Yes | Google Cloud project ID |
| dataset_id | str | Yes | BigQuery dataset ID |
| table_id | str | Yes | BigQuery table ID |
| client | BigQueryClient | No | Pre-configured BigQuery client; auto-created if not provided |
| location | str | No | Regional location for the dataset (e.g., 'US', 'EU') |
| oauth_credentials | google.auth.Credentials | No | OAuth credentials for authentication; defaults to application default credentials |
| descriptor | str | No | JSON descriptor for service discovery; if empty, uses automated discovery |
| source_format | SourceFormat | No | Format of source data for load tasks (default: NEWLINE_DELIMITED_JSON) |
| write_disposition | WriteDisposition | No | Behavior when table exists (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY) |
| query | str | Yes (query tasks) | SQL query text to execute |
Outputs
| Name | Type | Description |
|---|---|---|
| BigQueryTarget | luigi.target.Target | Represents a BigQuery table; exists() checks table existence
|
| bool (exists) | bool | Whether the specified BigQuery table exists |
| job_id | str | The BigQuery job ID returned after running a job |
| GCS files | GCSTarget | Extracted data files written to Google Cloud Storage (for extract tasks) |
Usage Examples
Basic Usage
import luigi
from luigi.contrib.bigquery import BigQueryTarget, BigQueryRunQueryTask
class MyQueryTask(BigQueryRunQueryTask):
date = luigi.DateParameter()
@property
def query(self):
return """
SELECT user_id, COUNT(*) as cnt
FROM `my_project.my_dataset.events`
WHERE DATE(timestamp) = '{date}'
GROUP BY user_id
""".format(date=self.date)
@property
def write_disposition(self):
return 'WRITE_TRUNCATE'
def output(self):
return BigQueryTarget(
project_id='my-project',
dataset_id='my_dataset',
table_id='daily_user_counts_{}'.format(self.date.strftime('%Y%m%d'))
)
Loading Data from GCS
from luigi.contrib.bigquery import BigQueryLoadTask, BigQueryTarget, SourceFormat
from luigi.contrib.gcs import GCSTarget
class LoadCSVTask(BigQueryLoadTask):
@property
def source_format(self):
return SourceFormat.CSV
@property
def skip_leading_rows(self):
return 1 # skip header row
@property
def schema(self):
return [
{'name': 'user_id', 'type': 'STRING'},
{'name': 'event', 'type': 'STRING'},
{'name': 'timestamp', 'type': 'TIMESTAMP'},
]
def requires(self):
return SomeGCSTask()
def output(self):
return BigQueryTarget('my-project', 'my_dataset', 'events')
Creating a View
from luigi.contrib.bigquery import BigQueryCreateViewTask, BigQueryTarget
class CreateUserView(BigQueryCreateViewTask):
@property
def view(self):
return """
SELECT user_id, MAX(login_time) as last_login
FROM `my_project.my_dataset.logins`
GROUP BY user_id
"""
def output(self):
return BigQueryTarget('my-project', 'my_dataset', 'active_users_view')