Overview
Description
The Workflow SSE Stream implementation consists of three functions that manage the lifecycle of a workflow execution: fetchWorkflowDraft loads the current draft, handleStream consumes real-time SSE events during execution, and stopWorkflowRun aborts a running workflow. handleStream is the central piece: a streaming SSE parser that reads the HTTP response body incrementally, extracts data: lines, parses each one as JSON, and dispatches the result to a typed callback handler selected by the event field.
The handleStream function supports over 20 distinct event types covering workflow lifecycle, node execution, container iteration/loop progress, parallel branching, text streaming, human-in-the-loop interaction, and data source processing.
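The read, extract, parse, and dispatch steps described above can be sketched as follows. This is a simplified illustration, not the code in web/service/base.ts: the names StreamEvent, Handlers, drainBuffer, and consumeSse are hypothetical, the payload type is reduced to an event field, and a handler map stands in for the positional callbacks.

```typescript
// Hypothetical payload shape: real events carry more fields than `event`.
type StreamEvent = { event: string; [key: string]: unknown }
type Handlers = Map<string, (payload: StreamEvent) => void>

// Dispatch every complete `data:` line in `buffer` and return the
// trailing partial line so it can be joined with the next chunk.
function drainBuffer(buffer: string, handlers: Handlers): string {
  const lines = buffer.split('\n')
  const remainder = lines.pop() ?? '' // '' or an incomplete line
  for (const line of lines) {
    if (!line.startsWith('data:'))
      continue
    const json = line.slice('data:'.length).trim()
    if (!json)
      continue
    let payload: StreamEvent | null
    try {
      payload = JSON.parse(json) as StreamEvent | null
    }
    catch {
      continue // this sketch silently skips malformed lines
    }
    if (payload && typeof payload.event === 'string')
      handlers.get(payload.event)?.(payload)
  }
  return remainder
}

// Read the response body chunk by chunk and feed it to drainBuffer.
async function consumeSse(response: Response, handlers: Handlers): Promise<void> {
  if (!response.body)
    throw new Error('Response has no body')
  const reader = response.body.getReader()
  const decoder = new TextDecoder('utf-8')
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done)
      break
    // `stream: true` keeps multi-byte characters split across chunks intact.
    buffer += decoder.decode(value, { stream: true })
    buffer = drainBuffer(buffer, handlers)
  }
  // Flush the decoder and process a final line that lacked a newline.
  buffer += decoder.decode()
  drainBuffer(`${buffer}\n`, handlers)
}
```

Keeping the partial last line in the buffer is what makes the parser safe when a network chunk boundary falls in the middle of a JSON payload.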
Usage
- Call fetchWorkflowDraft to retrieve the current draft state before running.
- Pass the SSE response from a workflow execution POST to handleStream with the desired callback handlers.
- Call stopWorkflowRun to cancel an active workflow run.
Code Reference
Source Location
web/service/workflow.ts, lines 14-16 (fetchWorkflowDraft), lines 45-47 (stopWorkflowRun)
web/service/base.ts, lines 184-389 (handleStream)
Signature
// Load the current workflow draft
export const fetchWorkflowDraft = (url: string) => {
  return get(url, {}, { silent: true }) as Promise<FetchWorkflowDraftResponse>
}

// Stop a running workflow
export const stopWorkflowRun = (url: string) => {
  return post<CommonResponse>(url)
}

// SSE stream handler with typed event callbacks
export const handleStream = (
  response: Response,
  onData: IOnData,
  onCompleted?: IOnCompleted,
  onThought?: IOnThought,
  onMessageEnd?: IOnMessageEnd,
  onMessageReplace?: IOnMessageReplace,
  onFile?: IOnFile,
  onWorkflowStarted?: IOnWorkflowStarted,
  onWorkflowFinished?: IOnWorkflowFinished,
  onNodeStarted?: IOnNodeStarted,
  onNodeFinished?: IOnNodeFinished,
  onIterationStart?: IOnIterationStarted,
  onIterationNext?: IOnIterationNext,
  onIterationFinish?: IOnIterationFinished,
  onLoopStart?: IOnLoopStarted,
  onLoopNext?: IOnLoopNext,
  onLoopFinish?: IOnLoopFinished,
  onNodeRetry?: IOnNodeRetry,
  onParallelBranchStarted?: IOnParallelBranchStarted,
  onParallelBranchFinished?: IOnParallelBranchFinished,
  onTextChunk?: IOnTextChunk,
  onTTSChunk?: IOnTTSChunk,
  onTTSEnd?: IOnTTSEnd,
  onTextReplace?: IOnTextReplace,
  onAgentLog?: IOnAgentLog,
  onHumanInputRequired?: IOHumanInputRequired,
  onHumanInputFormFilled?: IOnHumanInputFormFilled,
  onHumanInputFormTimeout?: IOnHumanInputFormTimeout,
  onWorkflowPaused?: IOWorkflowPaused,
  onDataSourceNodeProcessing?: IOnDataSourceNodeProcessing,
  onDataSourceNodeCompleted?: IOnDataSourceNodeCompleted,
  onDataSourceNodeError?: IOnDataSourceNodeError,
) => { /* streaming parser logic */ }
Import
import { fetchWorkflowDraft, stopWorkflowRun } from '@/service/workflow'
import { handleStream } from '@/service/base'
I/O Contract
Inputs (fetchWorkflowDraft)
| Parameter | Type | Required | Description |
|---|---|---|---|
| url | string | Yes | The API endpoint URL for fetching the workflow draft (e.g., /apps/{appId}/workflows/draft) |
Outputs (fetchWorkflowDraft)
| Field | Type | Description |
|---|---|---|
| (return) | Promise<FetchWorkflowDraftResponse> | The draft workflow including graph (nodes, edges, viewport), features, created_at, created_by, hash, and updated_at |
Inputs (handleStream)
| Parameter | Type | Required | Description |
|---|---|---|---|
| response | Response | Yes | The HTTP Response object from the workflow execution POST request |
| onData | IOnData | Yes | Callback for message/agent_message events (streaming text output) |
| onWorkflowStarted | IOnWorkflowStarted | No | Callback fired when workflow execution begins |
| onWorkflowFinished | IOnWorkflowFinished | No | Callback fired when workflow execution completes |
| onNodeStarted | IOnNodeStarted | No | Callback fired when an individual node begins execution |
| onNodeFinished | IOnNodeFinished | No | Callback fired when an individual node completes execution |
| onIterationStart | IOnIterationStarted | No | Callback fired when an Iteration container begins |
| onIterationNext | IOnIterationNext | No | Callback fired between iteration steps |
| onIterationFinish | IOnIterationFinished | No | Callback fired when an Iteration container completes |
| onLoopStart | IOnLoopStarted | No | Callback fired when a Loop container begins |
| onLoopNext | IOnLoopNext | No | Callback fired between loop iterations |
| onLoopFinish | IOnLoopFinished | No | Callback fired when a Loop container completes |
| onTextChunk | IOnTextChunk | No | Callback for incremental text output chunks |
| onTextReplace | IOnTextReplace | No | Callback for full text replacement events |
| onHumanInputRequired | IOHumanInputRequired | No | Callback when workflow pauses for human input |
| onCompleted | IOnCompleted | No | Callback fired when the SSE stream ends |
Inputs (stopWorkflowRun)
| Parameter | Type | Required | Description |
|---|---|---|---|
| url | string | Yes | The API endpoint URL for stopping the workflow run (e.g., /apps/{appId}/workflows/tasks/{taskId}/stop) |
Outputs (stopWorkflowRun)
| Field | Type | Description |
|---|---|---|
| (return) | Promise<CommonResponse> | Standard response confirming the stop was issued |
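Both fetchWorkflowDraft and stopWorkflowRun take a fully formed URL, so callers have to fill in the {appId} and {taskId} placeholders themselves. A minimal sketch of doing that safely is shown here; draftUrl and stopUrl are hypothetical helpers that are not part of web/service/workflow.ts, and the path templates are taken only from the examples in the tables above.

```typescript
// Hypothetical helpers: the path templates mirror the documented examples,
// but the real application may build these URLs differently.
const draftUrl = (appId: string): string =>
  `/apps/${encodeURIComponent(appId)}/workflows/draft`

const stopUrl = (appId: string, taskId: string): string =>
  `/apps/${encodeURIComponent(appId)}/workflows/tasks/${encodeURIComponent(taskId)}/stop`
```

Encoding each segment keeps an identifier that contains a slash or other reserved character from changing the shape of the path.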
Usage Examples
import { fetchWorkflowDraft, stopWorkflowRun } from '@/service/workflow'
import { handleStream } from '@/service/base'

// 1. Load the draft before execution
const draft = await fetchWorkflowDraft('/apps/app-abc123/workflows/draft')

// 2. Execute the workflow and handle SSE events
const response = await fetch('/apps/app-abc123/workflows/run', {
  method: 'POST',
  body: JSON.stringify({ inputs: {} }),
})
// Callbacks are positional, so unused slots before the last one supplied must be passed as undefined
handleStream(
  response,
  (text, isFirst, meta) => console.log('Data:', text), // onData
  (hasError, errorMsg) => console.log('Completed:', hasError ? errorMsg : 'success'), // onCompleted
  undefined, // onThought
  undefined, // onMessageEnd
  undefined, // onMessageReplace
  undefined, // onFile
  (data) => console.log('Workflow started:', data.workflow_run_id), // onWorkflowStarted
  (data) => console.log('Workflow finished:', data.data.status), // onWorkflowFinished
  (data) => console.log('Node started:', data.data.node_id), // onNodeStarted
  (data) => console.log('Node finished:', data.data.node_id, data.data.status), // onNodeFinished
  (data) => console.log('Iteration started:', data.data.node_id), // onIterationStart
  (data) => console.log('Iteration next:', data.data.index), // onIterationNext
  (data) => console.log('Iteration finished:', data.data.node_id), // onIterationFinish
)

// 3. Stop a running workflow
await stopWorkflowRun('/apps/app-abc123/workflows/tasks/task-xyz/stop')
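Because handleStream takes its callbacks positionally, a caller interested in only a few events still has to count undefined placeholders. One possible way to avoid miscounting is to describe the handlers by name and expand them into positional order. The sketch below assumes the parameter order shown in the Signature section; CALLBACK_ORDER, NamedHandlers, and toPositional are hypothetical names that do not exist in the codebase.

```typescript
// Callback names in the positional order listed in the Signature section.
const CALLBACK_ORDER = [
  'onData', 'onCompleted', 'onThought', 'onMessageEnd', 'onMessageReplace',
  'onFile', 'onWorkflowStarted', 'onWorkflowFinished', 'onNodeStarted',
  'onNodeFinished', 'onIterationStart', 'onIterationNext', 'onIterationFinish',
  'onLoopStart', 'onLoopNext', 'onLoopFinish', 'onNodeRetry',
  'onParallelBranchStarted', 'onParallelBranchFinished', 'onTextChunk',
  'onTTSChunk', 'onTTSEnd', 'onTextReplace', 'onAgentLog',
  'onHumanInputRequired', 'onHumanInputFormFilled', 'onHumanInputFormTimeout',
  'onWorkflowPaused', 'onDataSourceNodeProcessing', 'onDataSourceNodeCompleted',
  'onDataSourceNodeError',
] as const

type CallbackName = typeof CALLBACK_ORDER[number]
// `never[]` parameters accept any function type; the helper never calls them.
type AnyCallback = (...args: never[]) => void
type NamedHandlers = Partial<Record<CallbackName, AnyCallback>>

// Expand named handlers into the positional list that follows `response`,
// leaving `undefined` in every slot the caller did not supply.
function toPositional(handlers: NamedHandlers): Array<AnyCallback | undefined> {
  return CALLBACK_ORDER.map(name => handlers[name])
}
```

The resulting array could then be spread after response, although TypeScript requires a tuple type or a cast before it accepts a spread into a fixed parameter list, so this trades some static typing for readability.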