Implementation:Risingwavelabs Risingwave SinkValidationHandler
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Data_Delivery, Validation, gRPC |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Handler for validating sink connector configurations before creating actual sink instances.
Description
The SinkValidationHandler class processes gRPC ValidateSinkRequest messages to pre-check sink configurations. It extracts the SinkParam from the request, builds a TableSchema from the protobuf definition, resolves the appropriate SinkFactory via SinkUtils.getConnectorName and SinkUtils.getSinkFactory, and invokes the factory's validate method with the table schema, properties map, and sink type. If validation succeeds, the handler returns an empty success response. If validation fails, it returns an error response; for Jackson deserialization errors, it extracts the missing or unrecognized field name from the exception message to produce a user-friendly error.
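The resolve-then-validate flow described above can be modeled in a few lines. This is a minimal sketch, not the handler's actual code: DemoSinkFactory and ValidationFlowSketch are hypothetical stand-ins, the schema and sink type arguments are omitted, and reading the connector name from a "connector" property is an assumption about what SinkUtils.getConnectorName does.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for SinkFactory; only the validation hook is modeled.
interface DemoSinkFactory {
    // Throws an unchecked exception when the properties are not acceptable.
    void validate(Map<String, String> properties);
}

final class ValidationFlowSketch {
    // Returns Optional.empty() on success, or the error message on failure.
    static Optional<String> validate(
            Map<String, DemoSinkFactory> registry, Map<String, String> properties) {
        try {
            // Assumption: the connector name is carried in the "connector" property.
            String connectorName = properties.get("connector");
            if (connectorName == null) {
                throw new IllegalArgumentException("missing connector name");
            }
            DemoSinkFactory factory = registry.get(connectorName);
            if (factory == null) {
                throw new IllegalArgumentException("unknown connector: " + connectorName);
            }
            // Delegate the actual checks to the connector-specific factory.
            factory.validate(properties);
            return Optional.empty();
        } catch (Exception e) {
            // Any failure becomes an error message instead of propagating.
            return Optional.of(String.valueOf(e.getMessage()));
        }
    }
}
```

Returning the message rather than rethrowing mirrors the handler's contract: the caller always receives a response, either empty or carrying an error.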
Usage
This handler is invoked during CREATE SINK statement processing. Before the system creates actual sink writer instances, it calls the Java connector service to validate that the sink configuration is correct and the external system is reachable with the provided properties.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java
- Lines: 87
Signature
public class SinkValidationHandler {
    public SinkValidationHandler(
            StreamObserver<ConnectorServiceProto.ValidateSinkResponse> responseObserver) { ... }

    public void handle(ConnectorServiceProto.ValidateSinkRequest request) { ... }
}
Import
import com.risingwave.connector.SinkValidationHandler;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| request | ValidateSinkRequest | Yes | Contains SinkParam with connector name, table schema, properties map, and sink type |
Outputs
| Name | Type | Description |
|---|---|---|
| Success response | ValidateSinkResponse | Empty response indicating validation passed |
| Error response | ValidateSinkResponse | Contains a ValidationError with an errorMessage describing the validation failure |
Error Handling
- IllegalArgumentException: Catches Jackson deserialization errors and rewrites them as concise messages. Recognized patterns:
  - Missing creator property 'fieldName' is converted to missing field `fieldName`
  - Unrecognized field "fieldName" is converted to unknown field `fieldName`
- General Exception: All other exceptions have their message forwarded directly in the validation error response.
- All errors are logged at ERROR level and the response stream is completed after sending the error.
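The message rewriting described for IllegalArgumentException can be sketched with two regular expressions. This is an illustrative sketch only: the class name ValidationErrorMessages and the method concise are hypothetical, and returning the raw message unchanged when neither pattern matches is an assumption rather than documented behavior.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the real handler may structure this logic differently.
final class ValidationErrorMessages {
    // Captures the field name between single quotes.
    private static final Pattern MISSING_FIELD =
            Pattern.compile("Missing creator property '([^']*)'");
    // Captures the field name between double quotes.
    private static final Pattern UNRECOGNIZED_FIELD =
            Pattern.compile("Unrecognized field \"([^\"]*)\"");

    static String concise(String rawMessage) {
        if (rawMessage == null) {
            return null;
        }
        Matcher missing = MISSING_FIELD.matcher(rawMessage);
        if (missing.find()) {
            return "missing field `" + missing.group(1) + "`";
        }
        Matcher unrecognized = UNRECOGNIZED_FIELD.matcher(rawMessage);
        if (unrecognized.find()) {
            return "unknown field `" + unrecognized.group(1) + "`";
        }
        // Assumption: messages matching neither pattern pass through unchanged.
        return rawMessage;
    }
}
```

Using find() rather than matches() matters here because Jackson messages typically carry extra context around the quoted field name, so the pattern has to match a substring of the full message.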
Usage Examples
Validate a Kafka Sink
-- Validation is triggered automatically before sink creation
CREATE SINK kafka_sink FROM my_table
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = 'broker:9092',
    type = 'append-only'
) FORMAT PLAIN ENCODE JSON;
Validate a JDBC Sink
-- If the JDBC URL is invalid, validation will return an error
CREATE SINK pg_sink FROM my_table
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://host:5432/db',
    table.name = 'target_table',
    type = 'upsert',
    primary_key = 'id'
);