Implementation:Apache Dolphinscheduler Workflow Task Entity Pattern

From Leeroopedia


Knowledge Sources
Domains: Workflow_Orchestration, Data_Modeling
Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete pattern for defining workflows and tasks in Apache DolphinScheduler through the WorkflowDefinition, TaskDefinition, and WorkflowTaskRelation DAO entities, which are mapped to database tables with the MyBatis-Plus ORM.

Description

The workflow DAG is persisted through three MyBatis-Plus entities in the dolphinscheduler-dao module. WorkflowDefinition maps to t_ds_workflow_definition and holds workflow-level metadata. TaskDefinition maps to t_ds_task_definition and holds each task's type together with its type-specific parameters, which are stored as a JSON string. WorkflowTaskRelation maps to t_ds_workflow_task_relation and defines the edges between tasks, referencing each task by its unique code rather than by its database id.
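As a rough illustration of how edge rows keyed by task codes describe a DAG, the sketch below groups relation rows into a downstream adjacency map. The `DagSketch` class and its `Edge` record are hypothetical stand-ins rather than DolphinScheduler classes, and the code assumes that a `preTaskCode` of 0 marks the start node, as the entity comments on this page indicate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DagSketch {
    // Hypothetical stand-in for one row of t_ds_workflow_task_relation (codes only).
    record Edge(long preTaskCode, long postTaskCode) {}

    // Groups edges by upstream task code; preTaskCode 0 is treated as the start marker.
    static Map<Long, List<Long>> downstream(List<Edge> edges) {
        Map<Long, List<Long>> graph = new LinkedHashMap<>();
        for (Edge e : edges) {
            // Every downstream task becomes a node, even if nothing follows it.
            graph.computeIfAbsent(e.postTaskCode(), k -> new ArrayList<>());
            if (e.preTaskCode() != 0L) {
                graph.computeIfAbsent(e.preTaskCode(), k -> new ArrayList<>())
                     .add(e.postTaskCode());
            }
        }
        return graph;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
            new Edge(0L, 1001L),     // start -> 1001
            new Edge(1001L, 1002L)); // 1001 -> 1002
        System.out.println(downstream(edges)); // prints {1001=[1002], 1002=[]}
    }
}
```

Keying the map by code rather than by row id mirrors the way the relation table refers to tasks, so the same adjacency map stays valid when rows are re-inserted with new ids.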

Usage

These entities are created via the DolphinScheduler REST API or UI. Application code interacts with them through their respective DAO interfaces (WorkflowDefinitionDao, TaskDefinitionDao, etc.).

Code Reference

Source Location

  • Repository: dolphinscheduler
  • File: dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ (entity package)

Signature

@Data
@Builder
@TableName("t_ds_workflow_definition")
public class WorkflowDefinition {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Long code;
    private String name;
    private Integer version;
    private String description;
    private Long projectCode;
    private String tenantCode;
    private String globalParams;    // JSON array of global parameters
    private Flag flag;              // YES/NO
    private ReleaseState releaseState;
    // timestamps, user info, etc.
}

@Data
@Builder
@TableName("t_ds_task_definition")
public class TaskDefinition {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Long code;
    private String name;
    private Integer version;
    private String taskType;        // SHELL, SQL, PYTHON, SUB_PROCESS, etc.
    private String taskParams;      // JSON type-specific parameters
    private TaskExecuteType taskExecuteType;
    private String workerGroup;
    private int failRetryTimes;
    private int failRetryInterval;
    private Priority taskPriority;
    private TimeoutFlag timeoutFlag;
    private int timeout;
    // timestamps, user info, etc.
}

@Data
@TableName("t_ds_workflow_task_relation")
public class WorkflowTaskRelation {
    private Long workflowDefinitionCode;
    private Integer workflowDefinitionVersion;
    private Long preTaskCode;       // upstream task (0 = start node)
    private Integer preTaskVersion;
    private Long postTaskCode;      // downstream task
    private Integer postTaskVersion;
}
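Each relation row stores both a code and a version for its endpoints, which suggests that an edge points at one specific version of a task definition. The sketch below shows that kind of lookup with a composite key. `VersionLookupSketch` and `TaskKey` are hypothetical names, and treating (code, version) as the identifying pair is an assumption drawn from the fields above, not a confirmed schema constraint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class VersionLookupSketch {
    // Hypothetical composite key: a task code plus the version an edge is pinned to.
    record TaskKey(long code, int version) {}

    private final Map<TaskKey, String> taskNames = new HashMap<>();

    // Stores the name of one task definition version.
    void register(long code, int version, String name) {
        taskNames.put(new TaskKey(code, version), name);
    }

    // Resolves the task name an edge endpoint refers to, if that exact version exists.
    Optional<String> resolve(long code, int version) {
        return Optional.ofNullable(taskNames.get(new TaskKey(code, version)));
    }

    public static void main(String[] args) {
        VersionLookupSketch lookup = new VersionLookupSketch();
        lookup.register(1001L, 1, "Extract_Data");
        lookup.register(1001L, 2, "Extract_Data_v2");
        System.out.println(lookup.resolve(1001L, 1).orElse("missing")); // prints Extract_Data
        System.out.println(lookup.resolve(1001L, 3).orElse("missing")); // prints missing
    }
}
```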

Import

import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;

I/O Contract

Inputs

Name             Type              Required  Description
Workflow DAG     JSON from API/UI  Yes       Task nodes, dependency edges, global parameters
Task parameters  JSON string       Yes       Type-specific task configuration

Outputs

Name                  Type        Description
WorkflowDefinition    DB record   Persisted workflow metadata with unique code
TaskDefinition        DB records  Persisted task definitions with codes and parameters
WorkflowTaskRelation  DB records  DAG edges linking tasks by code

Usage Examples

Defining a Simple Two-Task Workflow

// Create workflow definition
WorkflowDefinition workflow = WorkflowDefinition.builder()
    .code(1234567890L)
    .name("ETL_Pipeline")
    .version(1)
    .projectCode(100L)
    .tenantCode("default")
    .build();

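// Create task definitions (version 1, so the relation rows below can pin it)
TaskDefinition extractTask = TaskDefinition.builder()
    .code(1001L)
    .name("Extract_Data")
    .version(1)
    .taskType("SQL")
    .taskParams("{\"sql\":\"SELECT * FROM source\"}")
    .build();

TaskDefinition loadTask = TaskDefinition.builder()
    .code(1002L)
    .name("Load_Data")
    .version(1)
    .taskType("SHELL")
    .taskParams("{\"rawScript\":\"load_data.sh\"}")
    .build();

// Entry edge: start node (preTaskCode 0) -> Extract
// Using version 0 for the start marker is an assumption.
WorkflowTaskRelation entry = new WorkflowTaskRelation();
entry.setWorkflowDefinitionCode(1234567890L);
entry.setWorkflowDefinitionVersion(1);
entry.setPreTaskCode(0L);
entry.setPreTaskVersion(0);
entry.setPostTaskCode(1001L);
entry.setPostTaskVersion(1);

// Define dependency: Extract -> Load
WorkflowTaskRelation relation = new WorkflowTaskRelation();
relation.setWorkflowDefinitionCode(1234567890L);
relation.setWorkflowDefinitionVersion(1);
relation.setPreTaskCode(1001L);
relation.setPreTaskVersion(1);
relation.setPostTaskCode(1002L);
relation.setPostTaskVersion(1);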
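Because the relation rows only describe edges, a consumer has to derive a run order from them. One possible way is Kahn's algorithm, sketched below over plain task codes. This is an illustration, not DolphinScheduler's own scheduling code; `TopoSketch` and its `long[]` edge pairs are hypothetical, and a `preTaskCode` of 0 is again assumed to mark the start node.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TopoSketch {
    // Each edge is {preTaskCode, postTaskCode}; preTaskCode 0 marks the start node.
    static List<Long> order(List<long[]> edges) {
        Map<Long, List<Long>> next = new TreeMap<>();
        Map<Long, Integer> inDegree = new TreeMap<>();
        for (long[] e : edges) {
            inDegree.putIfAbsent(e[1], 0);
            next.putIfAbsent(e[1], new ArrayList<>());
            if (e[0] == 0L) {
                continue; // start marker, not a real upstream task
            }
            inDegree.putIfAbsent(e[0], 0);
            next.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            inDegree.merge(e[1], 1, Integer::sum);
        }
        // Tasks with no unfinished upstream task are ready to run.
        Deque<Long> ready = new ArrayDeque<>();
        inDegree.forEach((code, degree) -> {
            if (degree == 0) {
                ready.add(code);
            }
        });
        List<Long> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            long code = ready.poll();
            result.add(code);
            for (long child : next.get(code)) {
                if (inDegree.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        // Any task left unvisited sits on a cycle, so the rows do not form a DAG.
        if (result.size() != inDegree.size()) {
            throw new IllegalStateException("cycle detected in task relations");
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> edges = List.of(
            new long[] {0L, 1001L},     // start -> Extract_Data
            new long[] {1001L, 1002L}); // Extract_Data -> Load_Data
        System.out.println(order(edges)); // prints [1001, 1002]
    }
}
```

The same pass doubles as a validity check: if the relation rows contain a cycle, no order exists and the sketch throws instead of returning a partial list.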

Related Pages

Implements Principle
