Implementation:Apache Dolphinscheduler AbstractDataSourceProcessor Extension
| Knowledge Sources | |
|---|---|
| Domains | Plugin_Architecture, Data_Integration |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for implementing a database-specific datasource processor by extending AbstractDataSourceProcessor with @AutoService SPI registration.
Description
The DataSourceProcessor interface defines the full contract for database connection handling. AbstractDataSourceProcessor provides base validation (host, database name, security parameter blocking) while each database plugin extends it with type-specific logic. The @AutoService(DataSourceProcessor.class) annotation auto-generates the META-INF/services file for SPI discovery.
Usage
Extend AbstractDataSourceProcessor when creating a new datasource plugin. Override all abstract methods to handle your specific database's connection string format, JDBC driver, and parameter serialization.
Code Reference
Source Location
- Repository: dolphinscheduler
- File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java (L28-119)
- File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java (L44-141)
- File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-mysql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/mysql/param/MySQLDataSourceProcessor.java (L42-201) (reference implementation)
Signature
public interface DataSourceProcessor {
// Deserialization
BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson);
BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson);
// Connection parameter management
ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam);
ConnectionParam createConnectionParams(String connectionJson);
// JDBC metadata
String getDatasourceDriver();
String getValidationQuery();
String getJdbcUrl(ConnectionParam connectionParam);
Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException;
// Validation and lifecycle
void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam);
boolean checkDataSourceConnectivity(ConnectionParam connectionParam);
DbType getDbType();
DataSourceProcessor create();
// SQL parsing
List<String> splitAndRemoveComment(String sql);
}
public abstract class AbstractDataSourceProcessor implements DataSourceProcessor {
// Validation methods
protected void checkHost(String host); // L71
protected void checkDatabasePatter(String database); // L83
protected void checkOther(Map<String, String> other); // L94
public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam); // L57
// Connectivity test
public boolean checkDataSourceConnectivity(ConnectionParam connectionParam); // L126
// SQL parsing via Druid
public List<String> splitAndRemoveComment(String sql); // L136
}
Import
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import com.google.auto.service.AutoService;
@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
// ...
}
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| paramJson | String | Yes | JSON string of connection parameters from UI/API |
| datasourceParam | BaseDataSourceParamDTO | Yes | Deserialized parameter DTO with host, port, database, credentials |
| connectionParam | ConnectionParam | Yes | Resolved connection parameters with JDBC URL and encoded password |
Outputs
| Name | Type | Description |
|---|---|---|
| BaseDataSourceParamDTO | DTO | Deserialized parameter object |
| ConnectionParam | Object | Resolved connection parameters with JDBC URL |
| Connection | java.sql.Connection | Raw JDBC connection for database operations |
| boolean | primitive | Connectivity test result |
| List<String> | List | Parsed individual SQL statements |
Usage Examples
MySQL Processor Implementation
@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class);
}
@Override
public ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
MySQLDataSourceParamDTO mysqlDatasourceParam = (MySQLDataSourceParamDTO) datasourceParam;
MySQLConnectionParam connectionParam = new MySQLConnectionParam();
connectionParam.setUser(mysqlDatasourceParam.getUserName());
connectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword()));
connectionParam.setJdbcUrl(getJdbcUrl(connectionParam));
connectionParam.setDriverClassName(getDatasourceDriver());
connectionParam.setValidationQuery(getValidationQuery());
return connectionParam;
}
@Override
public String getDatasourceDriver() {
return "com.mysql.cj.jdbc.Driver";
}
@Override
public String getValidationQuery() {
return "select 1";
}
@Override
public DbType getDbType() {
return DbType.MYSQL;
}
@Override
public DataSourceProcessor create() {
return new MySQLDataSourceProcessor();
}
}