Implementation:Risingwavelabs Risingwave SourceValidateHandler
| Knowledge Sources | |
|---|---|
| Domains | Connectors, CDC, Validation, gRPC |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Central validation handler for CDC source connector configurations, dispatching to database-specific validators based on source type.
Description
The SourceValidateHandler class processes gRPC ValidateSourceRequest messages to validate CDC source configurations before creating source connectors. It dispatches to database-specific validators depending on the source type:
- POSTGRES: Validates required properties (host, port, database, user, password, schema name, slot name, publication name, publication auto-create), queue memory ratio, and delegates to PostgresValidator for database-level checks.
- CITUS: Validates required properties plus distributed table configuration, iterating over worker servers to validate each node individually via CitusValidator.
- MYSQL: Validates required properties plus server ID, queue memory ratio, heartbeat interval, and delegates to MySqlValidator.
- MONGODB: Validates MongoDB URL and collection name, queue memory ratio, and delegates to MongoDbValidator.
- SQL_SERVER: Validates required properties plus schema name, queue memory ratio, heartbeat interval, and delegates to SqlServerValidator.
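The dispatch described in the list above can be sketched roughly as follows. This is a simplified illustration, not the handler's actual code: `DispatchSketch`, its `SourceType` enum, and the `plan` method are hypothetical stand-ins that only record which validator would run, and the comma-separated format of `database.servers` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the handler's dispatch logic. The real handler
// works with protobuf source types and calls the validator classes directly.
class DispatchSketch {
    enum SourceType { POSTGRES, CITUS, MYSQL, MONGODB, SQL_SERVER }

    // Returns the name of each database-level validation that would run.
    static List<String> plan(SourceType type, Map<String, String> props) {
        List<String> checks = new ArrayList<>();
        switch (type) {
            case POSTGRES:
                checks.add("PostgresValidator");
                break;
            case CITUS: {
                // One validation per worker node. The comma-separated
                // list format of database.servers is an assumption.
                String servers = props.getOrDefault("database.servers", "");
                for (String server : servers.split(",")) {
                    String trimmed = server.trim();
                    if (!trimmed.isEmpty()) {
                        checks.add("CitusValidator@" + trimmed);
                    }
                }
                break;
            }
            case MYSQL:
                checks.add("MySqlValidator");
                break;
            case MONGODB:
                checks.add("MongoDbValidator");
                break;
            case SQL_SERVER:
                checks.add("SqlServerValidator");
                break;
            default:
                throw new IllegalArgumentException("Unknown source type: " + type);
        }
        return checks;
    }
}
```

With `database.servers` set to two workers, `plan(SourceType.CITUS, props)` yields two `CitusValidator` entries, one per node, while every other source type yields a single entry.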
The handler also provides the helper methods ensurePropNotBlank and ensureRequiredProps for checking that required properties are present, and validateQueueMemoryRatio and validateHeartbeatInterval for checking Debezium-specific configuration parameters.
Usage
This handler is invoked during CREATE SOURCE or CREATE TABLE ... WITH statement processing for CDC sources. The Rust frontend calls the Java connector service to validate that the upstream database is reachable, the user has the required privileges, and the table schema is compatible.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java
- Lines: 239
Signature
public class SourceValidateHandler {
    public SourceValidateHandler(
            StreamObserver<ConnectorServiceProto.ValidateSourceResponse> responseObserver) { ... }

    public void handle(ConnectorServiceProto.ValidateSourceRequest request) { ... }

    public static ConnectorServiceProto.ValidateSourceResponse validateResponse(String message) { ... }

    static void ensureRequiredProps(Map<String, String> props, boolean isCdcSourceJob) { ... }

    public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
            throws Exception { ... }
}
Import
import com.risingwave.connector.source.SourceValidateHandler;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| request | ValidateSourceRequest | Yes | Contains source type, properties map, table schema, source ID, isSourceJob flag, and isBackfillTable flag |
Outputs
| Name | Type | Description |
|---|---|---|
| Success response | ValidateSourceResponse | Empty response indicating all validations passed |
| Error response | ValidateSourceResponse | Contains a ValidationError with errorMessage describing the failure |
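The two response shapes in the table above can be illustrated with a small sketch. It uses plain Java stand-ins rather than the generated protobuf and gRPC classes: `ResponseSketch`, `Response`, and `Validation` are hypothetical names, and a null `errorMessage` models the empty success response.

```java
import java.util.Map;

class ResponseSketch {
    // Hypothetical stand-in for ValidateSourceResponse: a null errorMessage
    // models the empty response that signals success.
    static final class Response {
        final String errorMessage;

        Response(String errorMessage) {
            this.errorMessage = errorMessage;
        }

        boolean isSuccess() {
            return errorMessage == null;
        }
    }

    // Hypothetical stand-in for the validation step; it throws on failure.
    interface Validation {
        void run(Map<String, String> props) throws Exception;
    }

    // Runs the validation and maps the outcome onto one of the two
    // response shapes: empty on success, error message on failure.
    static Response handle(Map<String, String> props, Validation validation) {
        try {
            validation.run(props);
            return new Response(null);
        } catch (Exception e) {
            return new Response(e.getMessage());
        }
    }
}
```

A caller inspects `isSuccess()` first and reads `errorMessage` only when validation failed.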
Required Properties by Source Type
| Source Type | Required Properties |
|---|---|
| POSTGRES | hostname, port, database.name, username, password, schema.name, slot.name, publication.name, publication.create.enable |
| CITUS | hostname, port, database.name, username, password, table.name, schema.name, database.servers |
| MYSQL | hostname, port, database.name, username, password, server.id |
| MONGODB | mongodb.url, collection.name |
| SQL_SERVER | hostname, port, database.name, username, password, schema.name |
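A table-driven check of required properties might look like the sketch below. It is an illustration under stated assumptions, not the handler's implementation: `RequiredPropsSketch`, `REQUIRED`, and `missing` are hypothetical names, only two source types are included, and the property keys follow the WITH clauses in the usage examples on this page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequiredPropsSketch {
    // Hypothetical lookup table; only two source types are shown.
    static final Map<String, List<String>> REQUIRED = new HashMap<>();

    static {
        REQUIRED.put("MYSQL", Arrays.asList(
                "hostname", "port", "database.name", "username", "password", "server.id"));
        REQUIRED.put("MONGODB", Arrays.asList("mongodb.url", "collection.name"));
    }

    // Returns the required keys that are absent or blank in props.
    static List<String> missing(String sourceType, Map<String, String> props) {
        List<String> required = REQUIRED.get(sourceType);
        if (required == null) {
            throw new IllegalArgumentException("Unknown source type: " + sourceType);
        }
        List<String> result = new ArrayList<>();
        for (String key : required) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                result.add(key);
            }
        }
        return result;
    }
}
```

Collecting every missing key, instead of failing on the first one, is a design choice of this sketch: it lets a single error message list everything the user needs to fix.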
Usage Examples
Validate a PostgreSQL CDC Source
-- Validation is triggered automatically during source creation
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'repl_user',
    password = 'secret',
    database.name = 'mydb',
    schema.name = 'public',
    table.name = 'orders',
    slot.name = 'rw_slot',
    publication.name = 'rw_pub',
    publication.create.enable = 'true'
);
Validate a MySQL CDC Source
CREATE SOURCE mysql_source WITH (
    connector = 'mysql-cdc',
    hostname = 'localhost',
    port = '3306',
    username = 'repl_user',
    password = 'secret',
    database.name = 'mydb',
    table.name = 'orders',
    server.id = '1001'
);