Implementation:Spotify Luigi AzureBlobTarget
| Knowledge Sources | |
|---|---|
| Domains | Cloud_Storage, Azure |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module providing Azure Blob Storage integration through the AzureBlobClient filesystem and AzureBlobTarget target classes.
Description
The azureblob module implements Luigi's FileSystem and FileSystemTarget abstractions for Azure Blob Storage. It consists of two primary classes:
- AzureBlobClient (extends FileSystem): Provides authenticated access to Azure Blob Storage accounts. It supports authentication via account key, SAS token, or connection string. The client manages blob operations including upload, download (as bytes or to a file), copy, move, remove, and existence checks. Azure Blob Storage organizes data into storage accounts containing containers, which in turn hold blobs. The client wraps the azure.storage.blob.BlobServiceClient and uses blob leases to ensure atomic operations during uploads, copies, and deletions.
- AzureBlobTarget (extends FileSystemTarget): Represents a specific blob in Azure Blob Storage as a Luigi target. It supports reading via ReadableAzureBlobFile (which can optionally download the blob to a local temporary file) and writing via AtomicAzureBlobFile (which writes to a local temp file and uploads on close for atomicity). The target integrates with Luigi's format system for transparent encoding/decoding.
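The account, container, and blob hierarchy described above means that a blob can be addressed by a container name followed by a blob name. The following is a minimal sketch of splitting such a combined path; the helper name `split_blob_path` is hypothetical and is used here for illustration only, not as the module's documented API.

```python
from typing import Optional, Tuple


def split_blob_path(path: str) -> Tuple[str, Optional[str]]:
    """Split 'container/blob/name' into (container, blob).

    Hypothetical helper for illustration only. The first path segment is
    treated as the container; everything after it is the blob name.
    A path with no blob part yields None for the blob.
    """
    container, _, blob = path.partition("/")
    return container, (blob or None)


print(split_blob_path("my-container/output/data.csv"))
print(split_blob_path("my-container"))
```

Blob names may themselves contain slashes, which is why only the first separator is treated as the container boundary.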
Additional helper classes include:
- ReadableAzureBlobFile: A readable file-like object that streams or downloads blob content.
- AtomicAzureBlobFile (extends AtomicLocalFile): Writes to a local temporary file, then uploads to Azure Blob Storage on close for atomic write semantics.
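The "write locally, upload on close" idea behind AtomicAzureBlobFile can be sketched with the standard library alone. This is not Luigi's implementation: the class `AtomicUploadFile`, the `fake_upload` function, and the in-memory `fake_store` dictionary are all hypothetical stand-ins, chosen so the pattern can be run without an Azure account.

```python
import os
import tempfile
from typing import Callable, Dict


class AtomicUploadFile:
    """Illustrative stand-in: buffer writes locally, publish on close.

    `upload` is a hypothetical callable that receives the local temp file
    path; in a real pipeline it would be the storage client's upload call.
    """

    def __init__(self, upload: Callable[[str], None]) -> None:
        self._upload = upload
        fd, self._tmp_path = tempfile.mkstemp()
        self._file = os.fdopen(fd, "w")

    def write(self, data: str) -> int:
        return self._file.write(data)

    def __enter__(self) -> "AtomicUploadFile":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self._file.close()
        try:
            # Publish only if the with-block finished without an exception,
            # so readers never observe a partially written blob.
            if exc_type is None:
                self._upload(self._tmp_path)
        finally:
            os.remove(self._tmp_path)
        return False  # never swallow exceptions


# A dict stands in for remote blob storage (assumption for the demo).
fake_store: Dict[str, str] = {}


def fake_upload(local_path: str) -> None:
    with open(local_path) as fh:
        fake_store["my-container/output/data.csv"] = fh.read()


with AtomicUploadFile(fake_upload) as f:
    f.write("col1,col2\n")
    f.write("value1,value2\n")
```

Because nothing is published until the file closes cleanly, a task that fails midway leaves no output behind, which is the property Luigi relies on when it checks whether a target exists.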
Usage
Use this module when your Luigi pipeline needs to read from or write to Azure Blob Storage. It is suitable for cloud-based ETL workflows, data lake ingestion, and any scenario where Azure Blob Storage serves as an intermediate or final data store.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/azureblob.py
- Lines: 1-335
Signature
class AzureBlobClient(FileSystem):
    def __init__(self, account_name=None, account_key=None, sas_token=None, **kwargs):
        ...

class AzureBlobTarget(FileSystemTarget):
    def __init__(self, container, blob, client=None, format=None, download_when_reading=True, **kwargs):
        ...
Import
from luigi.contrib.azureblob import AzureBlobClient, AzureBlobTarget
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| account_name | str | No | The Azure storage account name used for authentication and endpoint construction |
| account_key | str | No | The storage account key for shared key authentication |
| sas_token | str | No | A shared access signature token for scoped authentication |
| container | str | Yes (Target) | The Azure container name where the blob is stored |
| blob | str | Yes (Target) | The name of the blob within the specified container |
| client | AzureBlobClient | No | An optional pre-configured client instance; defaults to anonymous access |
| format | luigi.format.Format | No | Luigi format for encoding/decoding; defaults to get_default_format() |
| download_when_reading | bool | No | If True (default), downloads blob to a temp file when reading; otherwise streams bytes directly |
| kwargs | dict | No | Additional connection options passed to AzureBlobClient: protocol, connection_string, endpoint_suffix, custom_domain, token_credential |
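To make the role of account_name, protocol, and endpoint_suffix in endpoint construction more concrete, here is a minimal sketch of how a blob service URL is typically formed. The function `build_account_url` is hypothetical, and the defaults shown (`https` and the public-cloud suffix `core.windows.net`) are assumptions for illustration; the module's own construction, including any custom_domain handling, may differ.

```python
def build_account_url(account_name: str,
                      protocol: str = "https",
                      endpoint_suffix: str = "core.windows.net") -> str:
    """Return a blob service URL of the form
    <protocol>://<account_name>.blob.<endpoint_suffix>.

    Hypothetical helper; the default values are assumptions, not values
    read from the Luigi source.
    """
    return f"{protocol}://{account_name}.blob.{endpoint_suffix}"


# Public Azure cloud
print(build_account_url("mystorageaccount"))

# A sovereign cloud uses a different endpoint suffix
print(build_account_url("mystorageaccount",
                        endpoint_suffix="core.chinacloudapi.cn"))
```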
Outputs
| Name | Type | Description |
|---|---|---|
| ReadableAzureBlobFile | file-like | Returned when opening target in read mode ('r'); provides read() and context manager support |
| AtomicAzureBlobFile | file-like | Returned when opening target in write mode ('w'); writes locally then uploads atomically on close |
| bool (exists) | bool | exists() returns True if the container or blob exists in Azure Blob Storage |
Usage Examples
Basic Usage
import luigi
from luigi.contrib.azureblob import AzureBlobClient, AzureBlobTarget

class MyAzureTask(luigi.Task):
    def output(self):
        return AzureBlobTarget(
            container='my-container',
            blob='output/data.csv',
            client=AzureBlobClient(
                account_name='mystorageaccount',
                account_key='my-account-key'
            )
        )

    def run(self):
        with self.output().open('w') as f:
            f.write('col1,col2\n')
            f.write('value1,value2\n')
Reading from Azure Blob Storage
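class ReadAzureTask(luigi.Task):
    def requires(self):
        return MyAzureTask()

    def run(self):
        # Read the blob produced by MyAzureTask
        with self.input().open('r') as f:
            data = f.read()
        print(data)
        # Write the local copy so that output() exists and the task completes
        with self.output().open('w') as out:
            out.write(data)

    def output(self):
        return luigi.LocalTarget('/tmp/local_copy.csv')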
Using SAS Token Authentication
client = AzureBlobClient(
    account_name='mystorageaccount',
    sas_token='sv=2020-08-04&ss=b&srt=sco&sp=rwdlacitfx&se=...'
)
target = AzureBlobTarget(
    container='secure-container',
    blob='data/report.json',
    client=client
)