Implementation:MaterializeInc Materialize Check Support Classes

From Leeroopedia


Knowledge Sources
Domains: Testing, Upgrade_Validation, Platform_Checks
Last Updated: 2026-02-08 00:00 GMT

Overview

Support classes and utilities that provide infrastructure for the platform checks framework, including backup/restore actions, Kubernetes cloud test actions, shared constants, feature flags, and backup/restore scenario definitions.

Description

The platform checks framework relies on several support modules beyond the individual Check subclasses:

  • backup_actions.py: Defines Backup and Restore Action subclasses for backup/restore test orchestration
  • cloudtest_actions.py: Defines Action subclasses for Kubernetes-based cloud testing (ReplaceEnvironmentdStatefulSet, SetupSshTunnels)
  • common.py: Shared Kafka Avro schema constants used across multiple check files
  • features.py: Features class for managing platform-specific feature flags (e.g., Azurite availability)
  • scenarios_backup_restore.py: Concrete Scenario subclasses that orchestrate backup and restore test sequences

Code Reference

Source Locations

  • Repository: MaterializeInc_Materialize
  • Files:
    • misc/python/materialize/checks/backup_actions.py
    • misc/python/materialize/checks/cloudtest_actions.py
    • misc/python/materialize/checks/common.py
    • misc/python/materialize/checks/features.py
    • misc/python/materialize/checks/scenarios_backup_restore.py

Backup Actions (backup_actions.py)

Provides two Action subclasses that delegate to the mzcompose composition for backup and restore operations.

Backup

class Backup(Action):
    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.backup()

    def join(self, e: Executor) -> None:
        # Action is blocking
        pass

Triggers a backup of the Materialize state via the mzcompose composition. The action is synchronous (blocking), so join() is a no-op.

Restore

class Restore(Action):
    def __init__(self, restart_mz=True):
        self.restart_mz = restart_mz

    def execute(self, e: Executor) -> None:
        c = e.mzcompose_composition()
        c.restore(restart_mz=self.restart_mz)

    def join(self, e: Executor) -> None:
        # Action is blocking
        pass

Restores Materialize state from a previous backup. The restart_mz parameter controls whether Materialize is automatically restarted after restore (defaults to True). When set to False, the caller is responsible for starting Materialize separately (used in scenarios like BackupAndRestoreToPreviousState).
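
The blocking execute()/join() pattern described above can be sketched in isolation. This is a minimal illustration, not the framework's code: FakeComposition, FakeExecutor, and run_actions are hypothetical stand-ins that only record calls, and the Action base class here is a simplified assumption.

```python
from abc import ABC, abstractmethod


class FakeComposition:
    """Hypothetical stand-in for the mzcompose composition; it only records calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def backup(self) -> None:
        self.calls.append("backup")

    def restore(self, restart_mz: bool = True) -> None:
        self.calls.append(f"restore(restart_mz={restart_mz})")


class FakeExecutor:
    """Hypothetical stand-in for Executor."""

    def __init__(self) -> None:
        self.composition = FakeComposition()

    def mzcompose_composition(self) -> FakeComposition:
        return self.composition


class Action(ABC):
    """Simplified base class (an assumption of this sketch)."""

    @abstractmethod
    def execute(self, e: FakeExecutor) -> None: ...

    def join(self, e: FakeExecutor) -> None:
        # Blocking actions have nothing left to wait for.
        pass


class Backup(Action):
    def execute(self, e: FakeExecutor) -> None:
        e.mzcompose_composition().backup()


class Restore(Action):
    def __init__(self, restart_mz: bool = True) -> None:
        self.restart_mz = restart_mz

    def execute(self, e: FakeExecutor) -> None:
        e.mzcompose_composition().restore(restart_mz=self.restart_mz)


def run_actions(actions: list[Action], e: FakeExecutor) -> list[str]:
    """Execute each action, then join it, and return the recorded calls."""
    for action in actions:
        action.execute(e)
        action.join(e)
    return e.composition.calls


calls = run_actions([Backup(), Restore(restart_mz=False)], FakeExecutor())
print(calls)
```

Because both actions finish their work inside execute(), the driver can call join() unconditionally without special-casing blocking actions.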

I/O Contract

Class | Input | Output | Side Effect
Backup | Executor (provides mzcompose composition) | None | Creates a backup of Materialize state
Restore | Executor, restart_mz: bool | None | Restores Materialize state; optionally restarts Mz

Cloud Test Actions (cloudtest_actions.py)

Provides Action subclasses for Kubernetes-based cloud testing environments.

ReplaceEnvironmentdStatefulSet

class ReplaceEnvironmentdStatefulSet(Action):
    """Change the image tag of the environmentd stateful set,
    re-create the definition and replace the existing one."""
    new_tag: str | None

    def __init__(self, new_tag: str | None = None) -> None:
        self.new_tag = new_tag

    def execute(self, e: Executor) -> None:
        new_version = (
            MzVersion.parse_mz(self.new_tag)
            if self.new_tag
            else MzVersion.parse_cargo()
        )
        mz = e.cloudtest_application()
        stateful_set = [
            resource for resource in mz.resources
            if type(resource) == EnvironmentdStatefulSet
        ]
        assert len(stateful_set) == 1
        stateful_set = stateful_set[0]
        stateful_set.tag = self.new_tag
        stateful_set.replace()
        e.current_mz_version = new_version

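Replaces the Kubernetes EnvironmentdStatefulSet with a new image tag to simulate version upgrades in cloud test environments. The action asserts that exactly one EnvironmentdStatefulSet exists among the application's resources, sets its tag to new_tag, and calls replace(). If new_tag is None, the target version is parsed from Cargo.toml via MzVersion.parse_cargo() instead. After the replacement, e.current_mz_version is updated to the new version.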
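
The "find exactly one resource of a given type, then mutate and replace it" step can be sketched with stand-in classes. FakeStatefulSet, OtherResource, and replace_tag are hypothetical names introduced for illustration; the real action operates on cloudtest resources and a Kubernetes cluster.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStatefulSet:
    """Hypothetical stand-in for EnvironmentdStatefulSet."""

    tag: Optional[str] = None
    replaced: bool = False

    def replace(self) -> None:
        # The real method would re-create the Kubernetes object.
        self.replaced = True


@dataclass
class OtherResource:
    """Hypothetical unrelated resource that must be skipped."""

    name: str


def replace_tag(resources: list, new_tag: Optional[str]) -> FakeStatefulSet:
    # Exact type match, mirroring the type(...) == ... filter in the listing.
    matches = [r for r in resources if type(r) is FakeStatefulSet]
    assert len(matches) == 1, f"expected one stateful set, found {len(matches)}"
    stateful_set = matches[0]
    stateful_set.tag = new_tag
    stateful_set.replace()
    return stateful_set


updated = replace_tag([OtherResource("svc"), FakeStatefulSet(tag="v0.1.0")], "v0.2.0")
print(updated)
```

The assertion makes a misconfigured application (zero or several matching resources) fail loudly instead of silently updating the wrong object.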

SetupSshTunnels

class SetupSshTunnels(Action):
    """Prepare the SSH tunnels."""

    def __init__(self, mz: MaterializeApplication) -> None:
        self.handle: Any | None = None
        self.mz = mz

    def execute(self, e: Executor) -> None:
        connection_count = 4
        self.handle = e.testdrive(...)
        for i in range(connection_count):
            public_key = self.mz.environmentd.sql_query(...)
            self.mz.kubectl("exec", "svc/ssh-bastion-host", ...)

    def join(self, e: Executor) -> None:
        e.join(self.handle)

Creates 4 SSH tunnel connections in Materialize and registers their public keys with the SSH bastion host pod in Kubernetes. Unlike Backup/Restore, this action is partially asynchronous: the testdrive handle is stored and joined later.
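
The handle-based execute()/join() split can be illustrated with the standard library. This sketch assumes a thread pool as the asynchronous backend, which is not how the framework runs testdrive; FakeExecutor and SetupTunnelsSketch are hypothetical, and the sketch does not model any dependency between the submitted script and the key registration.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class FakeExecutor:
    """Hypothetical executor that runs scripts on a worker thread."""

    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=1)
        self.finished: list[str] = []

    def testdrive(self, script: str) -> Future:
        # Returns immediately with a handle for the pending work.
        return self._pool.submit(self.finished.append, script)

    def join(self, handle: Future) -> None:
        # Blocks until the work is done and re-raises any error from it.
        handle.result()


class SetupTunnelsSketch:
    def __init__(self, connection_count: int = 4) -> None:
        self.handle: Optional[Future] = None
        self.connection_count = connection_count
        self.registered: list[str] = []

    def execute(self, e: FakeExecutor) -> None:
        # Start the asynchronous part and keep its handle for join().
        self.handle = e.testdrive("create ssh connections")
        # The synchronous part runs right away in the calling thread.
        for i in range(self.connection_count):
            self.registered.append(f"public-key-{i}")

    def join(self, e: FakeExecutor) -> None:
        assert self.handle is not None, "execute() must run before join()"
        e.join(self.handle)


executor = FakeExecutor()
action = SetupTunnelsSketch()
action.execute(executor)
action.join(executor)
print(executor.finished, len(action.registered))
```

Storing the handle on the action lets the scenario driver start several actions before waiting on any of them.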

I/O Contract

Class | Input | Output | Side Effect
ReplaceEnvironmentdStatefulSet | new_tag: str or None | None | Replaces K8s StatefulSet image, updates executor version
SetupSshTunnels | MaterializeApplication | None | Creates SSH tunnel connections and registers public keys

Common Constants (common.py)

Provides shared Kafka Avro schema definitions used by multiple check files, particularly sink and source checks.

KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD = """
       $ set keyschema={
           "type": "record",
           "name": "Key",
           "fields": [
               {"name": "key1", "type": "string"}
           ]
         }

       $ set schema={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f1", "type":"string"}
           ]
         }
    """

This constant defines a testdrive-compatible Avro schema pair (key + value) used for Kafka source and sink checks. Checks import it as:

from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
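
One plausible way to use the constant is to prepend it to a testdrive fragment and dedent the result. The sketch below copies the constant locally so that it runs without the Materialize repository; build_ingest_script, the topic name, and the ingested record are hypothetical, and individual checks may assemble their scripts differently.

```python
from textwrap import dedent

# Local copy of the constant so the sketch is self-contained.
KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD = """
       $ set keyschema={
           "type": "record",
           "name": "Key",
           "fields": [
               {"name": "key1", "type": "string"}
           ]
         }

       $ set schema={
           "type" : "record",
           "name" : "test",
           "fields" : [
               {"name":"f1", "type":"string"}
           ]
         }
    """


def build_ingest_script(topic: str) -> str:
    """Hypothetical helper: schema definitions followed by one Avro ingest."""
    ingest = """
       $ kafka-ingest format=avro topic=TOPIC key-format=avro key-schema=${keyschema} schema=${schema}
       {"key1": "A"} {"f1": "A"}
    """.replace("TOPIC", topic)
    # dedent strips the shared 7-space margin so commands start at column 0.
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD + ingest)


print(build_ingest_script("checks-demo"))
```

Keeping the schema pair in one shared constant means every check that ingests single-string records refers to the same ${keyschema} and ${schema} definitions.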

Features (features.py)

Manages platform-specific feature flags that control which checks can run in a given test environment.

class Features:
    AZURITE = "azurite"

    def __init__(self, features):
        self.features = features

    def azurite_enabled(self) -> bool:
        return self.features and self.AZURITE in self.features

Currently supports a single feature flag:

  • AZURITE: Indicates whether Azurite, the Azure Blob Storage emulator, is available. Some checks (e.g., copy_to_s3.py) may consult this flag to decide whether they can run against the available blob storage backend.

The Features class is instantiated with a collection of feature strings and provides boolean accessor methods. This design accommodates platform-specific constraints such as architecture differences (e.g., the SQL Server Docker image being available only on x86_64).
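
A standalone sketch of the accessor is shown below. It differs from the listing in one respect, which is a choice made for this sketch: in Python, `self.features and ...` evaluates to the falsy `features` value itself (None or an empty collection) when no features are configured, so the sketch wraps the first operand in bool() to always return a real bool. The type hints are also assumptions.

```python
from typing import Collection, Optional


class Features:
    AZURITE = "azurite"

    def __init__(self, features: Optional[Collection[str]]) -> None:
        self.features = features

    def azurite_enabled(self) -> bool:
        # bool(...) guards against returning None or [] instead of False.
        return bool(self.features) and self.AZURITE in self.features


print(Features(["azurite"]).azurite_enabled())  # True
print(Features(None).azurite_enabled())         # False
print(Features([]).azurite_enabled())           # False
```

Callers that only use the result in an `if` statement behave the same either way; the coercion matters when the value is compared with `is False`, logged, or serialized.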

I/O Contract

Method | Input | Output | Description
__init__() | features: collection of strings | Features instance | Stores the feature set
azurite_enabled() | None | bool | Returns True if Azurite is in the feature set

Backup/Restore Scenarios (scenarios_backup_restore.py)

Defines four Scenario subclasses that orchestrate backup and restore operations interleaved with check lifecycle phases.

BackupAndRestoreAfterManipulate

class BackupAndRestoreAfterManipulate(Scenario):
    """Backup and Restore Materialize after manipulate(phase=2) has run.
    Only validate() is run post-restore."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Manipulate(self, phase=2),
            Backup(),
            KillMz(),
            Restore(),
            Validate(self),
        ]

Runs the full initialize/manipulate lifecycle, then backs up, kills Materialize, restores, and validates. Tests that a complete state survives backup/restore.

BackupAndRestoreBeforeManipulate

class BackupAndRestoreBeforeManipulate(Scenario):
    """Backup and Restore Materialize before manipulate(phase=2) has run."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Backup(),
            KillMz(),
            Restore(),
            Manipulate(self, phase=2),
            Validate(self),
        ]

Backs up after phase 1 manipulation only, restores, then continues with phase 2 manipulation and validation. Tests that partial state can be restored and continued.

BackupAndRestoreToPreviousState

class BackupAndRestoreToPreviousState(Scenario):
    """Backup, run more workloads, and then Restore to a previous state."""

    def requires_external_idempotence(self) -> bool:
        return True

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Manipulate(self, phase=1),
            Backup(),
            Manipulate(self, phase=2),  # Those updates will be lost here ..
            KillMz(),
            Restore(restart_mz=False),
            StartMz(self),
            Manipulate(self, phase=2),  # ... and redone here
            Validate(self),
        ]

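Tests restoring to a previous state: phase 2 manipulation runs after the backup, the restore discards those changes by returning to the pre-phase-2 backup, and phase 2 then runs again. Because Restore is called with restart_mz=False, the scenario starts Materialize explicitly with StartMz before re-running phase 2. The scenario requires external idempotence because phase 2 manipulations run twice (once before the restore and once after). Checks marked @externally_idempotent(False) are excluded from this scenario.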
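
A toy model may help show why running phase 2 twice matters. It assumes that a backup captures only Materialize's own state and that side effects on external systems are not rolled back by a restore; that assumption, along with the ToySystem class and its fields, is introduced for illustration only.

```python
from dataclasses import dataclass, field


@dataclass
class ToySystem:
    """Toy model: `internal` is covered by backups, `external` is not (assumption)."""

    internal: list[str] = field(default_factory=list)
    external: list[str] = field(default_factory=list)
    snapshot: list[str] = field(default_factory=list)

    def manipulate(self, phase: int) -> None:
        # Each phase changes internal state and leaves an external side effect.
        self.internal.append(f"phase{phase}")
        self.external.append(f"phase{phase}")

    def backup(self) -> None:
        self.snapshot = list(self.internal)

    def restore(self) -> None:
        # Only the internal state returns to the snapshot.
        self.internal = list(self.snapshot)


def restore_to_previous_state() -> ToySystem:
    system = ToySystem()
    system.manipulate(1)
    system.backup()
    system.manipulate(2)  # these internal updates are lost by the restore
    system.restore()
    system.manipulate(2)  # and redone here
    return system


result = restore_to_previous_state()
print(result.internal)  # ['phase1', 'phase2']
print(result.external)  # ['phase1', 'phase2', 'phase2']
```

In this model the internal state ends up as if phase 2 ran once, while the external side holds the phase 2 effect twice, which is the situation a check must tolerate to take part in this scenario.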

BackupAndRestoreMulti

class BackupAndRestoreMulti(Scenario):
    """Repeated Backup and Restore operations."""

    def actions(self) -> list[Action]:
        return [
            StartMz(self),
            Initialize(self),
            Backup(), KillMz(), Restore(),
            Manipulate(self, phase=1),
            Backup(), KillMz(), Restore(),
            Manipulate(self, phase=2),
            Backup(), KillMz(), Restore(),
            Validate(self),
            Backup(), KillMz(), Restore(),
            Validate(self),
        ]

Performs multiple backup/restore cycles throughout the entire lifecycle, including after validation. Tests that repeated backup/restore operations do not corrupt state.
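
The interleaving can be expressed as "append a backup/kill/restore cycle after every lifecycle step, then validate once more". The sketch below reproduces the order of the listing using plain strings instead of Action objects; LIFECYCLE, CYCLE, and multi_sequence are hypothetical names, and the real scenario spells the list out explicitly.

```python
LIFECYCLE = ["Initialize", "Manipulate(phase=1)", "Manipulate(phase=2)", "Validate"]
CYCLE = ["Backup", "KillMz", "Restore"]


def multi_sequence() -> list[str]:
    """Build the action order of BackupAndRestoreMulti as a list of names."""
    steps = ["StartMz"]
    for step in LIFECYCLE:
        steps.append(step)
        steps.extend(CYCLE)  # one full backup/restore cycle after each step
    steps.append("Validate")  # final validation after the last restore
    return steps


for name in multi_sequence():
    print(name)
```

Written this way, it is easy to see that the scenario contains four backup/restore cycles and that validation runs both before and after the last one.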
