Implementation:Risingwavelabs Risingwave SourceValidateHandler


Knowledge Sources
Domains Connectors, CDC, Validation, gRPC
Last Updated 2026-02-09 07:00 GMT

Overview

Central validation handler for CDC source connector configurations, dispatching to database-specific validators based on source type.

Description

The SourceValidateHandler class processes gRPC ValidateSourceRequest messages to validate CDC source configurations before creating source connectors. It dispatches to database-specific validators depending on the source type:

  • POSTGRES: Checks the required properties (host, port, database, user, password, schema name, slot name, publication name, publication auto-create) and the queue memory ratio, then delegates to PostgresValidator for database-level checks.
  • CITUS: Checks the required properties plus the distributed table configuration, then iterates over the worker servers and validates each node individually via CitusValidator.
  • MYSQL: Checks the required properties plus the server ID, queue memory ratio, and heartbeat interval, then delegates to MySqlValidator.
  • MONGODB: Checks the MongoDB URL, collection name, and queue memory ratio, then delegates to MongoDbValidator.
  • SQL_SERVER: Checks the required properties plus the schema name, queue memory ratio, and heartbeat interval, then delegates to SqlServerValidator.
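
The dispatch described in the list above can be pictured with a small, self-contained sketch. It is not the RisingWave source: the enum, the plan method, and the assumption that database.servers holds a comma-separated list of host:port entries are all illustrative. Only the validator names come from this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative stand-in, not the RisingWave source. The enum, the method name,
// and the "host:port,host:port" format of database.servers are assumptions.
final class DispatchSketch {
    enum SourceType { POSTGRES, CITUS, MYSQL, MONGODB, SQL_SERVER }

    /** Lists which validator would run (and against which node) for a request. */
    static List<String> plan(SourceType type, Map<String, String> props) {
        List<String> steps = new ArrayList<>();
        switch (type) {
            case POSTGRES:
                steps.add("PostgresValidator");
                break;
            case CITUS:
                // One validation pass per worker node listed in database.servers.
                for (String server : props.getOrDefault("database.servers", "").split(",")) {
                    if (!server.trim().isEmpty()) {
                        steps.add("CitusValidator@" + server.trim());
                    }
                }
                break;
            case MYSQL:
                steps.add("MySqlValidator");
                break;
            case MONGODB:
                steps.add("MongoDbValidator");
                break;
            case SQL_SERVER:
                steps.add("SqlServerValidator");
                break;
            default:
                throw new IllegalArgumentException("Unknown source type: " + type);
        }
        return steps;
    }
}
```

The point of the sketch is the shape of the control flow: a single switch on the source type, with Citus as the only branch that fans out to several nodes.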

The handler also provides helper methods ensurePropNotBlank and ensurePropsExists for property validation, and validateQueueMemoryRatio and validateHeartbeatInterval for validating Debezium-specific configuration parameters.
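
As a rough illustration of what such helpers might look like, here is a minimal sketch. The method names follow the ones mentioned above, but the property keys, the accepted ranges, and the exception type are assumptions made for the example and are not taken from the RisingWave source.

```java
import java.util.Map;

// Illustrative sketch only. The property keys, accepted ranges, and exception
// type below are assumptions, not values taken from the RisingWave source.
final class PropChecks {
    static final String QUEUE_MEMORY_RATIO = "debezium.queue.memory.ratio";       // hypothetical key
    static final String HEARTBEAT_INTERVAL_MS = "debezium.heartbeat.interval.ms"; // hypothetical key

    /** Fails if the property is missing or contains only whitespace. */
    static void ensurePropNotBlank(Map<String, String> props, String name) {
        String value = props.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("'" + name + "' must be set and non-blank");
        }
    }

    /** Optional property; if present it must parse as a number in (0, 1) (assumed range). */
    static void validateQueueMemoryRatio(Map<String, String> props) {
        String raw = props.get(QUEUE_MEMORY_RATIO);
        if (raw == null) {
            return;
        }
        double ratio;
        try {
            ratio = Double.parseDouble(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(QUEUE_MEMORY_RATIO + " is not a number: " + raw);
        }
        if (!(ratio > 0.0 && ratio < 1.0)) {
            throw new IllegalArgumentException(QUEUE_MEMORY_RATIO + " must be in (0, 1): " + raw);
        }
    }

    /** Optional property; if present it must be a non-negative integer (assumed rule). */
    static void validateHeartbeatInterval(Map<String, String> props) {
        String raw = props.get(HEARTBEAT_INTERVAL_MS);
        if (raw == null) {
            return;
        }
        long millis;
        try {
            millis = Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(HEARTBEAT_INTERVAL_MS + " is not an integer: " + raw);
        }
        if (millis < 0) {
            throw new IllegalArgumentException(HEARTBEAT_INTERVAL_MS + " must be >= 0: " + raw);
        }
    }
}
```

Failing fast on malformed values here keeps configuration mistakes from surfacing later as harder-to-diagnose connector errors.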

Usage

This handler is invoked during CREATE SOURCE or CREATE TABLE ... WITH statement processing for CDC sources. The Rust frontend calls the Java connector service to validate that the upstream database is reachable, the user has the required privileges, and the table schema is compatible.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java
  • Lines: 239

Signature

public class SourceValidateHandler {

    public SourceValidateHandler(
            StreamObserver<ConnectorServiceProto.ValidateSourceResponse> responseObserver) { ... }

    public void handle(ConnectorServiceProto.ValidateSourceRequest request) { ... }

    public static ConnectorServiceProto.ValidateSourceResponse validateResponse(String message) { ... }

    static void ensureRequiredProps(Map<String, String> props, boolean isCdcSourceJob) { ... }

    public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
            throws Exception { ... }
}

Import

import com.risingwave.connector.source.SourceValidateHandler;

I/O Contract

Inputs

Name    | Type                  | Required | Description
request | ValidateSourceRequest | Yes      | Contains source type, properties map, table schema, source ID, isSourceJob flag, and isBackfillTable flag

Outputs

Name             | Type                   | Description
Success response | ValidateSourceResponse | Empty response indicating all validations passed
Error response   | ValidateSourceResponse | Contains a ValidationError with errorMessage describing the failure
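
The two outcomes above suggest a simple pattern: run the validation, return an empty response if nothing is thrown, and otherwise wrap the exception message in an error. The sketch below models that pattern with hand-written stand-in classes. The real types are protobuf-generated and the response is delivered through a gRPC StreamObserver, so the names and fields here are hypothetical simplifications.

```java
// Hand-written stand-ins for the protobuf-generated types; hypothetical
// simplifications, not the real ConnectorServiceProto classes.
final class ResponseSketch {
    static final class ValidationError {
        final String errorMessage;
        ValidationError(String errorMessage) { this.errorMessage = errorMessage; }
    }

    static final class ValidateSourceResponse {
        final ValidationError error; // null means every check passed
        ValidateSourceResponse(ValidationError error) { this.error = error; }
        boolean isSuccess() { return error == null; }
    }

    /** A validation step that may throw, like validateSource(...) throws Exception. */
    interface Validation {
        void run() throws Exception;
    }

    /** Converts "throws on failure" into the success/error response contract. */
    static ValidateSourceResponse respond(Validation validation) {
        try {
            validation.run();
            return new ValidateSourceResponse(null);
        } catch (Exception e) {
            return new ValidateSourceResponse(new ValidationError(e.getMessage()));
        }
    }
}
```

For example, ResponseSketch.respond(() -> { throw new Exception("slot.name is missing"); }) yields a response whose error carries that message, while a validation that returns normally yields an empty success response.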

Required Properties by Source Type

Source Type | Required Properties
POSTGRES    | hostname, port, database.name, username, password, schema.name, slot.name, publication.name, publication.create.enable
CITUS       | hostname, port, database.name, username, password, table.name, schema.name, database.servers
MYSQL       | hostname, port, database.name, username, password, server.id
MONGODB     | mongodb.url, collection.name
SQL_SERVER  | hostname, port, database.name, username, password, schema.name
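
A table like this lends itself to a data-driven check. The following sketch is an assumption-laden illustration rather than the handler's actual code: the class and method names are hypothetical, and the keys are spelled as they appear in the WITH clauses of the usage examples on this page (hostname, username, publication.create.enable).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical table-driven check; not the handler's actual implementation.
// Keys use the WITH-clause spelling from the usage examples on this page.
final class RequiredProps {
    static final Map<String, List<String>> REQUIRED = new LinkedHashMap<>();

    static {
        REQUIRED.put("POSTGRES", List.of("hostname", "port", "database.name", "username",
                "password", "schema.name", "slot.name", "publication.name",
                "publication.create.enable"));
        REQUIRED.put("CITUS", List.of("hostname", "port", "database.name", "username",
                "password", "table.name", "schema.name", "database.servers"));
        REQUIRED.put("MYSQL", List.of("hostname", "port", "database.name", "username",
                "password", "server.id"));
        REQUIRED.put("MONGODB", List.of("mongodb.url", "collection.name"));
        REQUIRED.put("SQL_SERVER", List.of("hostname", "port", "database.name", "username",
                "password", "schema.name"));
    }

    /** Returns the required keys that are absent or blank, in table order. */
    static List<String> missing(String sourceType, Map<String, String> props) {
        List<String> required = REQUIRED.get(sourceType);
        if (required == null) {
            throw new IllegalArgumentException("Unknown source type: " + sourceType);
        }
        List<String> result = new ArrayList<>();
        for (String key : required) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }
}
```

Collecting every missing key before reporting, rather than stopping at the first one, is a design choice of this sketch; it lets a user fix all omissions in one pass.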

Usage Examples

Validate a PostgreSQL CDC Source

-- Validation is triggered automatically during source creation
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'repl_user',
    password = 'secret',
    database.name = 'mydb',
    schema.name = 'public',
    table.name = 'orders',
    slot.name = 'rw_slot',
    publication.name = 'rw_pub',
    publication.create.enable = 'true'
);

Validate a MySQL CDC Source

CREATE SOURCE mysql_source WITH (
    connector = 'mysql-cdc',
    hostname = 'localhost',
    port = '3306',
    username = 'repl_user',
    password = 'secret',
    database.name = 'mydb',
    table.name = 'orders',
    server.id = '1001'
);
