Implementation:Spotify Luigi BigQueryLoadAvro
| Knowledge Sources | |
|---|---|
| Domains | Data_Warehouse, Google_Cloud |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
BigQueryLoadAvro is a specialized Luigi contrib task that extends BigQueryLoadTask to load Apache Avro data from Google Cloud Storage (GCS) into Google BigQuery. Avro files are self-describing, so BigQuery takes the table schema from the files themselves. The task additionally reads the embedded Avro schema so it can copy the schema's table-level doc into the BigQuery table description, while BigQuery carries field-level docs over as column descriptions.
Description
The BigQueryLoadAvro class sets source_format = SourceFormat.AVRO and provides convenience logic for:
- Avro URI resolution: Appends /*.avro to GCS directory paths, or uses the path as-is if it already ends in .avro.
- Schema extraction: Downloads only the beginning of one input Avro file from GCS (the first 64 KB) and reads the embedded writer schema from its header using the avro Python library.
- Documentation propagation: After the load completes, patches the BigQuery table description with the Avro schema's doc field. BigQuery itself copies field-level descriptions from the Avro schema.
This is a fire-and-forget documentation step; if propagation fails, the table is still loaded but a warning is logged.
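Reading only the beginning of a file is enough because an Avro object container file stores its writer schema in the header, before any data blocks. The sketch below illustrates that layout with a stdlib-only parser: the magic bytes Obj followed by 0x01, then a metadata map whose avro.schema entry holds the schema as JSON. It illustrates the file format only; it is not the task's code, which relies on the avro library's reader, and read_avro_schema and its helpers are hypothetical names. The codec entry and the sync marker are ignored here.

```python
import json
from typing import Dict, Tuple

MAGIC = b'Obj\x01'  # Avro object container file magic bytes


def _read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zig-zag varint long at buf[pos]; return (value, next_pos)."""
    shift = 0
    acc = 0
    while True:
        if pos >= len(buf):
            raise EOFError('header truncated inside a long')
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos


def _read_bytes(buf: bytes, pos: int) -> Tuple[bytes, int]:
    """Decode a length-prefixed byte string at buf[pos]."""
    length, pos = _read_long(buf, pos)
    if length < 0:
        raise ValueError('negative byte-string length')
    end = pos + length
    if end > len(buf):
        raise EOFError('header truncated inside a byte string')
    return buf[pos:end], end


def read_avro_schema(header: bytes) -> dict:
    """Parse the writer schema from the leading bytes of an Avro container file."""
    if header[:4] != MAGIC:
        raise ValueError('not an Avro object container file')
    pos = 4
    meta: Dict[str, bytes] = {}
    # File metadata is an Avro map<bytes>: blocks of (count, entries...) ending at count 0.
    while True:
        count, pos = _read_long(header, pos)
        if count == 0:
            break
        if count < 0:
            # Negative count: its absolute value is the entry count, followed by a block size.
            count = -count
            _, pos = _read_long(header, pos)
        for _ in range(count):
            key, pos = _read_bytes(header, pos)
            value, pos = _read_bytes(header, pos)
            meta[key.decode('utf-8')] = value
    # Only the header is parsed; any data blocks after it are never touched.
    return json.loads(meta['avro.schema'].decode('utf-8'))
```

A truncated header raises EOFError in this sketch, which is why a partial download only works when it covers the whole header.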
Usage
Subclass BigQueryLoadAvro and override requires() to return tasks that output to GCS targets (paths should be URIs of .avro files or GCS directory prefixes containing Avro files). Override output() to return a BigQueryTarget representing the destination table.
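The path-resolution rule can be sketched as two plain functions. This is a simplified sketch: avro_source_uri and avro_source_uris are hypothetical names, and they take path strings, whereas the task derives the paths from the targets in self.input().

```python
def avro_source_uri(path):
    """Map one GCS path to the URI handed to BigQuery (hypothetical helper)."""
    if path.endswith('.avro'):
        # Already points at a single Avro file: use it as-is.
        return path
    # Otherwise treat the path as a GCS "directory" and match every .avro object in it.
    return path.rstrip('/') + '/*.avro'


def avro_source_uris(paths):
    """One source URI per input path, in input order."""
    return [avro_source_uri(p) for p in paths]
```

For example, gs://my-bucket/avro/user_events/2026-02-10/ resolves to gs://my-bucket/avro/user_events/2026-02-10/*.avro, and BigQuery expands the wildcard at load time.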
Code Reference
Source Location
luigi/contrib/bigquery_avro.py (110 lines)
Signature
```python
class BigQueryLoadAvro(BigQueryLoadTask):
    source_format = SourceFormat.AVRO

    def source_uris(self):
        """Returns list of GCS URIs for Avro input files."""

    def _get_input_schema(self):
        """Reads Avro schema from an arbitrary input file (downloads first 64 KB)."""

    @staticmethod
    def _get_writer_schema(datum_reader):
        """Python-version agnostic getter for datum_reader.writer_schema."""

    def _set_output_doc(self, avro_schema):
        """Patches BigQuery table description with avro_schema.doc."""

    def run(self):
        """Runs parent BigQueryLoadTask.run(), then propagates Avro doc."""
```
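The ordering in run() (load first, then one best-effort documentation attempt) can be illustrated with a small stand-in class. This is a sketch, not the task itself: DocPropagatingLoad is a hypothetical name, its three callables stand in for the parent run(), _get_input_schema() and _set_output_doc(), and the log message text is illustrative.

```python
import logging

logger = logging.getLogger(__name__)


class DocPropagatingLoad:
    """Hypothetical stand-in mirroring the step order of BigQueryLoadAvro.run()."""

    def __init__(self, load, read_schema, set_doc):
        self.load = load                # stands in for BigQueryLoadTask.run()
        self.read_schema = read_schema  # stands in for _get_input_schema()
        self.set_doc = set_doc          # stands in for _set_output_doc(schema)

    def run(self):
        # 1. The load itself: any failure here propagates and fails the task.
        self.load()
        # 2. A single best-effort documentation attempt; errors are only logged.
        try:
            self.set_doc(self.read_schema())
        except Exception as e:
            logger.warning('Could not propagate Avro doc to table description: %r', e)
```

Catching the exception after the load means a missing avro module or an unreadable header never turns a successful load into a failed task; the table simply stays without a description.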
Import
```python
from luigi.contrib.bigquery_avro import BigQueryLoadAvro
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| self.input() | GCS Target(s) | One or more GCS targets whose paths point to .avro files or to GCS directories containing Avro files. They are the outputs of the task(s) returned by requires(). |
Outputs
| Output | Type | Description |
|---|---|---|
| self.output() | BigQueryTarget | The destination BigQuery table. Provided by overriding output(). After the load, the table description is patched with the Avro schema doc. |
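The description patch on the output table can be sketched as a single REST call. This assumes bq_service is a discovery-based BigQuery v2 service object from google-api-python-client (the kind of client luigi's BigQueryClient wraps); set_table_description is a hypothetical name, and the table coordinates are passed explicitly rather than read from self.output().

```python
def set_table_description(bq_service, project_id, dataset_id, table_id, doc):
    """Patch only the table-level description; column descriptions are untouched."""
    body = {'description': doc}
    # tables().patch() updates only the fields present in body; execute() sends it.
    return bq_service.tables().patch(
        projectId=project_id,
        datasetId=dataset_id,
        tableId=table_id,
        body=body,
    ).execute()
```

Using patch rather than update leaves the schema that BigQuery derived from the Avro files, including its field-level descriptions, as it was.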
Usage Examples
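```python
import luigi
from luigi.contrib.bigquery import BigQueryTarget
from luigi.contrib.bigquery_avro import BigQueryLoadAvro
from luigi.contrib.gcs import GCSTarget

class UserEventsAvro(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        # A GCS "directory"; every .avro object under it is loaded.
        return GCSTarget('gs://my-bucket/avro/user_events/{}/'.format(self.date))

class LoadUserEventsAvro(BigQueryLoadAvro):
    date = luigi.DateParameter()

    def requires(self):
        # requires() must return task(s); their GCS outputs become self.input().
        return UserEventsAvro(date=self.date)

    def output(self):
        return BigQueryTarget(
            project_id='my-project',
            dataset_id='analytics',
            table_id='user_events_{}'.format(self.date.strftime('%Y%m%d')),
        )
```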
Related Pages
- Spotify_Luigi_Cloud_Data_Targets -- Principle governing cloud-based data target abstractions
- luigi.contrib.bigquery.BigQueryLoadTask -- Parent class providing core BigQuery load logic
- luigi.contrib.gcs.GCSClient -- Used internally for downloading Avro file headers from GCS