Implementation:Risingwavelabs Risingwave MongoDbValidator
| Knowledge Sources | |
|---|---|
| Domains | Connectors, CDC, Validation, MongoDB |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Database validator for MongoDB CDC sources, verifying connectivity, user privileges, and readiness for change data capture.
Description
The MongoDbValidator class extends DatabaseValidator and implements AutoCloseable to provide MongoDB-specific validation for CDC source connectors. It performs the following validations:
Database Configuration (validateDbConfig): Connects to MongoDB using a short validation timeout (5 seconds) configured via MongoClientSettings. It sets heartbeat frequency, socket connect/read timeouts, and server selection timeout to ensure fast failure when the database is unreachable. The connection is verified by issuing a lightweight ping command against the admin database.
User Privilege Validation (validateUserPrivilege): Checks that the configured MongoDB user has appropriate roles to read the admin database (required for oplog access). It queries the usersInfo command on the authentication database and inspects both direct roles and inherited roles for any of: readWrite, read, readWriteAnyDatabase, or readAnyDatabase on the admin database. If the user lacks the appropriate roles, a CdcConnectorException is thrown.
Table Validation (validateTable): Performs no operation since MongoDB is schemaless, and table-level schema validation is not applicable.
The class always returns false for isCdcSourceJob(), meaning table validation is always eligible to run (though it is a no-op for MongoDB).
Usage
This validator is instantiated by SourceValidateHandler when the source type is MONGODB. It is used in a try-with-resources block to ensure the underlying MongoClient is properly closed after validation.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java
- Lines: 202
Signature
public class MongoDbValidator extends DatabaseValidator implements AutoCloseable {
public MongoDbValidator(Map<String, String> userProps) { ... }
@Override
public void validateDbConfig() { ... }
@Override
void validateUserPrivilege() { ... }
@Override
void validateTable() { ... }
@Override
boolean isCdcSourceJob() { ... }
@Override
public void close() { ... }
boolean checkReadRoleForAdminDb(List<Document> roles) { ... }
}
Import
import com.risingwave.connector.source.common.MongoDbValidator;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| userProps | Map<String, String> | Yes | Must contain mongodb.url with a valid MongoDB connection string |
Outputs
| Name | Type | Description |
|---|---|---|
| (void) | -- | validateAll() returns void on success |
| CdcConnectorException | RuntimeException | Thrown when connectivity fails or user lacks required roles |
Checked Roles
The following MongoDB roles on the admin database satisfy the privilege check:
- read
- readWrite
- readAnyDatabase
- readWriteAnyDatabase
Both direct and inherited roles are inspected.
Usage Examples
MongoDB CDC Source Creation
-- Validation is triggered automatically during source creation
CREATE SOURCE mongo_source WITH (
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://user:password@host:27017/?replicaSet=rs0&authSource=admin',
collection.name = 'mydb.orders'
);
Programmatic Validation
Map<String, String> props = new HashMap<>();
props.put("mongodb.url", "mongodb://user:pass@host:27017/?replicaSet=rs0");
props.put("collection.name", "mydb.orders");
try (var validator = new MongoDbValidator(props)) {
validator.validateAll();
}