Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:MaterializeInc Materialize Platform Check Subclasses

From Leeroopedia


Knowledge Sources
Domains Testing, Upgrade_Validation, Platform_Checks
Last Updated 2026-02-08 00:00 GMT

Overview

Collection of concrete Check subclass implementations that validate Materialize features survive version upgrades and platform changes.

Description

The checks/all_checks/ directory contains 93 individual Check subclass files. Each Check follows the initialize/manipulate/validate lifecycle defined by the base Check class. These checks test specific Materialize features (CDC sources, sinks, data types, SQL operations, access control, etc.) across upgrade boundaries.

Each check file defines one or more subclasses of Check from materialize.checks.checks and implements:

  • initialize(): Create test objects and insert initial data (pre-upgrade)
  • manipulate(): Modify state during or after upgrade
  • validate(): Verify data integrity and feature correctness (post-upgrade)

Usage

Import individual check classes when adding them to upgrade test scenarios. All checks are auto-discovered by the checks/all_checks/__init__.py module.

Code Reference

Source Location

Signature

# Common pattern for all Check subclasses
class FeatureCheck(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(dedent("""
            > CREATE TABLE ...
            > INSERT INTO ...
        """))

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(dedent("""
                > INSERT INTO ...
            """)),
            Testdrive(dedent("""
                > INSERT INTO ...
            """)),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(dedent("""
            > SELECT * FROM ...
            expected_result
        """))

Import

from materialize.checks.all_checks.error import ParseError, ParseHexError
from materialize.checks.all_checks.sink import SinkUpsert
from materialize.checks.all_checks.mysql_cdc import MySqlCdc

I/O Contract

Inputs

Name Type Required Description
self._base_version MzVersion Yes Base version for conditional logic
self.current_version MzVersion Yes Current running version

Outputs

Name Type Description
initialize() Testdrive Testdrive actions for initial state setup
manipulate() list[Testdrive] Testdrive actions for state manipulation across upgrades
validate() Testdrive Testdrive actions to verify feature correctness

Check Categories

Category Files Description
CDC Sources mysql_cdc.py, pg_cdc.py, sql_server_cdc.py Validate Change Data Capture across upgrades
Sinks sink.py Test Kafka sink upsert behavior
Data Types boolean_type.py, float_types.py, numeric_types.py, temporal_types.py, text_bytea_types.py, jsonb_type.py, array_type.py, nested_types.py, uuid.py, ranges.py, materialize_type.py Type handling correctness
SQL Operations aggregation.py, join_types.py, join_implementations.py, having.py, like.py, regex.py, window_functions.py, with_mutually_recursive.py, top_k.py, threshold.py, insert_select.py, delete.py, update.py Query correctness
DDL Operations create_table.py, create_index.py, alter_table.py, alter_index.py, alter_connection.py, drop_table.py, drop_index.py Schema change durability
Rename Operations rename_cluster.py, rename_index.py, rename_replica.py, rename_schema.py, rename_source.py, rename_table.py, rename_view.py Rename operation correctness
Access Control privileges.py, default_privileges.py, roles.py, password_auth.py, owners.py Security and RBAC
Upsert upsert.py, upsert_enrich.py, upsert_many_columns.py, upsert_many_rows.py, upsert_many_updates.py, upsert_shrink_grow.py, upsert_unordered_key.py, upsert_wide.py Upsert behavior under various conditions
Infrastructure cluster.py, cluster_unification.py, managed_cluster.py, swap_cluster.py, swap_schema.py, databases.py, schemas.py Cluster and schema management
Sources source_tables.py, source_errors.py, json_source.py, kafka_formats.py, kafka_protocols.py, load_generator.py, multiple_partitions.py, webhook.py Source ingestion
Error Handling error.py Parse errors, hex decode errors, dataflow error retraction, Kafka decode errors for both upsert keys and values
Misc Features cast.py, comment.py, commit.py, constant_plan.py, continual_task.py, copy_to_s3.py, debezium.py, explain_catalog_item.py, identifiers.py, large_tables.py, materialized_views.py, null_value.py, optimizer_notices.py, peek_cancellation.py, peek_persist.py, retain_history.py, rollback.py, ssh.py, statement_logging.py, string.py, table_data.py, aws.py Various feature checks

Representative Examples

Error Check (error.py)

Demonstrates the simplest check pattern. ParseError creates a table with a materialized view that casts STRING to INTEGER, inserts invalid data, and validates that the expected error message appears after upgrade.

class ParseError(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(dedent("""
            > CREATE TABLE parse_error_table (f1 STRING);
            > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
            > INSERT INTO parse_error_table VALUES ('123');
        """))

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(s) for s in [
                "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
                "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(dedent("""
            ! SELECT * FROM parse_error_view;
            contains: invalid input syntax for type integer
        """))

MySQL CDC Check (mysql_cdc.py)

Demonstrates a more complex check with a base class (MySqlCdcBase) that parameterizes behavior. The check creates MySQL tables, configures replication connections with secrets, creates Materialize sources, and validates data survives upgrades.

class MySqlCdcBase:
    base_version: MzVersion
    current_version: MzVersion
    wait: bool
    suffix: str

    def __init__(self, wait: bool, **kwargs: Any) -> None:
        self.wait = wait
        self.repeats = 1024 if wait else 16384
        self.expects = 97350 if wait else 1633350
        self.suffix = f"_{str(wait).lower()}"
        super().__init__(**kwargs)  # forward unused args to Check

Owners Check (owners.py)

Demonstrates checks that test RBAC (role-based access control). The Owners check creates databases, schemas, connections, types, tables, indexes, views, materialized views, and secrets under specific roles, then validates ownership transfer across upgrades.

Materialized Views Check (materialized_views.py)

Demonstrates checks on core Materialize functionality. Creates tables with large datasets (10,000+ rows), materialized views with aggregations, performs inserts and deletes across upgrade phases, and validates that materialized view results remain correct.

Decorators and Annotations

Some checks use special decorators:

  • @externally_idempotent(False): Marks checks that cannot be safely re-run (e.g., Kafka-based checks where message ingestion is not idempotent). Used by scenarios like BackupAndRestoreToPreviousState to filter incompatible checks.
  • @disabled: Temporarily disables a check from being collected.
from materialize.checks.checks import Check, externally_idempotent, disabled

@externally_idempotent(False)
class SinkUpsert(Check):
    """Basic Check on sinks from an upsert source"""
    ...

@disabled
class TemporarilyBrokenCheck(Check):
    ...

Usage Examples

Using a Check in a Scenario

from materialize.checks.all_checks.sink import SinkUpsert
from materialize.checks.scenarios import Scenario

# Checks are automatically collected and used in upgrade scenarios
# Each Check subclass is instantiated and its lifecycle methods called:
check = SinkUpsert(base_version, rng)
init_actions = check.initialize()    # Run before upgrade
manip_actions = check.manipulate()   # Run during/after upgrade
valid_actions = check.validate()     # Verify post-upgrade

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment