Implementation:MaterializeInc Materialize Validate Action

From Leeroopedia


Knowledge Sources: misc/python/materialize/checks/actions.py, misc/python/materialize/checks/checks.py, misc/python/materialize/checks/executors.py
Domains: Upgrade Testing, Data Integrity Verification, Python API
Last Updated: 2026-02-08

Overview

Concrete validation action and executor infrastructure for post-upgrade data integrity checks, provided by the Validate class in materialize.checks.actions and the Executor hierarchy in materialize.checks.executors.

Description

The validation implementation is a collaboration between three components:

1. The Validate action class (in actions.py, lines 119–131):

Validate is a concrete Action subclass that orchestrates the validation phase across all checks in a scenario. Its constructor receives the scenario and stores its check_objects. When executed:

  • execute(e) iterates over all check objects, calling check.start_validate(e, self) on each. This triggers each check to create its validation action (Testdrive or PyAction) and begin executing it.
  • join(e) iterates over all check objects, calling check.join_validate(e) on each. This waits for each validation to complete and raises on failure.
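
The start-then-join pattern described above can be illustrated with a minimal sketch. It is not the code from actions.py: RecordingExecutor, FakeCheck, and ValidateSketch are hypothetical stand-ins, and only the method names start_validate and join_validate come from the description.

```python
from typing import List


class RecordingExecutor:
    """Hypothetical stand-in for an Executor; it only records calls."""

    def __init__(self) -> None:
        self.log: List[str] = []


class FakeCheck:
    """Hypothetical stand-in for a Check exposing the two wrapper methods."""

    def __init__(self, name: str, fails: bool = False) -> None:
        self.name = name
        self.fails = fails

    def start_validate(self, e: RecordingExecutor, a: object) -> None:
        e.log.append(f"start:{self.name}")

    def join_validate(self, e: RecordingExecutor) -> None:
        e.log.append(f"join:{self.name}")
        if self.fails:
            raise AssertionError(f"validation failed for {self.name}")


class ValidateSketch:
    """Sketch of an action that fans out to every check, then joins them."""

    def __init__(self, checks: List[FakeCheck]) -> None:
        self.checks = checks

    def execute(self, e: RecordingExecutor) -> None:
        # Start every validation first so a parallel executor can overlap them.
        for check in self.checks:
            check.start_validate(e, self)

    def join(self, e: RecordingExecutor) -> None:
        # Wait for each validation in turn; the first failure propagates.
        for check in self.checks:
            check.join_validate(e)
```

Splitting the work into execute and join means all validations are started before any of them is awaited, which is what lets a threaded executor run them concurrently.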

2. The Check.validate() lifecycle method (in checks.py, lines 60–62):

Each Check subclass implements validate() to return a Testdrive or PyAction containing the actual assertions. The start_validate/join_validate wrapper methods on Check handle guard logic (checking _can_run() and enabled) and executor delegation.
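
A rough sketch of the guard logic, under stated assumptions: FakeAction and GuardedCheckSketch are hypothetical, the runnable flag stands in for whatever the real _can_run() decides, and the real wrappers in checks.py may differ in detail.

```python
from typing import Optional


class FakeAction:
    """Hypothetical stand-in for a Testdrive or PyAction."""

    def __init__(self) -> None:
        self.started = False
        self.joined = False

    def execute(self, e: object) -> None:
        self.started = True

    def join(self, e: object) -> None:
        self.joined = True


class GuardedCheckSketch:
    """Sketch of a check whose validate wrappers skip work when a guard fails."""

    def __init__(self, enabled: bool = True, runnable: bool = True) -> None:
        self.enabled = enabled
        self._runnable = runnable  # stand-in for the real _can_run() decision
        self._validate: Optional[FakeAction] = None

    def _can_run(self, e: object) -> bool:
        return self._runnable

    def validate(self) -> FakeAction:
        # A real check would return its assertions here.
        return FakeAction()

    def start_validate(self, e: object, a: object) -> None:
        # Only build and start the validation action when both guards pass.
        if self._can_run(e) and self.enabled:
            self._validate = self.validate()
            self._validate.execute(e)

    def join_validate(self, e: object) -> None:
        # Apply the same guards on join so a skipped check is never awaited.
        if self._can_run(e) and self.enabled and self._validate is not None:
            self._validate.join(e)
```

Because the guards live in the wrappers, a disabled or non-applicable check is skipped silently, and validate() itself only has to describe the assertions.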

3. The Executor hierarchy (in executors.py):

Executors bridge the gap between the abstract action model and concrete execution backends:

  • MzcomposeExecutor — Runs testdrive commands synchronously against a Docker Compose-managed Materialize instance. Calls composition.testdrive() for testdrive actions and opens a direct SQL connection for PyActions.
  • MzcomposeExecutorParallel — Runs testdrive commands in separate threads for parallel check execution. Uses threading.Thread and propagates exceptions on join().
  • CloudtestExecutor — Runs testdrive commands against a Kubernetes-managed Materialize application via the cloudtest framework.
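
The thread-based behaviour attributed to MzcomposeExecutorParallel (run in a thread, surface the failure on join()) can be sketched with the standard library alone. ThreadedRunnerSketch is a hypothetical name and this is one common way to implement the pattern, not the code in executors.py.

```python
import threading
from typing import Callable, Optional


class ThreadedRunnerSketch:
    """Runs a callable in a worker thread and re-raises its exception on join()."""

    def __init__(self) -> None:
        self._thread: Optional[threading.Thread] = None
        self._exception: Optional[BaseException] = None

    def start(self, work: Callable[[], None]) -> None:
        def target() -> None:
            try:
                work()
            except BaseException as exc:
                # An exception raised in a thread does not reach the caller,
                # so keep it for join() to re-raise.
                self._exception = exc

        self._exception = None
        self._thread = threading.Thread(target=target)
        self._thread.start()

    def join(self) -> None:
        if self._thread is None:
            raise RuntimeError("start() must be called before join()")
        self._thread.join()
        if self._exception is not None:
            raise self._exception
```

Without the captured exception, a failing validation in a worker thread would only print a traceback and the scenario would appear to pass.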

All executors share the current_mz_version attribute, which is updated by StartMz and read by checks that need version-aware assertions.
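
As an illustration of a version-aware assertion, the sketch below has a stand-in for StartMz record the running version on the executor, and a check that reads it to decide what to assert. Everything except the attribute name current_mz_version is an assumption: versions are modelled as plain tuples, and the table names and the 0.100.0 threshold are invented for the example.

```python
from typing import Optional, Tuple

Version = Tuple[int, int, int]  # simplified stand-in for the real version type


class ExecutorSketch:
    """Hypothetical executor that only tracks the running version."""

    def __init__(self) -> None:
        self.current_mz_version: Optional[Version] = None


def start_mz_sketch(e: ExecutorSketch, version: Version) -> None:
    """Stand-in for StartMz: record which version is now running."""
    e.current_mz_version = version


def validate_text(e: ExecutorSketch) -> str:
    """Build testdrive-style input whose assertions depend on the version."""
    text = "> SELECT count(*) FROM t;\n3\n"
    # Hypothetical rule: the extra object is only asserted from 0.100.0 onward.
    version = e.current_mz_version
    if version is not None and version >= (0, 100, 0):
        text += "> SELECT count(*) FROM t_new_feature;\n1\n"
    return text
```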

Usage

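Place Validate in a scenario's action list wherever the validation phase should run, typically after the last Manipulate phase and again after any subsequent restart. Use MzcomposeExecutor for local Docker Compose-based testing and CloudtestExecutor for Kubernetes-based testing through the cloudtest framework. MzcomposeExecutorParallel runs testdrive in separate threads, so it suits CI runs where wall-clock time matters.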

Code Reference

Source Location

  • misc/python/materialize/checks/actions.py, lines 119–131 (Validate class).
  • misc/python/materialize/checks/checks.py, lines 60–62 (Check.validate()), lines 90–98 (start_validate/join_validate).
  • misc/python/materialize/checks/executors.py, lines 22–53 (Executor base), lines 56–84 (MzcomposeExecutor), lines 86–147 (MzcomposeExecutorParallel).

Signature

Validate action:

class Validate(Action):
    def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None: ...
    def execute(self, e: Executor) -> None: ...
    def join(self, e: Executor) -> None: ...

Check validation methods:

class Check:
    def validate(self) -> Testdrive | PyAction: ...
    def start_validate(self, e: Executor, a: "Action") -> None: ...
    def join_validate(self, e: Executor) -> None: ...

MzcomposeExecutor:

class MzcomposeExecutor(Executor):
    def __init__(self, composition: Composition) -> None: ...
    def testdrive(
        self,
        input: str,
        caller: Traceback | None = None,
        mz_service: str | None = None,
    ) -> None: ...
    def run_pyaction(
        self,
        method: Callable,
        mz_service: str | None = None,
    ) -> None: ...

Import

from materialize.checks.actions import Validate
from materialize.checks.executors import MzcomposeExecutor, CloudtestExecutor

I/O Contract

Inputs

Parameter Type Description
scenario (on Validate) Scenario The parent scenario, providing the list of instantiated check objects.
mz_service (on Validate) str | None Optional service name override for multi-service upgrade scenarios (e.g., "mz_1" or "mz_2"). Defaults to None.
e (on execute/join) Executor The execution backend that runs testdrive or Python actions.

Outputs

Method Side Effect Description
Validate.execute() Starts all check validations Iterates over checks and calls start_validate() on each.
Validate.join() Waits for completions, raises on failure Iterates over checks and calls join_validate() on each. Raises if any assertion fails.
Check.validate() Returns action Creates and returns a Testdrive or PyAction containing the check's assertions.

Usage Examples

Adding validation to a scenario action list:

from materialize.checks.actions import Action, Initialize, Manipulate, Validate
from materialize.checks.mzcompose_actions import KillMz, StartMz
from materialize.checks.scenarios import Scenario


class MyScenario(Scenario):
    def actions(self) -> list[Action]:
        return [
            StartMz(self, tag=self.base_version()),
            Initialize(self),
            Manipulate(self, phase=1),
            KillMz(capture_logs=True),
            StartMz(self, tag=None),
            Manipulate(self, phase=2),
            Validate(self),
            # Second validation after restart to catch idempotency issues
            KillMz(),
            StartMz(self, tag=None),
            Validate(self),
        ]

Writing a validate() method in a Check subclass:

from materialize.checks.checks import Check
from materialize.checks.actions import Testdrive


class CheckTablePersistence(Check):
    def initialize(self) -> Testdrive:
        return Testdrive(
            """
            > CREATE TABLE persist_test (id INT, val TEXT);
            > INSERT INTO persist_test VALUES (1, 'before_upgrade');
            """
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(
                "> INSERT INTO persist_test VALUES (2, 'during_upgrade_phase1');"
            ),
            Testdrive(
                "> INSERT INTO persist_test VALUES (3, 'during_upgrade_phase2');"
            ),
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            """
            > SELECT count(*) FROM persist_test;
            3
            > SELECT val FROM persist_test ORDER BY id;
            before_upgrade
            during_upgrade_phase1
            during_upgrade_phase2
            """
        )

Using MzcomposeExecutor for validation:

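from materialize.checks.actions import Validate
from materialize.checks.executors import MzcomposeExecutor

# `composition` (a Composition) and `scenario` (a Scenario) are assumed to exist already.
executor = MzcomposeExecutor(composition)
validate_action = Validate(scenario)
validate_action.execute(executor)  # Starts validation for every check
validate_action.join(executor)  # Waits for completion; raises if any check fails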

Related Pages

Implements Principle
