

Implementation:Risingwavelabs Risingwave Connector Integration Tests

From Leeroopedia


Knowledge Sources
Domains: Testing, Connectors, gRPC, Sinks
Language: Python
Lines: 325
Last Updated: 2026-02-09 07:00 GMT

Overview

Python integration test client that exercises the Java connector node's sink gRPC APIs, validating file sink, Elasticsearch sink, Iceberg sink, upsert Iceberg sink, DeltaLake sink, and StreamChunk data format functionality.

Description

This script connects to the connector service via gRPC on localhost:50051 and performs end-to-end integration tests for various sink types. It constructs mock table schemas using protobuf definitions, loads binary StreamChunk payloads from files, then sends SinkWriterStreamRequest sequences (StartSink, WriteBatch, Barrier) and validates that the responses follow the expected protocol (start acknowledgment followed by commit responses with matching epochs).
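
The request ordering and response check described above can be sketched without the generated protobuf modules. This is a minimal illustration under stated assumptions: the `Request` and `Response` dataclasses, `build_requests`, `validate_responses`, and the epoch values are hypothetical stand-ins, not the script's code or the real `connector_service_pb2` message types.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


# Hypothetical stand-ins for the protobuf messages; the real script uses
# the generated connector_service_pb2 types instead.
@dataclass
class Request:
    kind: str                      # "start", "write_batch", or "barrier"
    epoch: Optional[int] = None
    payload: bytes = b""


@dataclass
class Response:
    kind: str                      # "start" or "commit"
    epoch: Optional[int] = None


def build_requests(payload: bytes, epochs: List[int]) -> List[Request]:
    """One start request, then a write batch followed by a barrier per epoch."""
    requests = [Request(kind="start")]
    for epoch in epochs:
        requests.append(Request(kind="write_batch", epoch=epoch, payload=payload))
        requests.append(Request(kind="barrier", epoch=epoch))
    return requests


def validate_responses(responses: Iterable[Response], epochs: List[int]) -> None:
    """Expect a start acknowledgment first, then one commit per epoch, in order."""
    it = iter(responses)
    first = next(it, None)
    if first is None or first.kind != "start":
        raise AssertionError("expected a start acknowledgment as the first response")
    commits = list(it)
    if [r.kind for r in commits] != ["commit"] * len(epochs):
        raise AssertionError("expected exactly one commit response per epoch")
    if [r.epoch for r in commits] != epochs:
        raise AssertionError("commit epochs do not match the barrier epochs")
```

In this sketch a failed check raises `AssertionError`; how the actual script reports a protocol violation is not shown on this page.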

For coordinated sinks (such as Iceberg), the script additionally exercises the SinkCoordinatorStream RPC, sending CommitMetadata requests and verifying coordinator commit responses.

Usage

This script is run manually or in CI to validate connector service sink functionality against live or mocked external services.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/python-client/integration_tests.py
  • Lines: 1-325

Signature

def make_mock_schema():
    """Creates a simple 2-column schema (id: INT, name: VARCHAR)."""

def make_mock_schema_stream_chunk():
    """Creates a 7-column schema (v1-v7) covering types INT16 through VARCHAR."""

def load_input(input_file):
    """Loads JSON sink input from a file."""

def load_binary_input(input_file):
    """Loads binary StreamChunk data from a file."""

def load_json_payload(input_file):
    """Converts JSON input into SinkWriterStreamRequest.WriteBatch.JsonPayload messages."""

def load_stream_chunk_payload(input_file):
    """Wraps binary data into SinkWriterStreamRequest.WriteBatch.StreamChunkPayload."""

def test_sink(prop, payload_input, table_schema, is_coordinated=False):
    """Core test function: sends sink write requests via gRPC and validates responses."""

def test_file_sink(param):
    """Tests file sink with connector='file', output.path='/tmp/connector'."""

def test_elasticsearch_sink(param):
    """Tests Elasticsearch sink on http://127.0.0.1:9200."""

def test_iceberg_sink(param):
    """Tests append-only Iceberg sink with coordinated commit."""

def test_upsert_iceberg_sink(param):
    """Tests upsert Iceberg sink with coordinated commit."""

def test_deltalake_sink(param):
    """Tests DeltaLake sink with S3 storage."""

def test_stream_chunk_data_format(param):
    """Tests StreamChunk binary data format through file sink."""

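The signatures above list only docstrings, so the function bodies are not shown on this page. As a rough sketch of what the two file loaders could look like, assuming they do nothing beyond reading the file (the bodies below are an assumption, not the script's actual code):

```python
import json


def load_binary_input(input_file):
    """Return the raw bytes of a file, e.g. a serialized StreamChunk."""
    with open(input_file, "rb") as f:
        return f.read()


def load_input(input_file):
    """Parse a JSON file and return the decoded Python object."""
    with open(input_file, "r", encoding="utf-8") as f:
        return json.load(f)
```

Opening the binary input in `"rb"` mode matters here: text mode would attempt to decode the payload and could corrupt or reject arbitrary bytes.
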
Import

import os
import argparse
import json
import grpc
import connector_service_pb2_grpc
import connector_service_pb2
import plan_common_pb2
import data_pb2
import psycopg2

I/O Contract

Inputs

Name | Type | Required | Description
--file_sink | CLI flag | No | Run the file sink integration test
--stream_chunk_format_test | CLI flag | No | Run the StreamChunk data format test
--iceberg_sink | CLI flag | No | Run the Iceberg append-only sink test
--upsert_iceberg_sink | CLI flag | No | Run the upsert Iceberg sink test
--deltalake_sink | CLI flag | No | Run the DeltaLake sink test
--es_sink | CLI flag | No | Run the Elasticsearch sink test
--input_binary_file | CLI argument | No | Path to binary StreamChunk input data (default: ./data/sink_input)
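
The flags listed above map naturally onto `argparse`. The following is a hedged sketch, assuming each test flag is a boolean switch (`action="store_true"`); `build_parser` is a hypothetical helper name, and the script's real parser setup may differ.

```python
import argparse

# Flag names taken from the Inputs table; treating them as boolean
# switches is an assumption of this sketch.
TEST_FLAGS = (
    "file_sink",
    "stream_chunk_format_test",
    "iceberg_sink",
    "upsert_iceberg_sink",
    "deltalake_sink",
    "es_sink",
)


def build_parser():
    """Build a parser with one on/off switch per test plus the input path."""
    parser = argparse.ArgumentParser(description="Connector sink integration tests (sketch)")
    for flag in TEST_FLAGS:
        parser.add_argument(f"--{flag}", action="store_true")
    parser.add_argument("--input_binary_file", default="./data/sink_input")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    # Print the names of the tests that were switched on.
    print([flag for flag in TEST_FLAGS if getattr(args, flag)])
```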

Outputs

Name | Type | Description
stdout | Console | Success/failure messages for each test
exit code | int | 0 on success, 1 on test failure
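
One way to produce the output contract above is to run each selected test in turn and stop at the first failure. This is only an illustrative sketch: `run_selected` and the message wording are hypothetical, and the actual script may report failures differently.

```python
import sys


def run_selected(tests):
    """Run (name, callable) pairs in order; return 0 if all pass, 1 on the first failure."""
    for name, fn in tests:
        try:
            fn()
        except Exception as exc:  # any error counts as a test failure in this sketch
            print(f"{name} failed: {exc}")
            return 1
        print(f"{name} passed")
    return 0


if __name__ == "__main__":
    # Placeholder test list; a real run would pass the selected sink tests.
    sys.exit(run_selected([("noop", lambda: None)]))
```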

Usage Examples

Run File Sink Test

# Run from the python-client directory
python integration_tests.py --file_sink --input_binary_file ./data/sink_input

Run Iceberg Sink Test

# Requires MinIO running on 127.0.0.1:9000
python integration_tests.py --iceberg_sink --input_binary_file ./data/sink_input

Run Multiple Sink Tests

python integration_tests.py --file_sink --iceberg_sink --deltalake_sink --es_sink \
    --input_binary_file ./data/sink_input

Related Pages

Implements Principle

Requires Environment
