Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Risingwavelabs Risingwave SinkFactory CreateWriter

From Leeroopedia


Knowledge Sources
Domains Connectors, Software_Architecture, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Concrete tool for creating and validating sink connector instances provided by the RisingWave Java connector API.

Description

SinkFactory is the Java interface that all sink connector implementations must implement. It defines three methods: createWriter (instantiate a writer for a specific table schema and properties), validate (check configuration before creating the sink), and createCoordinator (for distributed commit coordination). Implementations include JDBCSinkFactory, EsSinkFactory, CassandraFactory, and others.

The SinkWriter interface defines the data writing lifecycle: beginEpoch, write, barrier, and drop. The SinkCoordinator interface handles distributed commit and drop operations.

Usage

These interfaces are implemented by each sink connector type. They are discovered via Java ServiceLoader and instantiated by SinkUtils.getSinkFactory() when processing CREATE SINK statements.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkFactory.java (L23-32)
  • File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkWriter.java (L22-50)
  • File: java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkCoordinator.java (L22-26)

Signature

public interface SinkFactory {
    SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties);
    void validate(
        TableSchema tableSchema,
        Map<String, String> tableProperties,
        SinkType sinkType
    );
    SinkCoordinator createCoordinator(Map<String, String> tableProperties);
}

public interface SinkWriter {
    void beginEpoch(long epoch);
    boolean write(Iterable<SinkRow> rows);
    Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint);
    void drop();
}

public interface SinkCoordinator {
    void commit(long epoch, List<ConnectorServiceProto.SinkMetadata> metadataList);
    void drop();
}

Import

import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkCoordinator;

I/O Contract

Inputs

Name Type Required Description
tableSchema TableSchema Yes Column names and types from the source relation
tableProperties Map<String, String> Yes WITH clause properties (connector type, connection URL, etc.)
sinkType SinkType Yes (for validate) APPEND_ONLY, FORCE_APPEND_ONLY, or UPSERT

Outputs

Name Type Description
SinkWriter instance Writer configured for the target system
SinkCoordinator instance Coordinator for distributed commit
void (validate) exception on failure Throws if configuration is invalid

Usage Examples

Implementing a Custom Sink

public class MyCustomSinkFactory implements SinkFactory {
    @Override
    public SinkWriter createWriter(TableSchema schema, Map<String, String> props) {
        String url = props.get("my.custom.url");
        return new MyCustomSinkWriter(schema, url);
    }

    @Override
    public void validate(TableSchema schema, Map<String, String> props, SinkType type) {
        if (!props.containsKey("my.custom.url")) {
            throw new IllegalArgumentException("my.custom.url is required");
        }
    }

    @Override
    public SinkCoordinator createCoordinator(Map<String, String> props) {
        return null; // No coordination needed
    }
}

SQL to Create Sink

CREATE SINK es_sink FROM user_events
WITH (
    connector = 'elasticsearch',
    url = 'http://elasticsearch:9200',
    index = 'user_events',
    type = 'append-only'
);

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment