Implementation:MaterializeInc Materialize Composition Sql Testdrive

From Leeroopedia


Knowledge Sources: misc/python/materialize/mzcompose/composition.py
Domains: Integration Testing, SQL Execution, Testdrive Scripting
Last Updated: 2026-02-08

Overview

Concrete API for executing SQL statements and testdrive scripts within integration test workflows, provided by the Composition class in materialize.mzcompose.composition.

Description

The materialize.mzcompose.composition module provides three key APIs for test authoring: two methods on the Composition class and one helper class.

  • Composition.sql(): Executes a batch of SQL statements against the Materialized service. It uses sqlparse.split() to separate multiple statements from a single string, then executes each one via a psycopg cursor. Each statement is printed to stdout for traceability. The method supports specifying the target service, user, database, port, password, and whether to reuse connections.
  • Composition.testdrive(): Runs a string as a testdrive script by executing it inside a testdrive service container. Testdrive is Materialize's domain-specific testing language that supports SQL assertions with retry logic, Kafka operations, schema registry interactions, and more. The method auto-starts the testdrive service if it is not already running, tracks source file location for debugging, and supports targeting a specific Materialize service.
  • WorkflowArgumentParser: An argparse.ArgumentParser subclass that provides command-line argument parsing for workflow functions. It is initialized with the workflow name, description, and the arguments passed to the workflow. When parse_args() or parse_known_args() is called, it automatically uses the stored workflow arguments rather than sys.argv.
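
The split-then-execute flow described for Composition.sql() can be sketched roughly as follows. This is a minimal illustration, not the method's actual body: the helper name run_batch is hypothetical, an in-memory SQLite database stands in for the psycopg connection to Materialize, the printed prefix is made up, and a naive semicolon splitter is used only when the third-party sqlparse package is not installed.

```python
import sqlite3

try:
    # sqlparse is the splitter the text names; it is a third-party package.
    from sqlparse import split as split_statements
except ImportError:
    def split_statements(sql: str) -> list[str]:
        # Naive stand-in: breaks on every ";", so it mishandles semicolons
        # inside string literals. Illustration only.
        return [part.strip() + ";" for part in sql.split(";") if part.strip()]


def run_batch(conn: sqlite3.Connection, sql: str, print_statement: bool = True) -> None:
    """Split a batch into statements and execute them one at a time (hypothetical helper)."""
    cursor = conn.cursor()
    for statement in split_statements(sql):
        if print_statement:
            # Echo each statement before running it, for traceability.
            print(f"> {statement}")
        cursor.execute(statement)
    conn.commit()


# SQLite stands in for the Materialize connection in this sketch.
conn = sqlite3.connect(":memory:")
run_batch(
    conn,
    """
    CREATE TABLE t (a INT);
    INSERT INTO t VALUES (1), (2);
    INSERT INTO t VALUES (3);
    """,
)
row_count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
```

Executing statements one by one means a failure points at a single statement rather than at the whole batch, which is presumably why each statement is also echoed individually.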

Usage

Use these APIs when:

  • Writing SQL setup or teardown logic in a workflow function.
  • Asserting query results using testdrive's retry-based comparison operators.
  • Parsing command-line arguments in a parameterized workflow.
  • Targeting a specific Materialized instance in a multi-node test topology.

Code Reference

Source Location

Component                  File                                               Lines
Composition.sql()          misc/python/materialize/mzcompose/composition.py   L792-817
Composition.testdrive()    misc/python/materialize/mzcompose/composition.py   L1474-1524
WorkflowArgumentParser     misc/python/materialize/mzcompose/composition.py   L85-105

Signature

Composition.sql():

def sql(
    self,
    sql: str | Composed,
    service: str | None = None,
    user: str = "materialize",
    database: str = "materialize",
    port: int | None = None,
    password: str | None = None,
    print_statement: bool = True,
    reuse_connection: bool = True,
) -> None

Composition.testdrive():

def testdrive(
    self,
    input: str,
    service: str = "testdrive",
    args: list[str] = [],
    caller: Traceback | str | None = None,
    mz_service: str | None = None,
    quiet: bool = False,
    silent: bool = False,
    print_prefix: str | None = None,
) -> subprocess.CompletedProcess

WorkflowArgumentParser:

class WorkflowArgumentParser(argparse.ArgumentParser):
    def __init__(self, name: str, description: str | None, args: list[str]):
        self.args = args
        super().__init__(prog=f"mzcompose run {name}", description=description)

    def parse_known_args(
        self,
        args: Sequence[str] | None = None,
        namespace: argparse.Namespace | None = None,
    ) -> tuple[argparse.Namespace, list[str]]
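
The signature above omits the method body. A minimal sketch of how such a parser could substitute its stored arguments for sys.argv is shown below. The class name StoredArgsParser is hypothetical, and the fallback rule (use the stored arguments only when none are passed explicitly) is an assumption rather than the confirmed Materialize implementation.

```python
from __future__ import annotations

import argparse
from collections.abc import Sequence


class StoredArgsParser(argparse.ArgumentParser):
    """Hypothetical stand-in for WorkflowArgumentParser, for illustration."""

    def __init__(self, name: str, description: str | None, args: list[str]):
        self.args = args
        super().__init__(prog=f"mzcompose run {name}", description=description)

    def parse_known_args(
        self,
        args: Sequence[str] | None = None,
        namespace: argparse.Namespace | None = None,
    ) -> tuple[argparse.Namespace, list[str]]:
        # Assumed fallback rule: use the stored arguments when none are given.
        effective = self.args if args is None else args
        return super().parse_known_args(effective, namespace)


parser = StoredArgsParser("default", None, ["--num-records", "50", "--extra"])
parser.add_argument("--num-records", type=int, default=1000)

# parse_known_args() tolerates flags the parser does not declare.
known, unknown = parser.parse_known_args()

# parse_args() delegates to parse_known_args(), so it sees the stored args too.
strict = StoredArgsParser("default", None, ["--num-records", "7"])
strict.add_argument("--num-records", type=int, default=1000)
parsed = strict.parse_args()
```

Overriding only parse_known_args() is enough in this sketch because argparse's parse_args() is implemented on top of it.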

Import

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser

I/O Contract

Inputs

Parameter                        Type             Description
sql (sql method)                 str | Composed   One or more SQL statements to execute; multiple statements are split by sqlparse.split()
service                          str | None       Target Materialized service name; defaults to the first Materialized service in the composition
user                             str              PostgreSQL user for the connection; defaults to "materialize"
database                         str              Target database name; defaults to "materialize"
port                             int | None       Port override for the SQL connection
password                         str | None       Optional password for the SQL connection
print_statement                  bool             Whether to print each SQL statement before execution; defaults to True
input (testdrive method)         str              The testdrive script content to execute
mz_service                       str | None       Specific Materialize service to target; overrides default connection URLs
quiet                            bool             Suppress testdrive output; defaults to False
name (WorkflowArgumentParser)    str              The workflow name, used in the prog string
args (WorkflowArgumentParser)    list[str]        The command-line arguments passed to the workflow

Outputs

Method               Return Type                             Description
sql()                None                                    Executes SQL statements for their side effects; raises on error
testdrive()          subprocess.CompletedProcess             The result of executing the testdrive script, including return code, stdout, and stderr
parse_known_args()   tuple[argparse.Namespace, list[str]]    Parsed arguments namespace and list of unrecognized arguments
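
Because testdrive() is documented as returning a subprocess.CompletedProcess, a caller can inspect the result with the standard library attributes. The sketch below uses a trivial child Python process as a stand-in for a testdrive run, and the summarize helper is hypothetical. Whether stdout and stderr are actually populated on the object returned by testdrive() depends on how the method invokes the container, so the code treats them as possibly None.

```python
import subprocess
import sys

# Stand-in for a testdrive run: any child process yields the same result type.
result = subprocess.run(
    [sys.executable, "-c", "print('testdrive stand-in')"],
    capture_output=True,
    text=True,
    check=False,
)


def summarize(proc: subprocess.CompletedProcess) -> str:
    """Build a one-line summary from a CompletedProcess (hypothetical helper)."""
    status = "ok" if proc.returncode == 0 else f"failed ({proc.returncode})"
    # stdout is None unless the caller captured it, so guard before splitting.
    lines = (proc.stdout or "").splitlines()
    return f"{status}: {lines[0] if lines else '<no output>'}"


summary = summarize(result)
```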

Usage Examples

Executing SQL in a workflow:

def workflow_default(c: Composition):
    c.up("materialized")
    c.sql(
        """
        CREATE SOURCE my_source
          FROM KAFKA CONNECTION kafka_conn (TOPIC 'test-topic')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;

        CREATE MATERIALIZED VIEW my_view AS
          SELECT count(*) FROM my_source;
        """
    )

Running a testdrive script:

def workflow_default(c: Composition):
    c.up("materialized", "zookeeper", "kafka", "schema-registry")
    # Assumes ${schema} and my_source are defined elsewhere, e.g. by an earlier script.
    c.testdrive(
        """
> SELECT 1
1

$ kafka-create-topic topic=test-topic

$ kafka-ingest format=avro topic=test-topic schema=${schema}
{"f1": "val1"}

> SELECT * FROM my_source
val1
        """
    )

Using WorkflowArgumentParser for parameterized tests:

def workflow_default(c: Composition, parser: WorkflowArgumentParser):
    parser.add_argument("--num-records", type=int, default=1000)
    parser.add_argument("--enable-feature", action="store_true")
    # parse_args() reads the arguments passed to the workflow, not sys.argv.
    args = parser.parse_args()

    c.up("materialized")
    c.sql(f"INSERT INTO test SELECT generate_series(1, {args.num_records})")

Targeting a specific Materialize service:

def workflow_multi_node(c: Composition):
    c.up("materialized_1", "materialized_2")
    c.sql("CREATE TABLE t (a INT)", service="materialized_1")
    c.testdrive(
        """
> SELECT count(*) FROM t
0
        """,
        mz_service="materialized_2",
    )

Related Pages

Implements Principle
