Implementation:Mage ai Mage ai Update Catalog Dict
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the Mage integrations framework, for configuring stream selection, column inclusion, and replication settings in a Singer catalog.
Description
update_catalog_dict modifies a raw catalog dictionary to configure a specific stream for sync. It sets key_properties, bookmark_properties, replication_method, unique_constraints, and unique_conflict_method on the stream entry, then iterates through metadata to set the "selected" flag on each column based on inclusion rules. Columns with "automatic" inclusion are always selected; "unsupported" columns are never selected; "available" columns are selected based on the selected_columns/deselected_columns parameters.
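The inclusion rules above can be sketched in a few lines. This is a minimal illustration, not the code in mage_integrations: the helper name `is_column_selected` is hypothetical, the handling of `select_all` is left out, and the precedence shown (a column listed in deselected_columns is excluded even if it also appears in selected_columns) is an assumption. The sample metadata follows the usual Singer layout, where the column name is the last breadcrumb element.

```python
from typing import List, Optional


def is_column_selected(
    column: str,
    inclusion: str,
    selected_columns: Optional[List[str]] = None,
    deselected_columns: Optional[List[str]] = None,
) -> bool:
    """Hypothetical helper mirroring the inclusion rules described above."""
    if inclusion == "automatic":
        return True   # always selected
    if inclusion == "unsupported":
        return False  # never selected
    # inclusion == "available": defer to the caller's lists.
    if deselected_columns and column in deselected_columns:
        return False  # assumption: explicit exclusion wins
    if selected_columns is None:
        return True   # None means every available column
    return column in selected_columns


# Illustrative Singer-style metadata entries (column names are made up).
metadata = [
    {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
    {"breadcrumb": ["properties", "email"], "metadata": {"inclusion": "available"}},
    {"breadcrumb": ["properties", "blob"], "metadata": {"inclusion": "unsupported"}},
]

for entry in metadata:
    column = entry["breadcrumb"][-1]
    entry["metadata"]["selected"] = is_column_selected(
        column,
        entry["metadata"]["inclusion"],
        selected_columns=["email", "blob"],
    )
```

In this sketch, "blob" stays unselected even though the caller asked for it, because its inclusion is "unsupported".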
Usage
Call this function after discover() returns a catalog and before passing the catalog to sync(). It is typically called once for each stream that the user wants to configure.
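When several streams need configuring, the once-per-stream pattern can be wrapped in a small loop. The sketch below is an assumption about how a caller might organize this, not part of the Mage API: `configure_streams` and `stream_settings` are hypothetical names, the "orders" stream is made up, and the update function is passed in as a parameter so the loop can be tried without mage_integrations installed.

```python
from typing import Callable, Dict


def configure_streams(
    catalog: Dict,
    stream_settings: Dict[str, Dict],
    update_fn: Callable[..., Dict],
) -> Dict:
    """Apply update_fn once per stream.

    stream_settings is a hypothetical mapping of tap_stream_id to the
    keyword arguments for that stream.
    """
    for stream_id, settings in stream_settings.items():
        # Continue from the returned dict rather than relying on in-place mutation.
        catalog = update_fn(catalog=catalog, stream_id=stream_id, **settings)
    return catalog


# Illustrative per-stream settings.
stream_settings = {
    "users": dict(
        key_properties=["id"],
        replication_method="INCREMENTAL",
        bookmark_properties=["updated_at"],
        select_stream=True,
    ),
    "orders": dict(
        key_properties=["order_id"],
        replication_method="FULL_TABLE",
        select_stream=True,
    ),
}
```

In a pipeline, `update_fn` would be `update_catalog_dict`, and the returned catalog would then be handed to sync(). Chaining on the return value avoids depending on whether the function mutates its input, which this page does not state.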
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/utils.py
- Lines: 50-106
Signature
def update_catalog_dict(
    catalog: Dict,
    stream_id: str,
    key_properties: List[str],
    replication_method: str,
    bookmark_properties: List[str] = None,
    deselected_columns: List[str] = None,
    select_all: bool = False,
    select_stream: bool = False,
    selected_columns: List[str] = None,
    unique_conflict_method: str = None,
    unique_constraints: List[str] = None,
) -> Dict:
Import
from mage_integrations.sources.utils import update_catalog_dict
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| catalog | Dict | Yes | Raw catalog dict with 'streams' array from discover |
| stream_id | str | Yes | tap_stream_id of the stream to configure |
| key_properties | List[str] | Yes | Primary key columns |
| replication_method | str | Yes | FULL_TABLE or INCREMENTAL |
| bookmark_properties | List[str] | No | Bookmark/replication key columns |
| select_stream | bool | No | Whether to select the stream for sync |
| selected_columns | List[str] | No | Columns to include (None = all available) |
| deselected_columns | List[str] | No | Columns to explicitly exclude |
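| select_all | bool | No | If True, select all columns regardless of selected_columns/deselected_columns |
| unique_conflict_method | str | No | Conflict-handling method stored on the stream entry |
| unique_constraints | List[str] | No | Unique-constraint columns stored on the stream entry |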
Outputs
| Name | Type | Description |
|---|---|---|
| return | Dict | Updated catalog dict with configured stream metadata |
Usage Examples
from mage_integrations.sources.utils import update_catalog_dict

# `source` is assumed to be an already-initialized Mage source instance.
# After discover, configure a stream for incremental sync
catalog_dict = source.discover().to_dict()
updated = update_catalog_dict(
    catalog=catalog_dict,
    stream_id="users",
    key_properties=["id"],
    replication_method="INCREMENTAL",
    bookmark_properties=["updated_at"],
    select_stream=True,
    selected_columns=["id", "name", "email", "updated_at"],
)
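To check what a call like the one above configured, the returned dict can be inspected for columns flagged as selected. The helper below is a hypothetical sketch, not part of mage_integrations; it assumes the usual Singer metadata layout, in which each entry has a "breadcrumb" list (empty for the stream-level entry) and a "metadata" dict carrying the "selected" flag. The sample catalog is hand-written for illustration.

```python
from typing import Dict, List


def selected_columns_for(catalog: Dict, stream_id: str) -> List[str]:
    """Return the columns whose metadata entry has selected=True for one stream."""
    for stream in catalog.get("streams", []):
        if stream.get("tap_stream_id") != stream_id:
            continue
        return [
            entry["breadcrumb"][-1]
            for entry in stream.get("metadata", [])
            # Skip the stream-level entry, whose breadcrumb is empty.
            if entry.get("breadcrumb") and entry.get("metadata", {}).get("selected")
        ]
    return []


# Hand-written sample standing in for an updated catalog dict.
sample_catalog = {
    "streams": [
        {
            "tap_stream_id": "users",
            "metadata": [
                {"breadcrumb": [], "metadata": {"selected": True}},
                {
                    "breadcrumb": ["properties", "id"],
                    "metadata": {"inclusion": "automatic", "selected": True},
                },
                {
                    "breadcrumb": ["properties", "phone"],
                    "metadata": {"inclusion": "available", "selected": False},
                },
            ],
        }
    ]
}

print(selected_columns_for(sample_catalog, "users"))  # ['id']
```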