Implementation:Mage ai Mage ai Destination Process Schema
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Data_Quality, Schema_Management |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations Destination base class, for registering Singer stream schemas and creating the JSON Schema validators used to validate records.
Description
Destination.process_schema registers a stream's schema and metadata from a SCHEMA message. It stores the schema dict (with internal columns merged), creates a Draft4Validator for record validation, and stores key_properties, bookmark_properties, replication_method, partition_keys, unique_constraints, unique_conflict_method, and disable_column_type_check per stream. It also handles STREAM_OVERRIDE_SETTINGS to inject static columns and additional partition keys from config.
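The per-stream bookkeeping described above can be sketched with a small stand-in class. This is an illustrative sketch only, not the code in base.py: `StreamRegistry`, `register`, `METADATA_KEYS`, and the `_example_loaded_at` internal column are hypothetical names chosen for the example, and storing `None` for missing metadata keys is an assumption.

```python
from copy import deepcopy
from typing import Any, Dict, List

# Hypothetical placeholder: the real internal columns are defined by mage_integrations.
INTERNAL_COLUMNS: Dict[str, Dict[str, Any]] = {
    "_example_loaded_at": {"type": ["null", "string"], "format": "date-time"},
}

# Metadata keys that the description says are stored per stream.
METADATA_KEYS: List[str] = [
    "key_properties",
    "bookmark_properties",
    "replication_method",
    "partition_keys",
    "unique_constraints",
    "unique_conflict_method",
    "disable_column_type_check",
]


class StreamRegistry:
    """Toy stand-in for the per-stream state a destination keeps."""

    def __init__(self) -> None:
        self.schemas: Dict[str, Dict[str, Any]] = {}
        # One dict per metadata key, each mapping stream name -> value.
        self.metadata: Dict[str, Dict[str, Any]] = {key: {} for key in METADATA_KEYS}

    def register(self, stream: str, schema: Dict[str, Any], row: Dict[str, Any]) -> None:
        merged = deepcopy(schema)  # leave the caller's schema untouched
        merged.setdefault("properties", {}).update(deepcopy(INTERNAL_COLUMNS))
        self.schemas[stream] = merged
        for key in METADATA_KEYS:
            # Assumption of this sketch: missing keys are stored as None.
            self.metadata[key][stream] = row.get(key)


registry = StreamRegistry()
row = {
    "type": "SCHEMA",
    "stream": "users",
    "schema": {"properties": {"id": {"type": "integer"}}},
    "key_properties": ["id"],
}
registry.register(row["stream"], row["schema"], row)
```

Keeping one dictionary per metadata key, each keyed by stream name, mirrors the `self.key_properties[stream]` access pattern listed in the I/O contract.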
Usage
Called automatically by _process when a SCHEMA message is encountered. Must be called before any RECORD messages for a given stream.
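The ordering requirement can be illustrated with a simplified message loop. This is a sketch under assumptions, not the actual `_process` implementation: the function name `process_lines` is hypothetical, and raising `ValueError` for a RECORD that arrives before its SCHEMA is an illustrative choice, since the source does not state how the real code reacts.

```python
import json
from typing import Dict, Iterable, List


def process_lines(lines: Iterable[str]) -> Dict[str, List[dict]]:
    """Dispatch Singer-style messages, requiring SCHEMA before RECORD per stream."""
    schemas: Dict[str, dict] = {}
    records: Dict[str, List[dict]] = {}
    for line in lines:
        message = json.loads(line)
        stream = message.get("stream")
        if message["type"] == "SCHEMA":
            # Register the stream first; records are only accepted afterwards.
            schemas[stream] = message["schema"]
            records.setdefault(stream, [])
        elif message["type"] == "RECORD":
            if stream not in schemas:
                raise ValueError(f"RECORD for stream {stream!r} arrived before its SCHEMA")
            records[stream].append(message["record"])
    return records


messages = [
    '{"type": "SCHEMA", "stream": "users", "schema": {"properties": {"id": {"type": "integer"}}}, "key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
]
result = process_lines(messages)
```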
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/destinations/base.py
- Lines: 283-321
Signature
class Destination(ABC):
    def process_schema(
        self,
        stream: str,
        schema: Dict,
        row: Dict,
        tags: Dict = None,
    ) -> None:
        """Register stream schema and create validator.

        Args:
            stream: Stream name/identifier.
            schema: JSON Schema dict with 'properties'.
            row: Full SCHEMA message with metadata keys.
            tags: Optional logging tags.
        """
Import
from mage_integrations.destinations.base import Destination
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream | str | Yes | Stream name/identifier |
| schema | Dict | Yes | JSON Schema with 'properties' dict |
| row | Dict | Yes | Full SCHEMA message with key_properties, bookmark_properties, replication_method, etc. |
| tags | Dict | No | Optional logging tags (defaults to None) |
Outputs
| Name | Type | Description |
|---|---|---|
| self.schemas[stream] | Dict | Registered schema with internal columns |
| self.validators[stream] | Draft4Validator | JSON Schema validator instance |
| self.key_properties[stream] | List[str] | Primary key columns |
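As a rough illustration of what a stored validator does, the following sketch builds a Draft4Validator with the third-party `jsonschema` package and checks two records against it. The `validators` dict and the `is_valid` helper are hypothetical names for this example; how the Destination class itself invokes the validator is not shown in this section.

```python
from jsonschema import Draft4Validator
from jsonschema.exceptions import ValidationError

# Example schema for a "users" stream.
schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "email": {"type": ["null", "string"]},
    },
}

# One validator per stream, keyed by stream name.
validators = {"users": Draft4Validator(schema)}


def is_valid(stream: str, record: dict) -> bool:
    """Return True if the record satisfies the stream's registered schema."""
    try:
        validators[stream].validate(record)
        return True
    except ValidationError:
        return False


good = is_valid("users", {"id": 1, "email": None})
bad = is_valid("users", {"id": "abc", "email": None})
```

Here `good` is true and `bad` is false, because the string "abc" does not match the integer type declared for `id`.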
Usage Examples
# Called internally by _process when a SCHEMA message arrives:
# {"type": "SCHEMA", "stream": "users", "schema": {...}, "key_properties": ["id"]}
# `destination` is an instance of a concrete Destination subclass.
# After the message has been processed:
assert "users" in destination.schemas
assert "users" in destination.validators
assert destination.key_properties["users"] == ["id"]