Implementation:Risingwavelabs Risingwave Connector Integration Tests
| Knowledge Sources | |
|---|---|
| Domains | Testing, Connectors, gRPC, Sinks |
| Language | Python |
| Lines | 325 |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Python integration test client that exercises the Java connector node's sink gRPC APIs, validating file sink, Elasticsearch sink, Iceberg sink, upsert Iceberg sink, DeltaLake sink, and StreamChunk data format functionality.
Description
This script connects to the connector service over gRPC at localhost:50051 and runs end-to-end integration tests for several sink types. It builds mock table schemas from protobuf definitions, loads binary StreamChunk payloads from files, and sends a sequence of SinkWriterStreamRequest messages (StartSink, WriteBatch, Barrier). It then checks that the responses follow the expected protocol: a start acknowledgment, followed by commit responses with matching epochs.
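The response check described above can be illustrated without a running connector service. The sketch below is a minimal stand-in, not the script's own code: `FakeResponse` and `validate_sink_responses` are hypothetical names, and plain dataclasses replace the real protobuf response messages.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class FakeResponse:
    """Hypothetical stand-in for one sink writer stream response."""
    kind: str                    # "start" or "commit"
    epoch: Optional[int] = None  # only meaningful for "commit" responses


def validate_sink_responses(responses: Iterable[FakeResponse],
                            expected_epochs: List[int]) -> None:
    """Raise ValueError unless the stream is: start, then one commit per expected epoch."""
    it = iter(responses)
    first = next(it, None)
    if first is None or first.kind != "start":
        raise ValueError("expected a start acknowledgment first")

    commits = list(it)
    if any(r.kind != "commit" for r in commits):
        raise ValueError("only commit responses may follow the start acknowledgment")

    got = [r.epoch for r in commits]
    if got != expected_epochs:
        raise ValueError(f"epoch mismatch: got {got}, expected {expected_epochs}")


# Usage: a well-formed stream passes silently.
validate_sink_responses(
    [FakeResponse("start"), FakeResponse("commit", 1), FakeResponse("commit", 2)],
    expected_epochs=[1, 2],
)
```

Raising an exception rather than returning a flag keeps the failure message close to the broken invariant, which is convenient when the check runs inside a larger test function.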
For coordinated sinks (such as Iceberg), the script additionally exercises the SinkCoordinatorStream RPC, sending CommitMetadata requests and verifying coordinator commit responses.
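One way to picture the coordinated step is as forwarding each writer commit's metadata to the coordinator and checking that the reply carries the same epoch. The following sketch rests on assumptions: `CoordinatorCommit`, `commit_via_coordinator`, and `fake_coordinator` are hypothetical, the metadata bytes are made up, and the real exchange goes through the SinkCoordinatorStream RPC with protobuf messages.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class CoordinatorCommit:
    """Hypothetical stand-in for a coordinator commit request."""
    epoch: int
    metadata: List[bytes]


def commit_via_coordinator(writer_commits: List[Tuple[int, bytes]],
                           send: Callable[[CoordinatorCommit], int]) -> List[int]:
    """Forward each (epoch, metadata) pair and verify the coordinator echoes the epoch."""
    committed = []
    for epoch, metadata in writer_commits:
        reply_epoch = send(CoordinatorCommit(epoch=epoch, metadata=[metadata]))
        if reply_epoch != epoch:
            raise ValueError(f"coordinator committed epoch {reply_epoch}, expected {epoch}")
        committed.append(reply_epoch)
    return committed


def fake_coordinator(request: CoordinatorCommit) -> int:
    """Assumed behavior for illustration: acknowledge the epoch that was sent."""
    return request.epoch


epochs = commit_via_coordinator([(1, b"meta-1"), (2, b"meta-2")], fake_coordinator)
```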
Usage
This script is run manually or in CI to validate connector service sink functionality against live or mocked external services.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/python-client/integration_tests.py
- Lines: 1-325
Signature
def make_mock_schema():
    """Creates a simple 2-column schema (id: INT, name: VARCHAR)."""

def make_mock_schema_stream_chunk():
    """Creates a 7-column schema (v1-v7) covering types INT16 through VARCHAR."""

def load_input(input_file):
    """Loads JSON sink input from a file."""

def load_binary_input(input_file):
    """Loads binary StreamChunk data from a file."""

def load_json_payload(input_file):
    """Converts JSON input into SinkWriterStreamRequest.WriteBatch.JsonPayload messages."""

def load_stream_chunk_payload(input_file):
    """Wraps binary data into SinkWriterStreamRequest.WriteBatch.StreamChunkPayload."""

def test_sink(prop, payload_input, table_schema, is_coordinated=False):
    """Core test function: sends sink write requests via gRPC and validates responses."""

def test_file_sink(param):
    """Tests file sink with connector='file', output.path='/tmp/connector'."""

def test_elasticsearch_sink(param):
    """Tests Elasticsearch sink on http://127.0.0.1:9200."""

def test_iceberg_sink(param):
    """Tests append-only Iceberg sink with coordinated commit."""

def test_upsert_iceberg_sink(param):
    """Tests upsert Iceberg sink with coordinated commit."""

def test_deltalake_sink(param):
    """Tests DeltaLake sink with S3 storage."""

def test_stream_chunk_data_format(param):
    """Tests StreamChunk binary data format through file sink."""
Import
import os
import argparse
import json
import grpc
import connector_service_pb2_grpc
import connector_service_pb2
import plan_common_pb2
import data_pb2
import psycopg2
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| --file_sink | CLI flag | No | Run the file sink integration test |
| --stream_chunk_format_test | CLI flag | No | Run the StreamChunk data format test |
| --iceberg_sink | CLI flag | No | Run the Iceberg append-only sink test |
| --upsert_iceberg_sink | CLI flag | No | Run the upsert Iceberg sink test |
| --deltalake_sink | CLI flag | No | Run the DeltaLake sink test |
| --es_sink | CLI flag | No | Run the Elasticsearch sink test |
| --input_binary_file | CLI argument | No | Path to binary StreamChunk input data (default: ./data/sink_input) |
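The inputs above map naturally onto an `argparse` parser. This is a sketch under assumptions rather than the script's actual parser: the flag names and the default path come from the table, while `build_parser`, the use of `store_true`, and the help strings are illustrative choices.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with one boolean flag per sink test (assumed store_true flags)."""
    parser = argparse.ArgumentParser(description="Connector sink integration tests")
    for flag, help_text in [
        ("--file_sink", "run the file sink integration test"),
        ("--stream_chunk_format_test", "run the StreamChunk data format test"),
        ("--iceberg_sink", "run the Iceberg append-only sink test"),
        ("--upsert_iceberg_sink", "run the upsert Iceberg sink test"),
        ("--deltalake_sink", "run the DeltaLake sink test"),
        ("--es_sink", "run the Elasticsearch sink test"),
    ]:
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument(
        "--input_binary_file",
        default="./data/sink_input",
        help="path to binary StreamChunk input data",
    )
    return parser


# Usage: only the flags passed on the command line are set to True.
args = build_parser().parse_args(["--file_sink"])
```

With `store_true`, any flag that is not passed defaults to `False`, so a dispatcher can test each attribute and run only the selected tests.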
Outputs
| Name | Type | Description |
|---|---|---|
| stdout | Console | Success/failure messages for each test |
| exit code | int | 0 on success, 1 on test failure |
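The output contract (a message per test, exit code 0 or 1) could be met by a small runner such as the one below. It is a hypothetical sketch: `run_selected` is not a function in the script, and the real script may stop at the first failure instead of continuing through the remaining tests.

```python
from typing import Callable, Dict


def run_selected(tests: Dict[str, Callable[[], None]]) -> int:
    """Run each test callable, print its outcome, and return a process exit code."""
    exit_code = 0
    for name, test_fn in tests.items():
        try:
            test_fn()
            print(f"{name}: passed")
        except Exception as exc:  # any exception counts as a test failure
            print(f"{name}: FAILED ({exc})")
            exit_code = 1
    return exit_code


# Usage: pass the return value to sys.exit() in a real entry point.
code = run_selected({"noop": lambda: None})
```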
Usage Examples
Run File Sink Test
# Run from the python-client directory
python integration_tests.py --file_sink --input_binary_file ./data/sink_input
Run Iceberg Sink Test
# Requires MinIO running on 127.0.0.1:9000
python integration_tests.py --iceberg_sink --input_binary_file ./data/sink_input
Run Multiple Sink Tests
python integration_tests.py --file_sink --iceberg_sink --deltalake_sink --es_sink \
    --input_binary_file ./data/sink_input