Implementation:Mage ai Mage ai Azure Blob Storage Source
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Azure_Blob_Storage, Source_Connector, File_Based |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations source connector framework, for extracting data from Azure Blob Storage containers by reading CSV and Parquet files.
Description
The AzureBlobStorage source connector extends the base Source class to extract data from Azure Blob Storage containers. It connects with a connection string through a BlobServiceClient, lists blobs under a configured prefix, and reads CSV and Parquet files into pandas DataFrames. The file type is auto-detected from the file extension (.parquet or .csv). Schema discovery uses the first non-empty blob found under the prefix and infers column types from the actual data values using pandas type inference. Mixed-type columns are resolved to the most common Python type among list, dict, and string. The stream identifier is derived from each blob's key path, minus the filename, by joining the path segments with underscores. During load_data(), all non-empty blobs under the prefix are iterated and each file's records are yielded as one batch. The test_connection() method verifies that the container exists and that at least one blob can be listed under the prefix. The replication method is full-table.
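Three of the rules in the description (stream identifier derivation, file type detection, and mixed-type resolution) can be illustrated with a minimal standard-library sketch. The helper names `stream_id_from_key`, `detect_file_type`, and `most_common_type` are hypothetical and are not taken from the connector; the `/` path separator, the `None` result for other extensions, and the fallback to `str` for empty input are assumptions made for illustration.

```python
import os
from collections import Counter
from typing import Any, Iterable, Optional


def stream_id_from_key(key: str) -> str:
    """Join the directory segments of a blob key with underscores.

    Assumes "/" separates path segments; the filename (last segment) is dropped.
    """
    segments = [segment for segment in key.split("/")[:-1] if segment]
    return "_".join(segments)


def detect_file_type(key: str) -> Optional[str]:
    """Map a blob key to "parquet" or "csv" based on its extension.

    Returning None for any other extension is an assumption of this sketch.
    """
    extension = os.path.splitext(key)[1]
    if extension == ".parquet":
        return "parquet"
    if extension == ".csv":
        return "csv"
    return None


def most_common_type(values: Iterable[Any]) -> type:
    """Pick the most frequent of list, dict, or str among the values.

    Anything that is neither a list nor a dict is counted as a string.
    """
    counts = Counter(
        list if isinstance(value, list) else dict if isinstance(value, dict) else str
        for value in values
    )
    if not counts:
        return str  # assumption: an empty column falls back to string
    return counts.most_common(1)[0][0]
```

With these helpers, a key such as `exports/daily/users.csv` maps to the stream identifier `exports_daily` and the file type `csv`.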
Usage
Use this source connector when building a Mage data pipeline that needs to extract data from Azure Blob Storage containers containing CSV or Parquet files. Configure with connection_string, container_name, and prefix.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/azure_blob_storage/__init__.py
- Lines: 1-150
Signature
class AzureBlobStorage(Source):
    @property
    def container_name(self) -> str:
        ...

    @property
    def prefix(self) -> str:
        ...

    def build_client(self):
        ...

    def discover(self, streams: List[str] = None) -> Catalog:
        ...

    def load_data(self, *args, **kwargs) -> Generator[List[Dict], None, None]:
        ...

    def test_connection(self) -> None:
        ...
Import
from mage_integrations.sources.azure_blob_storage import AzureBlobStorage
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration dictionary with Azure connection string and container settings |
| catalog | Catalog | No | Singer catalog specifying streams to extract |
| state | dict | No | Previous sync state for incremental extraction |
Configuration Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| connection_string | str | Yes | Azure Storage account connection string |
| container_name | str | Yes | Name of the blob container to read from |
| prefix | str | Yes | Blob name prefix to filter objects |
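To show how container_name and prefix narrow the set of blobs, here is a hedged sketch of prefix listing with empty blobs skipped. It is not the connector's code. `FakeBlob` and `FakeContainerClient` are hypothetical stand-ins so the example runs without Azure credentials; they mimic the `name` and `size` attributes and the `list_blobs(name_starts_with=...)` method that the azure-storage-blob container client exposes. The helpers `iter_non_empty_blobs` and `first_non_empty_blob` are likewise illustrative names.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class FakeBlob:
    """Stand-in for a blob listing entry (name and size in bytes)."""
    name: str
    size: int


class FakeContainerClient:
    """Stand-in for a container client; filters by name prefix like list_blobs."""

    def __init__(self, blobs: Iterable[FakeBlob]):
        self._blobs = list(blobs)

    def list_blobs(self, name_starts_with: Optional[str] = None) -> Iterator[FakeBlob]:
        for blob in self._blobs:
            if name_starts_with is None or blob.name.startswith(name_starts_with):
                yield blob


def iter_non_empty_blobs(container_client, prefix: str) -> Iterator[FakeBlob]:
    """Yield blobs under the prefix whose size is greater than zero."""
    for blob in container_client.list_blobs(name_starts_with=prefix):
        if blob.size > 0:
            yield blob


def first_non_empty_blob(container_client, prefix: str) -> Optional[FakeBlob]:
    """Return the first non-empty blob under the prefix, or None if there is none."""
    return next(iter_non_empty_blobs(container_client, prefix), None)
```

Against the real SDK, the container client would typically come from `BlobServiceClient.from_connection_string(connection_string).get_container_client(container_name)`; the two helper functions only rely on `list_blobs` and the `name` and `size` attributes.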
Outputs
| Name | Type | Description |
|---|---|---|
| catalog | Catalog | Discovered streams with schemas inferred from file contents (from discover()) |
| records | Generator[List[Dict]] | Batches of records from CSV/Parquet blobs (from load_data()) |
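The shape of the records output, one list of dictionaries per file, can be sketched as follows. This is an illustration only: the function `load_batches` and its in-memory `files` mapping are hypothetical, and the standard-library `csv` module is swapped in for pandas so the example has no dependencies. As a result, every value stays a string here, whereas the connector reads files into pandas DataFrames.

```python
import csv
import io
from typing import Dict, Generator, List, Mapping


def load_batches(files: Mapping[str, bytes]) -> Generator[List[Dict], None, None]:
    """Yield one batch of records per non-empty CSV payload.

    `files` maps a blob name to its raw bytes (a stand-in for downloaded blobs).
    """
    for _name, payload in files.items():
        if not payload:
            continue  # empty blobs contribute no batch
        text = payload.decode("utf-8")
        reader = csv.DictReader(io.StringIO(text, newline=""))
        yield [dict(row) for row in reader]
```

A caller would iterate the generator and handle each batch as it arrives, for example `for batch in load_batches(files): process(batch)`, where `process` is whatever the pipeline does with a list of records.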
Usage Examples
from mage_integrations.sources.azure_blob_storage import AzureBlobStorage

config = {
    "connection_string": "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey;EndpointSuffix=core.windows.net",
    "container_name": "my-data-container",
    "prefix": "exports/daily/",
}
source = AzureBlobStorage(config=config)

# Discover available streams
catalog = source.discover()

# Test connection
source.test_connection()