Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Mage ai Mage ai Destination Process Schema

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Data_Quality, Schema_Management
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool for registering Singer stream schemas and creating JSON Schema validators for record validation provided by the Mage integrations Destination base class.

Description

Destination.process_schema registers a stream's schema and metadata from a SCHEMA message. It stores the schema dict (with internal columns merged), creates a Draft4Validator for record validation, and stores key_properties, bookmark_properties, replication_method, partition_keys, unique_constraints, unique_conflict_method, and disable_column_type_check per stream. It also handles STREAM_OVERRIDE_SETTINGS to inject static columns and additional partition keys from config.

Usage

Called automatically by _process when a SCHEMA message is encountered. Must be called before any RECORD messages for a given stream.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/destinations/base.py
  • Lines: 283-321

Signature

class Destination(ABC):
    def process_schema(
        self,
        stream: str,
        schema: Dict,
        row: Dict,
        tags: Dict = None,
    ) -> None:
        """Register stream schema and create validator.

        Args:
            stream: Stream name/identifier.
            schema: JSON Schema dict with 'properties'.
            row: Full SCHEMA message with metadata keys.
            tags: Optional logging tags.
        """

Import

from mage_integrations.destinations.base import Destination

I/O Contract

Inputs

Name Type Required Description
stream str Yes Stream name/identifier
schema Dict Yes JSON Schema with 'properties' dict
row Dict Yes Full SCHEMA message with key_properties, bookmark_properties, replication_method, etc.

Outputs

Name Type Description
self.schemas[stream] Dict Registered schema with internal columns
self.validators[stream] Draft4Validator JSON Schema validator instance
self.key_properties[stream] List[str] Primary key columns

Usage Examples

# Called internally by _process when SCHEMA message arrives:
# {"type": "SCHEMA", "stream": "users", "schema": {...}, "key_properties": ["id"]}

# After processing:
assert "users" in destination.schemas
assert "users" in destination.validators
assert destination.key_properties["users"] == ["id"]

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment