Implementation:Mage ai Mage ai Google Cloud Storage Source
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Google_Cloud_Storage, Source_Connector, File_Based |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations source connector framework, for extracting data from Google Cloud Storage buckets by reading CSV and Parquet files.
Description
The GoogleCloudStorage source connector extends the base Source class to implement data extraction from Google Cloud Storage (GCS) buckets. It connects via a GoogleCloudStorage connection wrapper using either inline credentials_info or a path_to_credentials_json_file for service account authentication. Discovery lists all blobs in the configured bucket under the specified prefix, filtering by the configured file_type extension. Only non-empty blobs matching the file type are processed. For each matching blob, the file is downloaded and read into a pandas DataFrame; column types are inferred by analyzing the data, with mixed-type columns resolved by counting the most common Python type. The stream identifier is derived from the blob name (path without file extension, dots replaced by underscores). During load_data(), only blobs matching selected streams are processed and their records are yielded as batches. CSV files are read with automatic character encoding detection using charset_normalizer. The test_connection() method verifies bucket existence and attempts to list blobs. The replication method is full-table.
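The filtering, stream naming, and type resolution rules described above can be illustrated with a minimal, standard-library-only sketch. This is not the connector's code: the helper names `matches_file_type`, `stream_id_from_blob_name`, and `most_common_type` are hypothetical, and the exact extension check (a literal `.` plus `file_type` suffix) and the handling of null values are assumptions.

```python
import os
from collections import Counter
from typing import Any, List, Optional


def matches_file_type(blob_name: str, size: int, file_type: str) -> bool:
    """Keep only non-empty blobs whose name ends with the configured extension.

    Assumption: the extension is matched as a literal ".<file_type>" suffix.
    """
    return size > 0 and blob_name.endswith("." + file_type)


def stream_id_from_blob_name(blob_name: str) -> str:
    """Drop the file extension, then replace any remaining dots with underscores."""
    base, _extension = os.path.splitext(blob_name)
    return base.replace(".", "_")


def most_common_type(values: List[Any]) -> Optional[type]:
    """Resolve a mixed-type column by picking the most frequent Python type.

    Assumption: None values are ignored when counting.
    """
    counts = Counter(type(value) for value in values if value is not None)
    if not counts:
        return None
    return counts.most_common(1)[0][0]


if __name__ == "__main__":
    print(matches_file_type("data/exports/orders.csv", 1024, "csv"))
    print(stream_id_from_blob_name("data/exports/orders.2024.csv"))
    print(most_common_type([1, "n/a", 2, None, 3]))
```

Under these assumptions, a blob named `data/exports/orders.2024.csv` maps to the stream identifier `data/exports/orders_2024`, and a column holding mostly integers with a stray string resolves to `int`.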
Usage
Use this source connector when building a Mage data pipeline that needs to extract data from Google Cloud Storage buckets containing CSV or Parquet files. Configure with bucket, prefix, file_type, and GCS credentials.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/google_cloud_storage/__init__.py
- Lines: 1-159
Signature
class GoogleCloudStorage(Source):
    @property
    def bucket(self):
        ...

    @property
    def file_type(self) -> str:
        ...

    @property
    def prefix(self) -> str:
        ...

    def build_client(self):
        ...

    def discover(self, streams: List[str] = None) -> Catalog:
        ...

    def load_data(self, *args, **kwargs) -> Generator[List[Dict], None, None]:
        ...

    def test_connection(self) -> None:
        ...
Import
from mage_integrations.sources.google_cloud_storage import GoogleCloudStorage
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration dictionary with GCS bucket and credentials settings |
| catalog | Catalog | No | Singer catalog specifying streams to extract |
| state | dict | No | Previous sync state for incremental extraction |
Configuration Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| bucket | str | Yes | GCS bucket name |
| prefix | str | No | Blob name prefix to filter objects |
| file_type | str | Yes | File type to filter blobs by extension (e.g., csv, parquet) |
| credentials_info | dict | No | Inline GCP service account credentials dictionary |
| path_to_credentials_json_file | str | No | File path to GCP service account credentials JSON file |
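As a rough illustration of how the parameters above fit together, the following sketch checks a configuration dictionary before it is handed to the connector. The helpers `missing_required_keys` and `credential_source` are hypothetical and not part of Mage; the precedence of `credentials_info` over `path_to_credentials_json_file` when both are set is an assumption, as is the treatment of empty values as missing.

```python
from typing import Any, Dict, List, Optional

# Keys marked "Yes" in the Required column of the table above.
REQUIRED_KEYS = ("bucket", "file_type")

# Assumption: inline credentials take precedence when both keys are present.
CREDENTIAL_KEYS = ("credentials_info", "path_to_credentials_json_file")


def missing_required_keys(config: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent or empty in the config."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


def credential_source(config: Dict[str, Any]) -> Optional[str]:
    """Return the name of the first credential key that is set, or None."""
    for key in CREDENTIAL_KEYS:
        if config.get(key):
            return key
    return None


if __name__ == "__main__":
    config = {
        "bucket": "my-gcs-bucket",
        "prefix": "data/exports/",
        "file_type": "csv",
        "path_to_credentials_json_file": "/path/to/service_account.json",
    }
    print(missing_required_keys(config))
    print(credential_source(config))
```

A check like this only catches structural problems; whether the bucket exists and the credentials are accepted is still determined by `test_connection()`.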
Outputs
| Name | Type | Description |
|---|---|---|
| catalog | Catalog | Discovered streams with schemas inferred from file contents (from discover()) |
| records | Generator[List[Dict], None, None] | Batches of records from CSV/Parquet blobs (from load_data()) |
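The `records` output is a generator of record batches rather than a single list. The sketch below shows that shape in isolation; `iter_batches` and its `batch_size` parameter are hypothetical, and the connector's actual batching strategy and batch size are not specified here.

```python
from typing import Dict, Generator, Iterable, List


def iter_batches(
    records: Iterable[Dict],
    batch_size: int = 2,  # Hypothetical value chosen for illustration only.
) -> Generator[List[Dict], None, None]:
    """Yield records in lists of at most batch_size items."""
    batch: List[Dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Emit the final, possibly shorter, batch.
    if batch:
        yield batch


if __name__ == "__main__":
    rows = [{"id": i} for i in range(5)]
    for batch in iter_batches(rows, batch_size=2):
        print(batch)
```

Consumers of a generator with this shape can process one batch at a time without holding every record of a large file in memory.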
Usage Examples
from mage_integrations.sources.google_cloud_storage import GoogleCloudStorage

config = {
    "bucket": "my-gcs-bucket",
    "prefix": "data/exports/",
    "file_type": "csv",
    "path_to_credentials_json_file": "/path/to/service_account.json",
}

source = GoogleCloudStorage(config=config)

# Discover available streams
catalog = source.discover()

# Test connection
source.test_connection()