Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave SinkValidationHandler

From Leeroopedia
Revision as of 16:32, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Risingwavelabs_Risingwave_SinkValidationHandler.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Knowledge Sources
Domains Connectors, Data_Delivery, Validation, gRPC
Last Updated 2026-02-09 07:00 GMT

Overview

Handler for validating sink connector configurations before creating actual sink instances.

Description

The SinkValidationHandler class processes gRPC ValidateSinkRequest messages to pre-check sink configurations. It extracts the SinkParam from the request, builds a TableSchema from the protobuf definition, resolves the appropriate SinkFactory using SinkUtils.getConnectorName and SinkUtils.getSinkFactory, and invokes the factory's validate method with the table schema, properties map, and sink type. If validation succeeds, an empty success response is returned. If validation fails, the handler provides user-friendly error messages by parsing Jackson serialization errors to extract missing or unrecognized field names from the error output.

Usage

This handler is invoked during CREATE SINK statement processing. Before the system creates actual sink writer instances, it calls the Java connector service to validate that the sink configuration is correct and the external system is reachable with the provided properties.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java
  • Lines: 87

Signature

public class SinkValidationHandler {

    public SinkValidationHandler(
            StreamObserver<ConnectorServiceProto.ValidateSinkResponse> responseObserver) { ... }

    public void handle(ConnectorServiceProto.ValidateSinkRequest request) { ... }
}

Import

import com.risingwave.connector.SinkValidationHandler;

I/O Contract

Inputs

Name Type Required Description
request ValidateSinkRequest Yes Contains SinkParam with connector name, table schema, properties map, and sink type

Outputs

Name Type Description
Success response ValidateSinkResponse Empty response indicating validation passed
Error response ValidateSinkResponse Contains a ValidationError with an errorMessage describing the validation failure

Error Handling

  • IllegalArgumentException: Catches Jackson serialization errors and extracts concise messages. Patterns recognized:
    • Missing creator property 'fieldName' is converted to missing field `fieldName`
    • Unrecognized field "fieldName" is converted to unknown field `fieldName`
  • General Exception: All other exceptions have their message forwarded directly in the validation error response.
  • All errors are logged at ERROR level and the response stream is completed after sending the error.

Usage Examples

Validate a Kafka Sink

-- Validation is triggered automatically before sink creation
CREATE SINK kafka_sink FROM my_table
WITH (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = 'broker:9092',
    type = 'append-only'
) FORMAT PLAIN ENCODE JSON;

Validate a JDBC Sink

-- If the JDBC URL is invalid, validation will return an error
CREATE SINK pg_sink FROM my_table
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://host:5432/db',
    table.name = 'target_table',
    type = 'upsert',
    primary_key = 'id'
);

Related Pages

Implements Principle

Requires Environment

Related Implementations

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment