Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langgenius Dify Workflow SSE Stream

From Leeroopedia
Knowledge Sources Dify
Domains Workflow, DAG, Frontend
Last Updated 2026-02-12 00:00 GMT

Overview

Description

The Workflow SSE Stream implementation encompasses three functions that manage the lifecycle of workflow execution: fetchWorkflowDraft to load the current draft, handleStream to consume real-time SSE events during execution, and stopWorkflowRun to abort a running workflow. The handleStream function is the central piece -- a streaming SSE parser that reads the HTTP response body incrementally, extracts data: lines, parses them as JSON, and dispatches to typed callback handlers based on the event field.

The handleStream function supports over 20 distinct event types covering workflow lifecycle, node execution, container iteration/loop progress, parallel branching, text streaming, human-in-the-loop interaction, and data source processing.

Usage

  • Call fetchWorkflowDraft to retrieve the current draft state before running.
  • Pass the SSE response from a workflow execution POST to handleStream with the desired callback handlers.
  • Call stopWorkflowRun to cancel an active workflow run.

Code Reference

Source Location

  • web/service/workflow.ts, lines 14-16 (fetchWorkflowDraft), lines 45-47 (stopWorkflowRun)
  • web/service/base.ts, lines 184-389 (handleStream)

Signature

// Load the current workflow draft
export const fetchWorkflowDraft = (url: string) => {
  return get(url, {}, { silent: true }) as Promise<FetchWorkflowDraftResponse>
}

// Stop a running workflow
export const stopWorkflowRun = (url: string) => {
  return post<CommonResponse>(url)
}

// SSE stream handler with typed event callbacks
export const handleStream = (
  response: Response,
  onData: IOnData,
  onCompleted?: IOnCompleted,
  onThought?: IOnThought,
  onMessageEnd?: IOnMessageEnd,
  onMessageReplace?: IOnMessageReplace,
  onFile?: IOnFile,
  onWorkflowStarted?: IOnWorkflowStarted,
  onWorkflowFinished?: IOnWorkflowFinished,
  onNodeStarted?: IOnNodeStarted,
  onNodeFinished?: IOnNodeFinished,
  onIterationStart?: IOnIterationStarted,
  onIterationNext?: IOnIterationNext,
  onIterationFinish?: IOnIterationFinished,
  onLoopStart?: IOnLoopStarted,
  onLoopNext?: IOnLoopNext,
  onLoopFinish?: IOnLoopFinished,
  onNodeRetry?: IOnNodeRetry,
  onParallelBranchStarted?: IOnParallelBranchStarted,
  onParallelBranchFinished?: IOnParallelBranchFinished,
  onTextChunk?: IOnTextChunk,
  onTTSChunk?: IOnTTSChunk,
  onTTSEnd?: IOnTTSEnd,
  onTextReplace?: IOnTextReplace,
  onAgentLog?: IOnAgentLog,
  onHumanInputRequired?: IOHumanInputRequired,
  onHumanInputFormFilled?: IOnHumanInputFormFilled,
  onHumanInputFormTimeout?: IOnHumanInputFormTimeout,
  onWorkflowPaused?: IOWorkflowPaused,
  onDataSourceNodeProcessing?: IOnDataSourceNodeProcessing,
  onDataSourceNodeCompleted?: IOnDataSourceNodeCompleted,
  onDataSourceNodeError?: IOnDataSourceNodeError,
) => { /* streaming parser logic */ }

Import

import { fetchWorkflowDraft, stopWorkflowRun } from '@/service/workflow'
import { handleStream } from '@/service/base'

I/O Contract

Inputs (fetchWorkflowDraft)

Parameter Type Required Description
url string Yes The API endpoint URL for fetching the workflow draft (e.g., /apps/{appId}/workflows/draft)

Outputs (fetchWorkflowDraft)

Field Type Description
(return) Promise<FetchWorkflowDraftResponse> The draft workflow including graph (nodes, edges, viewport), features, created_at, created_by, hash, and updated_at

Inputs (handleStream)

Parameter Type Required Description
response Response Yes The HTTP Response object from the workflow execution POST request
onData IOnData Yes Callback for message/agent_message events (streaming text output)
onWorkflowStarted IOnWorkflowStarted No Callback fired when workflow execution begins
onWorkflowFinished IOnWorkflowFinished No Callback fired when workflow execution completes
onNodeStarted IOnNodeStarted No Callback fired when an individual node begins execution
onNodeFinished IOnNodeFinished No Callback fired when an individual node completes execution
onIterationStart IOnIterationStarted No Callback fired when an Iteration container begins
onIterationNext IOnIterationNext No Callback fired between iteration steps
onIterationFinish IOnIterationFinished No Callback fired when an Iteration container completes
onLoopStart IOnLoopStarted No Callback fired when a Loop container begins
onLoopNext IOnLoopNext No Callback fired between loop iterations
onLoopFinish IOnLoopFinished No Callback fired when a Loop container completes
onTextChunk IOnTextChunk No Callback for incremental text output chunks
onTextReplace IOnTextReplace No Callback for full text replacement events
onHumanInputRequired IOHumanInputRequired No Callback when workflow pauses for human input
onCompleted IOnCompleted No Callback fired when the SSE stream ends

Inputs (stopWorkflowRun)

Parameter Type Required Description
url string Yes The API endpoint URL for stopping the workflow run (e.g., /apps/{appId}/workflows/tasks/{taskId}/stop)

Outputs (stopWorkflowRun)

Field Type Description
(return) Promise<CommonResponse> Standard response confirming the stop was issued

Usage Examples

import { fetchWorkflowDraft, stopWorkflowRun } from '@/service/workflow'
import { handleStream } from '@/service/base'

// 1. Load the draft before execution
const draft = await fetchWorkflowDraft('/apps/app-abc123/workflows/draft')

// 2. Execute the workflow and handle SSE events
const response = await fetch('/apps/app-abc123/workflows/run', { method: 'POST', body: JSON.stringify({ inputs: {} }) })

handleStream(
  response,
  (text, isFirst, meta) => console.log('Data:', text),
  (hasError, errorMsg) => console.log('Completed:', hasError ? errorMsg : 'success'),
  undefined, // onThought
  undefined, // onMessageEnd
  undefined, // onMessageReplace
  undefined, // onFile
  (data) => console.log('Workflow started:', data.workflow_run_id),
  (data) => console.log('Workflow finished:', data.data.status),
  (data) => console.log('Node started:', data.data.node_id),
  (data) => console.log('Node finished:', data.data.node_id, data.data.status),
  (data) => console.log('Iteration started:', data.data.node_id),
  (data) => console.log('Iteration next:', data.data.index),
  (data) => console.log('Iteration finished:', data.data.node_id),
)

// 3. Stop a running workflow
await stopWorkflowRun('/apps/app-abc123/workflows/tasks/task-xyz/stop')

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment