Implementation:Risingwavelabs Risingwave SqlServerValidator
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, Validation, SQL Server |
| Language | Java |
| Lines | 349 |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Database validator for SQL Server CDC sources that checks database-level CDC enablement, SQL Agent status, user privileges, table CDC configuration, primary key alignment, and schema data type compatibility.
Description
SqlServerValidator extends DatabaseValidator and implements AutoCloseable. It connects to a SQL Server instance via JDBC and performs a comprehensive series of validation checks before a CDC source can be created:
- Database configuration (validateDbConfig) -- Verifies that CDC is enabled on the target database by querying system metadata. For CDC source jobs, it additionally checks that the SQL Server Agent service is running, which is required for fn_cdc_get_max_lsn to function.
- User privileges (validateUserPrivilege) -- Confirms the configured user has SELECT permission on the CDC change table for the specified source table.
- Table validation (validateTable) -- Performs multiple checks:
- Verifies the table exists in the specified schema.
- Confirms CDC capture is enabled on the table.
- Validates primary key alignment between the RisingWave source schema and the upstream SQL Server table.
- Detects whether the database uses case-sensitive collation and adjusts column name matching accordingly.
- Validates that all columns defined in the RisingWave source schema exist in the upstream table and have compatible data types.
The isDataTypeCompatible method maps SQL Server data types (bit, tinyint, smallint, int, bigint, float, real, double, decimal, numeric, char, varchar, nvarchar, text, binary, varbinary, date, time, datetime, datetime2, datetimeoffset, etc.) to RisingWave DataType.TypeName values.
Usage
Instantiated and invoked by the connector service during CDC source creation to validate that the SQL Server database, table, and user are properly configured for change data capture.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java
- Lines: 1-349
Signature
public class SqlServerValidator extends DatabaseValidator implements AutoCloseable {
public SqlServerValidator(
Map<String, String> userProps,
TableSchema tableSchema,
boolean isCdcSourceJob) throws SQLException;
@Override
public void validateDbConfig();
@Override
public void validateUserPrivilege();
@Override
public void validateTable();
@Override
boolean isCdcSourceJob();
@Override
public void close() throws Exception;
private void validateTableSchema() throws SQLException;
private void validatePrivileges() throws SQLException;
private static void primaryKeyCheck(TableSchema sourceSchema, Set<String> pkFields);
private boolean isDataTypeCompatible(String ssDataType, Data.DataType.TypeName typeName);
}
Import
import com.risingwave.connector.source.common.SqlServerValidator;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| userProps | Map<String, String> | Yes | Connection properties including host, port, db_name, user, password, encrypt, schema_name, table_name |
| tableSchema | TableSchema | Yes | RisingWave source table schema with column names, types, and primary keys |
| isCdcSourceJob | boolean | Yes | If true, skips table-specific validations (shared CDC source job mode) |
Outputs
| Name | Type | Description |
|---|---|---|
| void | -- | Validation methods throw exceptions on failure; successful return indicates all checks passed |
Exceptions
| Exception | Condition |
|---|---|
| StatusRuntimeException (INVALID_ARGUMENT) | CDC not enabled on the database |
| StatusRuntimeException (INVALID_ARGUMENT) | SQL Server Agent not running (for CDC source jobs) |
| StatusRuntimeException (INVALID_ARGUMENT) | Table does not exist in the specified schema |
| StatusRuntimeException (INVALID_ARGUMENT) | CDC not enabled on the table |
| StatusRuntimeException (INVALID_ARGUMENT) | Primary key count or column name mismatch |
| StatusRuntimeException (INVALID_ARGUMENT) | Column not found in upstream database |
| StatusRuntimeException (INVALID_ARGUMENT) | Incompatible data type between source and upstream |
| StatusRuntimeException (INVALID_ARGUMENT) | Insufficient SELECT privilege on CDC table |
| StatusRuntimeException (INTERNAL) | SQL execution errors |
Usage Examples
Validate SQL Server CDC Source
Map<String, String> props = new HashMap<>();
props.put("hostname", "sqlserver-host");
props.put("port", "1433");
props.put("database.name", "mydb");
props.put("username", "sa");
props.put("password", "secret");
props.put("schema.name", "dbo");
props.put("table.name", "orders");
try (SqlServerValidator validator =
new SqlServerValidator(props, tableSchema, false)) {
validator.validateDbConfig();
validator.validateUserPrivilege();
validator.validateTable();
}