Implementation:Apache Dolphinscheduler Workflow Task Entity Pattern
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Data_Modeling |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for defining workflows and tasks using the WorkflowDefinition, TaskDefinition, and WorkflowTaskRelation DAO entities with MyBatis-Plus ORM.
Description
The workflow DAG is persisted through three MyBatis-Plus entities in the dolphinscheduler-dao module. WorkflowDefinition maps to t_ds_workflow_definition and holds workflow metadata. TaskDefinition maps to t_ds_task_definition and holds task type and parameters as a JSON string. WorkflowTaskRelation maps to t_ds_workflow_task_relation and defines edges between tasks using their unique codes.
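To make the edge encoding concrete, here is a minimal sketch of how a set of relation rows can be turned back into an execution order. It is not DolphinScheduler's own DAG code: the `Edge` class is a hypothetical stand-in for a WorkflowTaskRelation row reduced to its two task codes, `RelationDagSketch` and `topologicalOrder` are illustrative names, and a `preTaskCode` of 0 is treated as the start-node marker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RelationDagSketch {

    /** Hypothetical stand-in for one relation row (not the real entity). */
    public static final class Edge {
        public final long preTaskCode;   // 0 means "no upstream task"
        public final long postTaskCode;

        public Edge(long preTaskCode, long postTaskCode) {
            this.preTaskCode = preTaskCode;
            this.postTaskCode = postTaskCode;
        }
    }

    /** Returns task codes so that every upstream task precedes its downstream tasks. */
    public static List<Long> topologicalOrder(List<Edge> edges) {
        Map<Long, List<Long>> downstream = new LinkedHashMap<>();
        Map<Long, Integer> inDegree = new LinkedHashMap<>();

        for (Edge e : edges) {
            // Register the downstream task even if it has no successors.
            inDegree.putIfAbsent(e.postTaskCode, 0);
            downstream.putIfAbsent(e.postTaskCode, new ArrayList<>());
            if (e.preTaskCode == 0L) {
                continue; // start-node marker, not a real dependency
            }
            inDegree.putIfAbsent(e.preTaskCode, 0);
            downstream.computeIfAbsent(e.preTaskCode, k -> new ArrayList<>()).add(e.postTaskCode);
            inDegree.merge(e.postTaskCode, 1, Integer::sum);
        }

        // Kahn's algorithm: start from tasks with no unresolved upstream tasks.
        Deque<Long> ready = new ArrayDeque<>();
        for (Map.Entry<Long, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<Long> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            long code = ready.poll();
            order.add(code);
            for (long next : downstream.get(code)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("Relations contain a cycle");
        }
        return order;
    }
}
```

The sketch keys everything on task codes alone; a fuller version would presumably also carry the task versions stored on each relation row.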
Usage
These entities are created via the DolphinScheduler REST API or UI. Application code interacts with them through their respective DAO interfaces (WorkflowDefinitionDao, TaskDefinitionDao, etc.).
Code Reference
Source Location
- Repository: dolphinscheduler
- Directory: dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ (entity package)
Signature
@Data
@Builder
@TableName("t_ds_workflow_definition")
public class WorkflowDefinition {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Long code;
    private String name;
    private Integer version;
    private String description;
    private Long projectCode;
    private String tenantCode;
    private String globalParams;      // JSON array of global parameters
    private Flag flag;                // YES/NO
    private ReleaseState releaseState;
    // timestamps, user info, etc.
}
@Data
@Builder
@TableName("t_ds_task_definition")
public class TaskDefinition {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Long code;
    private String name;
    private Integer version;
    private String taskType;          // SHELL, SQL, PYTHON, SUB_PROCESS, etc.
    private String taskParams;        // JSON type-specific parameters
    private TaskExecuteType taskExecuteType;
    private String workerGroup;
    private int failRetryTimes;
    private int failRetryInterval;
    private Priority taskPriority;
    private TimeoutFlag timeoutFlag;
    private int timeout;
    // timestamps, user info, etc.
}
@Data
@TableName("t_ds_workflow_task_relation")
public class WorkflowTaskRelation {
    private Long workflowDefinitionCode;
    private Integer workflowDefinitionVersion;
    private Long preTaskCode;         // upstream task (0 = start node)
    private Integer preTaskVersion;
    private Long postTaskCode;        // downstream task
    private Integer postTaskVersion;
}
Import
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| Workflow DAG | JSON from API/UI | Yes | Task nodes, dependency edges, global parameters |
| Task parameters | JSON string | Yes | Type-specific task configuration |
Outputs
| Name | Type | Description |
|---|---|---|
| WorkflowDefinition | DB record | Persisted workflow metadata with unique code |
| TaskDefinition | DB records | Persisted task definitions with codes and parameters |
| WorkflowTaskRelation | DB records | DAG edges linking tasks by code |
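Because relation records refer to tasks by code, the TaskDefinition and WorkflowTaskRelation outputs need to agree with each other. The following is a minimal sketch of such a consistency check, not part of DolphinScheduler: `RelationIntegritySketch` and `danglingCodes` are hypothetical names, each edge is modeled as a plain `{preTaskCode, postTaskCode}` array, and 0 is skipped as the start-node marker.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RelationIntegritySketch {

    /**
     * Returns the task codes that appear in an edge but have no matching task definition.
     * Each edge is a two-element array {preTaskCode, postTaskCode}.
     */
    public static Set<Long> danglingCodes(List<Long> definedTaskCodes, List<long[]> edges) {
        Set<Long> defined = new HashSet<>(definedTaskCodes);
        Set<Long> dangling = new TreeSet<>();
        for (long[] edge : edges) {
            for (long code : edge) {
                // 0 is the start-node marker, so it never needs a definition.
                if (code != 0L && !defined.contains(code)) {
                    dangling.add(code);
                }
            }
        }
        return dangling;
    }
}
```

An empty result means every edge endpoint resolves to a known task code; the sketch ignores versions for brevity.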
Usage Examples
Defining a Simple Two-Task Workflow
// Create workflow definition
WorkflowDefinition workflow = WorkflowDefinition.builder()
        .code(1234567890L)
        .name("ETL_Pipeline")
        .version(1)
        .projectCode(100L)
        .tenantCode("default")
        .build();
// Create task definitions (version 1 of each task)
TaskDefinition extractTask = TaskDefinition.builder()
        .code(1001L)
        .name("Extract_Data")
        .version(1)
        .taskType("SQL")
        .taskParams("{\"sql\":\"SELECT * FROM source\"}")
        .build();
TaskDefinition loadTask = TaskDefinition.builder()
        .code(1002L)
        .name("Load_Data")
        .version(1)
        .taskType("SHELL")
        .taskParams("{\"rawScript\":\"load_data.sh\"}")
        .build();
// Define dependency: Extract -> Load (codes and versions must match the definitions above)
WorkflowTaskRelation relation = new WorkflowTaskRelation();
relation.setWorkflowDefinitionCode(1234567890L);
relation.setWorkflowDefinitionVersion(1);
relation.setPreTaskCode(1001L);
relation.setPreTaskVersion(1);
relation.setPostTaskCode(1002L);
relation.setPostTaskVersion(1);
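The snippet above records only the Extract -> Load edge. Since the Signature section notes that a `preTaskCode` of 0 denotes the start node, a complete edge set would presumably also contain a row attaching the first task to that start node. The sketch below generates both rows for a linear chain; `RelationRow` is a hypothetical stand-in for WorkflowTaskRelation, and `LinearChainSketch` and `linearChain` are illustrative names rather than DolphinScheduler APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class LinearChainSketch {

    /** Hypothetical stand-in mirroring a subset of the WorkflowTaskRelation fields. */
    public static final class RelationRow {
        public final long workflowDefinitionCode;
        public final int workflowDefinitionVersion;
        public final long preTaskCode;
        public final long postTaskCode;

        public RelationRow(long workflowDefinitionCode, int workflowDefinitionVersion,
                           long preTaskCode, long postTaskCode) {
            this.workflowDefinitionCode = workflowDefinitionCode;
            this.workflowDefinitionVersion = workflowDefinitionVersion;
            this.preTaskCode = preTaskCode;
            this.postTaskCode = postTaskCode;
        }
    }

    /** Builds one row per task: the first hangs off the start node (0), the rest off their predecessor. */
    public static List<RelationRow> linearChain(long workflowCode, int workflowVersion, long... taskCodes) {
        List<RelationRow> rows = new ArrayList<>();
        long previous = 0L; // 0 marks the start node
        for (long code : taskCodes) {
            rows.add(new RelationRow(workflowCode, workflowVersion, previous, code));
            previous = code;
        }
        return rows;
    }
}
```

Calling `linearChain(1234567890L, 1, 1001L, 1002L)` yields two rows for the ETL_Pipeline example: one from the start node to Extract_Data and one from Extract_Data to Load_Data.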