Implementation:MaterializeInc Materialize Platform Check Subclasses
| Knowledge Sources | |
|---|---|
| Domains | Testing, Upgrade_Validation, Platform_Checks |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Collection of concrete Check subclass implementations that validate Materialize features survive version upgrades and platform changes.
Description
The checks/all_checks/ directory contains 93 individual Check subclass files. Each Check follows the initialize/manipulate/validate lifecycle defined by the base Check class. These checks test specific Materialize features (CDC sources, sinks, data types, SQL operations, access control, etc.) across upgrade boundaries.
Each check file defines one or more subclasses of Check from materialize.checks.checks and implements:
- initialize(): Create test objects and insert initial data (pre-upgrade)
- manipulate(): Modify state during or after upgrade
- validate(): Verify data integrity and feature correctness (post-upgrade)
Usage
Import individual check classes when adding them to upgrade test scenarios. All checks are auto-discovered by the checks/all_checks/__init__.py module.
Code Reference
Source Location
- Repository: MaterializeInc_Materialize
- File: misc/python/materialize/checks/all_checks/ (directory, 93 files)
Signature
# Common pattern for all Check subclasses
class FeatureCheck(Check):
def initialize(self) -> Testdrive:
return Testdrive(dedent("""
> CREATE TABLE ...
> INSERT INTO ...
"""))
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent("""
> INSERT INTO ...
""")),
Testdrive(dedent("""
> INSERT INTO ...
""")),
]
def validate(self) -> Testdrive:
return Testdrive(dedent("""
> SELECT * FROM ...
expected_result
"""))
Import
from materialize.checks.all_checks.error import ParseError, ParseHexError
from materialize.checks.all_checks.sink import SinkUpsert
from materialize.checks.all_checks.mysql_cdc import MySqlCdc
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| self._base_version | MzVersion | Yes | Base version for conditional logic |
| self.current_version | MzVersion | Yes | Current running version |
Outputs
| Name | Type | Description |
|---|---|---|
| initialize() | Testdrive | Testdrive actions for initial state setup |
| manipulate() | list[Testdrive] | Testdrive actions for state manipulation across upgrades |
| validate() | Testdrive | Testdrive actions to verify feature correctness |
Check Categories
| Category | Files | Description |
|---|---|---|
| CDC Sources | mysql_cdc.py, pg_cdc.py, sql_server_cdc.py | Validate Change Data Capture across upgrades |
| Sinks | sink.py | Test Kafka sink upsert behavior |
| Data Types | boolean_type.py, float_types.py, numeric_types.py, temporal_types.py, text_bytea_types.py, jsonb_type.py, array_type.py, nested_types.py, uuid.py, ranges.py, materialize_type.py | Type handling correctness |
| SQL Operations | aggregation.py, join_types.py, join_implementations.py, having.py, like.py, regex.py, window_functions.py, with_mutually_recursive.py, top_k.py, threshold.py, insert_select.py, delete.py, update.py | Query correctness |
| DDL Operations | create_table.py, create_index.py, alter_table.py, alter_index.py, alter_connection.py, drop_table.py, drop_index.py | Schema change durability |
| Rename Operations | rename_cluster.py, rename_index.py, rename_replica.py, rename_schema.py, rename_source.py, rename_table.py, rename_view.py | Rename operation correctness |
| Access Control | privileges.py, default_privileges.py, roles.py, password_auth.py, owners.py | Security and RBAC |
| Upsert | upsert.py, upsert_enrich.py, upsert_many_columns.py, upsert_many_rows.py, upsert_many_updates.py, upsert_shrink_grow.py, upsert_unordered_key.py, upsert_wide.py | Upsert behavior under various conditions |
| Infrastructure | cluster.py, cluster_unification.py, managed_cluster.py, swap_cluster.py, swap_schema.py, databases.py, schemas.py | Cluster and schema management |
| Sources | source_tables.py, source_errors.py, json_source.py, kafka_formats.py, kafka_protocols.py, load_generator.py, multiple_partitions.py, webhook.py | Source ingestion |
| Error Handling | error.py | Parse errors, hex decode errors, dataflow error retraction, Kafka decode errors for both upsert keys and values |
| Misc Features | cast.py, comment.py, commit.py, constant_plan.py, continual_task.py, copy_to_s3.py, debezium.py, explain_catalog_item.py, identifiers.py, large_tables.py, materialized_views.py, null_value.py, optimizer_notices.py, peek_cancellation.py, peek_persist.py, retain_history.py, rollback.py, ssh.py, statement_logging.py, string.py, table_data.py, aws.py | Various feature checks |
Representative Examples
Error Check (error.py)
Demonstrates the simplest check pattern. ParseError creates a table with a materialized view that casts STRING to INTEGER, inserts invalid data, and validates that the expected error message appears after upgrade.
class ParseError(Check):
def initialize(self) -> Testdrive:
return Testdrive(dedent("""
> CREATE TABLE parse_error_table (f1 STRING);
> CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
> INSERT INTO parse_error_table VALUES ('123');
"""))
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(s) for s in [
"> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
"> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
]
]
def validate(self) -> Testdrive:
return Testdrive(dedent("""
! SELECT * FROM parse_error_view;
contains: invalid input syntax for type integer
"""))
MySQL CDC Check (mysql_cdc.py)
Demonstrates a more complex check with a base class (MySqlCdcBase) that parameterizes behavior. The check creates MySQL tables, configures replication connections with secrets, creates Materialize sources, and validates data survives upgrades.
class MySqlCdcBase:
base_version: MzVersion
current_version: MzVersion
wait: bool
suffix: str
def __init__(self, wait: bool, **kwargs: Any) -> None:
self.wait = wait
self.repeats = 1024 if wait else 16384
self.expects = 97350 if wait else 1633350
self.suffix = f"_{str(wait).lower()}"
super().__init__(**kwargs) # forward unused args to Check
Owners Check (owners.py)
Demonstrates checks that test RBAC (role-based access control). The Owners check creates databases, schemas, connections, types, tables, indexes, views, materialized views, and secrets under specific roles, then validates ownership transfer across upgrades.
Materialized Views Check (materialized_views.py)
Demonstrates checks on core Materialize functionality. Creates tables with large datasets (10,000+ rows), materialized views with aggregations, performs inserts and deletes across upgrade phases, and validates that materialized view results remain correct.
Decorators and Annotations
Some checks use special decorators:
- @externally_idempotent(False): Marks checks that cannot be safely re-run (e.g., Kafka-based checks where message ingestion is not idempotent). Used by scenarios like BackupAndRestoreToPreviousState to filter incompatible checks.
- @disabled: Temporarily disables a check from being collected.
from materialize.checks.checks import Check, externally_idempotent, disabled
@externally_idempotent(False)
class SinkUpsert(Check):
"""Basic Check on sinks from an upsert source"""
...
@disabled
class TemporarilyBrokenCheck(Check):
...
Usage Examples
Using a Check in a Scenario
from materialize.checks.all_checks.sink import SinkUpsert
from materialize.checks.scenarios import Scenario
# Checks are automatically collected and used in upgrade scenarios
# Each Check subclass is instantiated and its lifecycle methods called:
check = SinkUpsert(base_version, rng)
init_actions = check.initialize() # Run before upgrade
manip_actions = check.manipulate() # Run during/after upgrade
valid_actions = check.validate() # Verify post-upgrade