Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Dolphinscheduler AbstractDataSourceProcessor Extension

From Leeroopedia


Knowledge Sources
Domains Plugin_Architecture, Data_Integration
Last Updated 2026-02-10 00:00 GMT

Overview

Concrete tool for implementing a database-specific datasource processor by extending AbstractDataSourceProcessor with @AutoService SPI registration.

Description

The DataSourceProcessor interface defines the full contract for database connection handling. AbstractDataSourceProcessor provides base validation (host, database name, security parameter blocking) while each database plugin extends it with type-specific logic. The @AutoService(DataSourceProcessor.class) annotation auto-generates the META-INF/services file for SPI discovery.

Usage

Extend AbstractDataSourceProcessor when creating a new datasource plugin. Override all abstract methods to handle your specific database's connection string format, JDBC driver, and parameter serialization.

Code Reference

Source Location

  • Repository: dolphinscheduler
  • File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java (L28-119)
  • File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java (L44-141)
  • File: dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-mysql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/mysql/param/MySQLDataSourceProcessor.java (L42-201) (reference implementation)

Signature

public interface DataSourceProcessor {
    // Deserialization
    BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson);
    BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson);

    // Connection parameter management
    ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam);
    ConnectionParam createConnectionParams(String connectionJson);

    // JDBC metadata
    String getDatasourceDriver();
    String getValidationQuery();
    String getJdbcUrl(ConnectionParam connectionParam);
    Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException;

    // Validation and lifecycle
    void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam);
    boolean checkDataSourceConnectivity(ConnectionParam connectionParam);
    DbType getDbType();
    DataSourceProcessor create();

    // SQL parsing
    List<String> splitAndRemoveComment(String sql);
}

public abstract class AbstractDataSourceProcessor implements DataSourceProcessor {
    // Validation methods
    protected void checkHost(String host);                    // L71
    protected void checkDatabasePatter(String database);      // L83
    protected void checkOther(Map<String, String> other);     // L94
    public void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam); // L57

    // Connectivity test
    public boolean checkDataSourceConnectivity(ConnectionParam connectionParam); // L126

    // SQL parsing via Druid
    public List<String> splitAndRemoveComment(String sql);    // L136
}

Import

import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import com.google.auto.service.AutoService;

@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
    // ...
}

I/O Contract

Inputs

Name Type Required Description
paramJson String Yes JSON string of connection parameters from UI/API
datasourceParam BaseDataSourceParamDTO Yes Deserialized parameter DTO with host, port, database, credentials
connectionParam ConnectionParam Yes Resolved connection parameters with JDBC URL and encoded password

Outputs

Name Type Description
BaseDataSourceParamDTO DTO Deserialized parameter object
ConnectionParam Object Resolved connection parameters with JDBC URL
Connection java.sql.Connection Raw JDBC connection for database operations
boolean primitive Connectivity test result
List<String> List Parsed individual SQL statements

Usage Examples

MySQL Processor Implementation

@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {

    @Override
    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
        return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class);
    }

    @Override
    public ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
        MySQLDataSourceParamDTO mysqlDatasourceParam = (MySQLDataSourceParamDTO) datasourceParam;
        MySQLConnectionParam connectionParam = new MySQLConnectionParam();
        connectionParam.setUser(mysqlDatasourceParam.getUserName());
        connectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword()));
        connectionParam.setJdbcUrl(getJdbcUrl(connectionParam));
        connectionParam.setDriverClassName(getDatasourceDriver());
        connectionParam.setValidationQuery(getValidationQuery());
        return connectionParam;
    }

    @Override
    public String getDatasourceDriver() {
        return "com.mysql.cj.jdbc.Driver";
    }

    @Override
    public String getValidationQuery() {
        return "select 1";
    }

    @Override
    public DbType getDbType() {
        return DbType.MYSQL;
    }

    @Override
    public DataSourceProcessor create() {
        return new MySQLDataSourceProcessor();
    }
}

Related Pages

Implements Principle

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment