Implementation:MaterializeInc Materialize Composition Sql Testdrive
Appearance
| Knowledge Sources | misc/python/materialize/mzcompose/composition.py
|
|---|---|
| Domains | Integration Testing, SQL Execution, Testdrive Scripting |
| Last Updated | 2026-02-08 |
Overview
Concrete API for executing SQL statements and testdrive scripts within integration test workflows, provided by the Composition class in materialize.mzcompose.composition.
Description
The Composition class provides three key methods for test authoring:
Composition.sql(): Executes a batch of SQL statements against the Materialized service. It usessqlparse.split()to separate multiple statements from a single string, then executes each one via apsycopgcursor. Each statement is printed to stdout for traceability. The method supports specifying the target service, user, database, port, password, and whether to reuse connections.
Composition.testdrive(): Runs a string as a testdrive script by executing it inside a testdrive service container. Testdrive is Materialize's domain-specific testing language that supports SQL assertions with retry logic, Kafka operations, schema registry interactions, and more. The method auto-starts the testdrive service if it is not already running, tracks source file location for debugging, and supports targeting a specific Materialize service.
WorkflowArgumentParser: Anargparse.ArgumentParsersubclass that provides command-line argument parsing for workflow functions. It is initialized with the workflow name, description, and the arguments passed to the workflow. Whenparse_args()orparse_known_args()is called, it automatically uses the stored workflow arguments rather thansys.argv.
Usage
Use these APIs when:
- Writing SQL setup or teardown logic in a workflow function.
- Asserting query results using testdrive's retry-based comparison operators.
- Parsing command-line arguments in a parameterized workflow.
- Targeting a specific Materialized instance in a multi-node test topology.
Code Reference
Source Location
| Component | File | Lines |
|---|---|---|
Composition.sql() |
misc/python/materialize/mzcompose/composition.py |
L792-817 |
Composition.testdrive() |
misc/python/materialize/mzcompose/composition.py |
L1474-1524 |
WorkflowArgumentParser |
misc/python/materialize/mzcompose/composition.py |
L85-105 |
Signature
Composition.sql():
def sql(
self,
sql: str | Composed,
service: str | None = None,
user: str = "materialize",
database: str = "materialize",
port: int | None = None,
password: str | None = None,
print_statement: bool = True,
reuse_connection: bool = True,
) -> None
Composition.testdrive():
def testdrive(
self,
input: str,
service: str = "testdrive",
args: list[str] = [],
caller: Traceback | str | None = None,
mz_service: str | None = None,
quiet: bool = False,
silent: bool = False,
print_prefix: str | None = None,
) -> subprocess.CompletedProcess
WorkflowArgumentParser:
class WorkflowArgumentParser(argparse.ArgumentParser):
def __init__(self, name: str, description: str | None, args: list[str]):
self.args = args
super().__init__(prog=f"mzcompose run {name}", description=description)
def parse_known_args(
self,
args: Sequence[str] | None = None,
namespace: argparse.Namespace | None = None,
) -> tuple[argparse.Namespace, list[str]]
Import
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
sql (sql method) |
Composed | One or more SQL statements to execute; multiple statements are split by sqlparse.split()
|
service |
None | Target Materialized service name; defaults to the first Materialized service in the composition |
user |
str |
PostgreSQL user for the connection; defaults to "materialize"
|
database |
str |
Target database name; defaults to "materialize"
|
port |
None | Port override for the SQL connection |
password |
None | Optional password for the SQL connection |
print_statement |
bool |
Whether to print each SQL statement before execution; defaults to True
|
input (testdrive method) |
str |
The testdrive script content to execute |
mz_service |
None | Specific Materialize service to target; overrides default connection URLs |
quiet |
bool |
Suppress testdrive output; defaults to False
|
name (WorkflowArgumentParser) |
str |
The workflow name, used in the prog string
|
args (WorkflowArgumentParser) |
list[str] |
The command-line arguments passed to the workflow |
Outputs
| Method | Return Type | Description |
|---|---|---|
sql() |
None |
Executes SQL statements for their side effects; raises on error |
testdrive() |
subprocess.CompletedProcess |
The result of executing the testdrive script, including return code, stdout, and stderr |
parse_known_args() |
tuple[argparse.Namespace, list[str]] |
Parsed arguments namespace and list of unrecognized arguments |
Usage Examples
Executing SQL in a workflow:
def workflow_default(c: Composition):
c.up("materialized")
c.sql(
"""
CREATE SOURCE my_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'test-topic')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;
CREATE MATERIALIZED VIEW my_view AS
SELECT count(*) FROM my_source;
"""
)
Running a testdrive script:
def workflow_default(c: Composition):
c.up("materialized", "zookeeper", "kafka", "schema-registry")
c.testdrive(
"""
> SELECT 1
1
$ kafka-create-topic topic=test-topic
$ kafka-ingest format=avro topic=test-topic schema=${schema}
{"f1": "val1"}
> SELECT * FROM my_source
val1
"""
)
Using WorkflowArgumentParser for parameterized tests:
def workflow_default(c: Composition, parser: WorkflowArgumentParser):
parser.add_argument("--num-records", type=int, default=1000)
parser.add_argument("--enable-feature", action="store_true")
args = parser.parse_args()
c.up("materialized")
c.sql(f"INSERT INTO test SELECT generate_series(1, {args.num_records})")
Targeting a specific Materialize service:
def workflow_multi_node(c: Composition):
c.up("materialized_1", "materialized_2")
c.sql("CREATE TABLE t (a INT)", service="materialized_1")
c.testdrive(
"""
> SELECT count(*) FROM t
0
""",
mz_service="materialized_2",
)
Related Pages
Implements Principle
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment