Skip to content

feat: workflow continue on error #11474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
112872e
feat: error handle in panel
zxhlyh Nov 19, 2024
791501d
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Nov 21, 2024
39bfbf5
error handle
zxhlyh Nov 21, 2024
e5619b7
error handle
zxhlyh Nov 22, 2024
a689f6d
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Nov 22, 2024
cd7cfe8
update submodule
zxhlyh Nov 24, 2024
3f38ec6
merge main
zxhlyh Nov 26, 2024
fb9653f
collapse
zxhlyh Nov 26, 2024
f740bd4
error handle
zxhlyh Nov 27, 2024
738126a
error handle
zxhlyh Nov 27, 2024
16cc9ba
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Nov 27, 2024
7b292b6
error handle
zxhlyh Nov 27, 2024
fc3b49f
error handle
zxhlyh Nov 28, 2024
435f983
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 3, 2024
2b40e9a
error handle
zxhlyh Dec 3, 2024
12295da
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 4, 2024
b5136fc
error handle
zxhlyh Dec 4, 2024
e1d1324
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 4, 2024
9468a0d
error handle
zxhlyh Dec 4, 2024
e138eef
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 5, 2024
ec507a1
fix: error handle tip
zxhlyh Dec 5, 2024
6f75c00
fix: error handle style
zxhlyh Dec 5, 2024
31128ef
fix: error handle node Handle
zxhlyh Dec 5, 2024
56c2265
fix: error handle output variable
zxhlyh Dec 5, 2024
02a1d8a
fix: error handle output variable
zxhlyh Dec 5, 2024
b8eaff8
fix: edge line color
zxhlyh Dec 6, 2024
449c3dc
fix: error handle tip
zxhlyh Dec 6, 2024
a17bb43
fix: run detail
zxhlyh Dec 6, 2024
049bea9
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 6, 2024
daa72a6
fix: single run detail
zxhlyh Dec 6, 2024
b5055dd
fix: fail branch
zxhlyh Dec 6, 2024
abb1cdd
fix: link
zxhlyh Dec 9, 2024
d51c7f2
fix: exception variable color
zxhlyh Dec 9, 2024
a097764
Merge branch 'main' into feat/workflow-continue-on-error
zxhlyh Dec 9, 2024
ec9f82c
fix: exception variable color
zxhlyh Dec 9, 2024
68e8618
fix: default value
zxhlyh Dec 9, 2024
add4870
fix: code node default value
zxhlyh Dec 9, 2024
149ea80
fix: types
zxhlyh Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { VarBlockIcon } from '@/app/components/workflow/block-icon'
import { Line3 } from '@/app/components/base/icons/src/public/common'
import { isConversationVar, isENV, isSystemVar } from '@/app/components/workflow/nodes/_base/components/variable/utils'
import Tooltip from '@/app/components/base/tooltip'
import { isExceptionVariable } from '@/app/components/workflow/utils'

type WorkflowVariableBlockComponentProps = {
nodeKey: string
Expand Down Expand Up @@ -53,6 +54,7 @@ const WorkflowVariableBlockComponent = ({
const node = localWorkflowNodesMap![variables[0]]
const isEnv = isENV(variables)
const isChatVar = isConversationVar(variables)
const isException = isExceptionVariable(varName, node?.type)

useEffect(() => {
if (!editor.hasNodes([WorkflowVariableBlockNode]))
Expand Down Expand Up @@ -98,10 +100,10 @@ const WorkflowVariableBlockComponent = ({
</div>
)}
<div className='flex items-center text-primary-600'>
{!isEnv && !isChatVar && <Variable02 className='shrink-0 w-3.5 h-3.5' />}
{!isEnv && !isChatVar && <Variable02 className={cn('shrink-0 w-3.5 h-3.5', isException && 'text-text-warning')} />}
{isEnv && <Env className='shrink-0 w-3.5 h-3.5 text-util-colors-violet-violet-600' />}
{isChatVar && <BubbleX className='w-3.5 h-3.5 text-util-colors-teal-teal-700' />}
<div className={cn('shrink-0 ml-0.5 text-xs font-medium truncate', (isEnv || isChatVar) && 'text-gray-900')} title={varName}>{varName}</div>
<div className={cn('shrink-0 ml-0.5 text-xs font-medium truncate', (isEnv || isChatVar) && 'text-gray-900', isException && 'text-text-warning')} title={varName}>{varName}</div>
{
!node && !isEnv && !isChatVar && (
<RiErrorWarningFill className='ml-0.5 w-3 h-3 text-[#D92D20]' />
Expand Down
53 changes: 53 additions & 0 deletions web/app/components/workflow/custom-edge-linear-gradient-render.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
type CustomEdgeLinearGradientRenderProps = {
id: string
startColor: string
stopColor: string
position: {
x1: number
x2: number
y1: number
y2: number
}
}
const CustomEdgeLinearGradientRender = ({
id,
startColor,
stopColor,
position,
}: CustomEdgeLinearGradientRenderProps) => {
const {
x1,
x2,
y1,
y2,
} = position
return (
<defs>
<linearGradient
id={id}
gradientUnits='userSpaceOnUse'
x1={x1}
y1={y1}
x2={x2}
y2={y2}
>
<stop
offset='0%'
style={{
stopColor: startColor,
stopOpacity: 1,
}}
/>
<stop
offset='100%'
style={{
stopColor,
stopOpacity: 1,
}}
/>
</linearGradient>
</defs>
)
}

export default CustomEdgeLinearGradientRender
57 changes: 56 additions & 1 deletion web/app/components/workflow/custom-edge.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
memo,
useCallback,
useMemo,
useState,
} from 'react'
import { intersection } from 'lodash-es'
Expand All @@ -20,8 +21,12 @@ import type {
Edge,
OnSelectBlock,
} from './types'
import { NodeRunningStatus } from './types'
import { getEdgeColor } from './utils'
import { ITERATION_CHILDREN_Z_INDEX } from './constants'
import CustomEdgeLinearGradientRender from './custom-edge-linear-gradient-render'
import cn from '@/utils/classnames'
import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'

const CustomEdge = ({
id,
Expand Down Expand Up @@ -53,6 +58,26 @@ const CustomEdge = ({
const { handleNodeAdd } = useNodesInteractions()
const { availablePrevBlocks } = useAvailableBlocks((data as Edge['data'])!.targetType, (data as Edge['data'])?.isInIteration)
const { availableNextBlocks } = useAvailableBlocks((data as Edge['data'])!.sourceType, (data as Edge['data'])?.isInIteration)
const {
_sourceRunningStatus,
_targetRunningStatus,
} = data

const linearGradientId = useMemo(() => {
if (
(
_sourceRunningStatus === NodeRunningStatus.Succeeded
|| _sourceRunningStatus === NodeRunningStatus.Failed
|| _sourceRunningStatus === NodeRunningStatus.Exception
) && (
_targetRunningStatus === NodeRunningStatus.Succeeded
|| _targetRunningStatus === NodeRunningStatus.Failed
|| _targetRunningStatus === NodeRunningStatus.Exception
|| _targetRunningStatus === NodeRunningStatus.Running
)
)
return id
}, [_sourceRunningStatus, _targetRunningStatus, id])

const handleOpenChange = useCallback((v: boolean) => {
setOpen(v)
Expand All @@ -73,14 +98,43 @@ const CustomEdge = ({
)
}, [handleNodeAdd, source, sourceHandleId, target, targetHandleId])

const stroke = useMemo(() => {
if (selected)
return getEdgeColor(NodeRunningStatus.Running)

if (linearGradientId)
return `url(#${linearGradientId})`

if (data?._connectedNodeIsHovering)
return getEdgeColor(NodeRunningStatus.Running, sourceHandleId === ErrorHandleTypeEnum.failBranch)

return getEdgeColor()
}, [data._connectedNodeIsHovering, linearGradientId, selected, sourceHandleId])

return (
<>
{
linearGradientId && (
<CustomEdgeLinearGradientRender
id={linearGradientId}
startColor={getEdgeColor(_sourceRunningStatus)}
stopColor={getEdgeColor(_targetRunningStatus)}
position={{
x1: sourceX,
y1: sourceY,
x2: targetX,
y2: targetY,
}}
/>
)
}
<BaseEdge
id={id}
path={edgePath}
style={{
stroke: (selected || data?._connectedNodeIsHovering || data?._run) ? '#2970FF' : '#D0D5DD',
stroke,
strokeWidth: 2,
opacity: data._waitingRun ? 0.7 : 1,
}}
/>
<EdgeLabelRenderer>
Expand All @@ -95,6 +149,7 @@ const CustomEdge = ({
position: 'absolute',
transform: `translate(-50%, -50%) translate(${labelX}px, ${labelY}px)`,
pointerEvents: 'all',
opacity: data._waitingRun ? 0.7 : 1,
}}
>
<BlockSelector
Expand Down
34 changes: 20 additions & 14 deletions web/app/components/workflow/hooks/use-edges-interactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,29 @@ export const useEdgesInteractions = () => {
edges,
setEdges,
} = store.getState()
const currentEdgeIndex = edges.findIndex(edge => edge.source === nodeId && edge.sourceHandle === branchId)
const edgeWillBeDeleted = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === branchId)

if (currentEdgeIndex < 0)
if (!edgeWillBeDeleted.length)
return

const currentEdge = edges[currentEdgeIndex]
const newNodes = produce(getNodes(), (draft: Node[]) => {
const sourceNode = draft.find(node => node.id === currentEdge.source)
const targetNode = draft.find(node => node.id === currentEdge.target)

if (sourceNode)
sourceNode.data._connectedSourceHandleIds = sourceNode.data._connectedSourceHandleIds?.filter(handleId => handleId !== currentEdge.sourceHandle)

if (targetNode)
targetNode.data._connectedTargetHandleIds = targetNode.data._connectedTargetHandleIds?.filter(handleId => handleId !== currentEdge.targetHandle)
const nodes = getNodes()
const nodesConnectedSourceOrTargetHandleIdsMap = getNodesConnectedSourceOrTargetHandleIdsMap(
edgeWillBeDeleted.map(edge => ({ type: 'remove', edge })),
nodes,
)
const newNodes = produce(nodes, (draft: Node[]) => {
draft.forEach((node) => {
if (nodesConnectedSourceOrTargetHandleIdsMap[node.id]) {
node.data = {
...node.data,
...nodesConnectedSourceOrTargetHandleIdsMap[node.id],
}
}
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.splice(currentEdgeIndex, 1)
return draft.filter(edge => !edgeWillBeDeleted.find(e => e.id === edge.id))
})
setEdges(newEdges)
handleSyncWorkflowDraft()
Expand Down Expand Up @@ -155,7 +159,9 @@ export const useEdgesInteractions = () => {

const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data._run = false
edge.data._sourceRunningStatus = undefined
edge.data._targetRunningStatus = undefined
edge.data._waitingRun = false
})
})
setEdges(newEdges)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ export const useNodesInteractions = () => {
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._runningStatus = undefined
node.data._waitingRun = false
})
})
setNodes(newNodes)
Expand Down
78 changes: 67 additions & 11 deletions web/app/components/workflow/hooks/use-workflow-run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { useCallback } from 'react'
import {
getIncomers,
useReactFlow,
useStoreApi,
} from 'reactflow'
Expand All @@ -9,8 +8,8 @@ import { v4 as uuidV4 } from 'uuid'
import { usePathname } from 'next/navigation'
import { useWorkflowStore } from '../store'
import { useNodesSyncDraft } from '../hooks'
import type { Node } from '../types'
import {
BlockEnum,
NodeRunningStatus,
WorkflowRunningStatus,
} from '../types'
Expand All @@ -28,6 +27,7 @@ import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player
import {
getFilesInLogs,
} from '@/app/components/base/file-uploader/utils'
import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'

export const useWorkflowRun = () => {
const store = useStoreApi()
Expand Down Expand Up @@ -174,6 +174,8 @@ export const useWorkflowRun = () => {
setIterParallelLogMap,
} = workflowStore.getState()
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
Expand All @@ -186,12 +188,20 @@ export const useWorkflowRun = () => {
status: WorkflowRunningStatus.Running,
}
}))

const nodes = getNodes()
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
node.data._waitingRun = true
})
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
edge.data = {
...edge.data,
_run: false,
_sourceRunningStatus: undefined,
_targetRunningStatus: undefined,
_waitingRun: true,
}
})
})
Expand Down Expand Up @@ -311,13 +321,27 @@ export const useWorkflowRun = () => {
}
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id)
const newEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
if (edge.target === data.node_id && incomeNodesId.includes(edge.source))
edge.data = { ...edge.data, _run: true } as any
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})

incomeEdges.forEach((edge) => {
const incomeNode = nodes.find(node => node.id === edge.source)!
if (
(!incomeNode.data._runningBranchId && edge.sourceHandle === 'source')
|| (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId)
) {
edge.data = {
...edge.data,
_sourceRunningStatus: incomeNode.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
}
})
})
setEdges(newEdges)
Expand All @@ -336,6 +360,8 @@ export const useWorkflowRun = () => {
const {
getNodes,
setNodes,
edges,
setEdges,
} = store.getState()
const nodes = getNodes()
const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId
Expand Down Expand Up @@ -423,8 +449,31 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
const currentNode = draft.find(node => node.id === data.node_id)!
currentNode.data._runningStatus = data.status as any
if (data.status === NodeRunningStatus.Exception) {
if (data.execution_metadata.error_strategy === ErrorHandleTypeEnum.failBranch)
currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch
}
else {
if (data.node_type === BlockEnum.IfElse)
currentNode.data._runningBranchId = data?.outputs?.selected_case_id

if (data.node_type === BlockEnum.QuestionClassifier)
currentNode.data._runningBranchId = data?.outputs?.class_id
}
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const incomeEdges = draft.filter((edge) => {
return edge.target === data.node_id
})
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_targetRunningStatus: data.status as any,
}
})
})
setEdges(newEdges)
prevNodeId = data.node_id
}

Expand Down Expand Up @@ -474,13 +523,20 @@ export const useWorkflowRun = () => {
const newNodes = produce(nodes, (draft) => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
draft[currentNodeIndex].data._waitingRun = false
})
setNodes(newNodes)
const newEdges = produce(edges, (draft) => {
const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId)
const incomeEdges = draft.filter(edge => edge.target === data.node_id)

if (edge)
edge.data = { ...edge.data, _run: true } as any
incomeEdges.forEach((edge) => {
edge.data = {
...edge.data,
_sourceRunningStatus: nodes.find(node => node.id === edge.source)!.data._runningStatus,
_targetRunningStatus: NodeRunningStatus.Running,
_waitingRun: false,
}
})
})
setEdges(newEdges)

Expand Down
Loading
Loading