From 14a6f13bf5258e9edfa136f48dc5aa35bec496b2 Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Wed, 11 Dec 2024 10:15:58 +0800 Subject: [PATCH 1/2] fix: iteration node in parallel mode token error --- api/core/workflow/nodes/iteration/iteration_node.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 5b3853c5d6279c..2dfed2bf6f35c8 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -182,7 +182,6 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: future.add_done_callback(thread_pool.task_done_callback) futures.append(future) succeeded_count = 0 - empty_count = 0 while True: try: event = q.get(timeout=1) @@ -202,6 +201,8 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: if isinstance(event, IterationRunFailedEvent): q.put(None) yield event + if isinstance(event, int): + graph_engine.graph_runtime_state.total_tokens += event except Empty: continue @@ -593,3 +594,4 @@ def _run_single_iter_parallel( parallel_mode_run_id=parallel_mode_run_id, ): q.put(event) + q.put(graph_engine_copy.graph_runtime_state.total_tokens) From 697f5c44014a142141c80fd7c1049370a03129c1 Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Wed, 11 Dec 2024 10:41:29 +0800 Subject: [PATCH 2/2] fix: test cases error --- api/core/workflow/nodes/iteration/iteration_node.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 2dfed2bf6f35c8..cbabe7a3c56faf 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -201,8 +201,6 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: if isinstance(event, IterationRunFailedEvent): q.put(None) yield event - if isinstance(event, int): - graph_engine.graph_runtime_state.total_tokens += event except Empty: continue @@ -594,4 +592,4 @@ def _run_single_iter_parallel( parallel_mode_run_id=parallel_mode_run_id, ): q.put(event) - q.put(graph_engine_copy.graph_runtime_state.total_tokens) + graph_engine.graph_runtime_state.total_tokens += graph_engine_copy.graph_runtime_state.total_tokens