Skip to content

Commit bc8f5bf

Browse files
committed
fix(workflow_service): assign UUID to workflow_node_execution id and update optional fields in WorkflowRun and WorkflowNodeExecution models (#12096)
Signed-off-by: -LAN- <[email protected]>
1 parent 19d3d91 commit bc8f5bf

File tree

5 files changed

+368
-332
lines changed

5 files changed

+368
-332
lines changed

api/core/app/apps/advanced_chat/generate_task_pipeline.py

+120-68
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ def __init__(
101101
)
102102

103103
if isinstance(user, EndUser):
104-
self._user_id = user.session_id
104+
self._user_id = user.id
105+
user_session_id = user.session_id
105106
self._created_by_role = CreatedByRole.END_USER
106107
elif isinstance(user, Account):
107108
self._user_id = user.id
109+
user_session_id = user.id
108110
self._created_by_role = CreatedByRole.ACCOUNT
109111
else:
110112
raise NotImplementedError(f"User type not supported: {type(user)}")
@@ -122,7 +124,7 @@ def __init__(
122124
SystemVariableKey.QUERY: message.query,
123125
SystemVariableKey.FILES: application_generate_entity.files,
124126
SystemVariableKey.CONVERSATION_ID: conversation.id,
125-
SystemVariableKey.USER_ID: self._user_id,
127+
SystemVariableKey.USER_ID: user_session_id,
126128
SystemVariableKey.DIALOGUE_COUNT: dialogue_count,
127129
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
128130
SystemVariableKey.WORKFLOW_ID: workflow.id,
@@ -134,6 +136,7 @@ def __init__(
134136

135137
self._conversation_name_generate_thread = None
136138
self._recorded_files: list[Mapping[str, Any]] = []
139+
self._workflow_run_id = ""
137140

138141
def process(self):
139142
"""
@@ -262,8 +265,7 @@ def _process_stream_response(
262265
:return:
263266
"""
264267
# init fake graph runtime state
265-
graph_runtime_state = None
266-
workflow_run = None
268+
graph_runtime_state: Optional[GraphRuntimeState] = None
267269

268270
for queue_message in self._queue_manager.listen():
269271
event = queue_message.event
@@ -288,111 +290,163 @@ def _process_stream_response(
288290
user_id=self._user_id,
289291
created_by_role=self._created_by_role,
290292
)
293+
self._workflow_run_id = workflow_run.id
291294
message = self._get_message(session=session)
292295
if not message:
293296
raise ValueError(f"Message not found: {self._message_id}")
294297
message.workflow_run_id = workflow_run.id
295-
session.commit()
296-
297298
workflow_start_resp = self._workflow_start_to_stream_response(
298299
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
299300
)
301+
session.commit()
302+
300303
yield workflow_start_resp
301304
elif isinstance(
302305
event,
303306
QueueNodeRetryEvent,
304307
):
305-
if not workflow_run:
308+
if not self._workflow_run_id:
306309
raise ValueError("workflow run not initialized.")
307-
workflow_node_execution = self._handle_workflow_node_execution_retried(
308-
workflow_run=workflow_run, event=event
309-
)
310310

311-
node_retry_resp = self._workflow_node_retry_to_stream_response(
312-
event=event,
313-
task_id=self._application_generate_entity.task_id,
314-
workflow_node_execution=workflow_node_execution,
315-
)
311+
with Session(db.engine) as session:
312+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
313+
workflow_node_execution = self._handle_workflow_node_execution_retried(
314+
session=session, workflow_run=workflow_run, event=event
315+
)
316+
node_retry_resp = self._workflow_node_retry_to_stream_response(
317+
session=session,
318+
event=event,
319+
task_id=self._application_generate_entity.task_id,
320+
workflow_node_execution=workflow_node_execution,
321+
)
322+
session.commit()
316323

317324
if node_retry_resp:
318325
yield node_retry_resp
319326
elif isinstance(event, QueueNodeStartedEvent):
320-
if not workflow_run:
327+
if not self._workflow_run_id:
321328
raise ValueError("workflow run not initialized.")
322329

323-
workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
330+
with Session(db.engine) as session:
331+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
332+
workflow_node_execution = self._handle_node_execution_start(
333+
session=session, workflow_run=workflow_run, event=event
334+
)
324335

325-
node_resp = self._workflow_node_start_to_stream_response(
326-
event=event,
327-
task_id=self._application_generate_entity.task_id,
328-
workflow_node_execution=workflow_node_execution,
329-
)
336+
node_resp = self._workflow_node_start_to_stream_response(
337+
session=session,
338+
event=event,
339+
task_id=self._application_generate_entity.task_id,
340+
workflow_node_execution=workflow_node_execution,
341+
)
342+
session.commit()
330343

331344
if node_resp:
332345
yield node_resp
333346
elif isinstance(event, QueueNodeSucceededEvent):
334-
workflow_node_execution = self._handle_workflow_node_execution_success(event)
335-
336347
# Record files if it's an answer node or end node
337348
if event.node_type in [NodeType.ANSWER, NodeType.END]:
338349
self._recorded_files.extend(self._fetch_files_from_node_outputs(event.outputs or {}))
339350

340-
node_finish_resp = self._workflow_node_finish_to_stream_response(
341-
event=event,
342-
task_id=self._application_generate_entity.task_id,
343-
workflow_node_execution=workflow_node_execution,
344-
)
351+
with Session(db.engine) as session:
352+
workflow_node_execution = self._handle_workflow_node_execution_success(session=session, event=event)
353+
354+
node_finish_resp = self._workflow_node_finish_to_stream_response(
355+
session=session,
356+
event=event,
357+
task_id=self._application_generate_entity.task_id,
358+
workflow_node_execution=workflow_node_execution,
359+
)
360+
session.commit()
345361

346362
if node_finish_resp:
347363
yield node_finish_resp
348364
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
349-
workflow_node_execution = self._handle_workflow_node_execution_failed(event)
365+
with Session(db.engine) as session:
366+
workflow_node_execution = self._handle_workflow_node_execution_failed(session=session, event=event)
367+
368+
node_finish_resp = self._workflow_node_finish_to_stream_response(
369+
session=session,
370+
event=event,
371+
task_id=self._application_generate_entity.task_id,
372+
workflow_node_execution=workflow_node_execution,
373+
)
374+
session.commit()
350375

351-
node_finish_resp = self._workflow_node_finish_to_stream_response(
352-
event=event,
353-
task_id=self._application_generate_entity.task_id,
354-
workflow_node_execution=workflow_node_execution,
355-
)
356376
if node_finish_resp:
357377
yield node_finish_resp
358-
359378
elif isinstance(event, QueueParallelBranchRunStartedEvent):
360-
if not workflow_run:
379+
if not self._workflow_run_id:
361380
raise ValueError("workflow run not initialized.")
362381

363-
yield self._workflow_parallel_branch_start_to_stream_response(
364-
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
365-
)
382+
with Session(db.engine) as session:
383+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
384+
parallel_start_resp = self._workflow_parallel_branch_start_to_stream_response(
385+
session=session,
386+
task_id=self._application_generate_entity.task_id,
387+
workflow_run=workflow_run,
388+
event=event,
389+
)
390+
391+
yield parallel_start_resp
366392
elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
367-
if not workflow_run:
393+
if not self._workflow_run_id:
368394
raise ValueError("workflow run not initialized.")
369395

370-
yield self._workflow_parallel_branch_finished_to_stream_response(
371-
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
372-
)
396+
with Session(db.engine) as session:
397+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
398+
parallel_finish_resp = self._workflow_parallel_branch_finished_to_stream_response(
399+
session=session,
400+
task_id=self._application_generate_entity.task_id,
401+
workflow_run=workflow_run,
402+
event=event,
403+
)
404+
405+
yield parallel_finish_resp
373406
elif isinstance(event, QueueIterationStartEvent):
374-
if not workflow_run:
407+
if not self._workflow_run_id:
375408
raise ValueError("workflow run not initialized.")
376409

377-
yield self._workflow_iteration_start_to_stream_response(
378-
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
379-
)
410+
with Session(db.engine) as session:
411+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
412+
iter_start_resp = self._workflow_iteration_start_to_stream_response(
413+
session=session,
414+
task_id=self._application_generate_entity.task_id,
415+
workflow_run=workflow_run,
416+
event=event,
417+
)
418+
419+
yield iter_start_resp
380420
elif isinstance(event, QueueIterationNextEvent):
381-
if not workflow_run:
421+
if not self._workflow_run_id:
382422
raise ValueError("workflow run not initialized.")
383423

384-
yield self._workflow_iteration_next_to_stream_response(
385-
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
386-
)
424+
with Session(db.engine) as session:
425+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
426+
iter_next_resp = self._workflow_iteration_next_to_stream_response(
427+
session=session,
428+
task_id=self._application_generate_entity.task_id,
429+
workflow_run=workflow_run,
430+
event=event,
431+
)
432+
433+
yield iter_next_resp
387434
elif isinstance(event, QueueIterationCompletedEvent):
388-
if not workflow_run:
435+
if not self._workflow_run_id:
389436
raise ValueError("workflow run not initialized.")
390437

391-
yield self._workflow_iteration_completed_to_stream_response(
392-
task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
393-
)
438+
with Session(db.engine) as session:
439+
workflow_run = self._get_workflow_run(session=session, workflow_run_id=self._workflow_run_id)
440+
iter_finish_resp = self._workflow_iteration_completed_to_stream_response(
441+
session=session,
442+
task_id=self._application_generate_entity.task_id,
443+
workflow_run=workflow_run,
444+
event=event,
445+
)
446+
447+
yield iter_finish_resp
394448
elif isinstance(event, QueueWorkflowSucceededEvent):
395-
if not workflow_run:
449+
if not self._workflow_run_id:
396450
raise ValueError("workflow run not initialized.")
397451

398452
if not graph_runtime_state:
@@ -401,7 +455,7 @@ def _process_stream_response(
401455
with Session(db.engine) as session:
402456
workflow_run = self._handle_workflow_run_success(
403457
session=session,
404-
workflow_run=workflow_run,
458+
workflow_run_id=self._workflow_run_id,
405459
start_at=graph_runtime_state.start_at,
406460
total_tokens=graph_runtime_state.total_tokens,
407461
total_steps=graph_runtime_state.node_run_steps,
@@ -418,16 +472,15 @@ def _process_stream_response(
418472
yield workflow_finish_resp
419473
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
420474
elif isinstance(event, QueueWorkflowPartialSuccessEvent):
421-
if not workflow_run:
475+
if not self._workflow_run_id:
422476
raise ValueError("workflow run not initialized.")
423-
424477
if not graph_runtime_state:
425478
raise ValueError("graph runtime state not initialized.")
426479

427480
with Session(db.engine) as session:
428481
workflow_run = self._handle_workflow_run_partial_success(
429482
session=session,
430-
workflow_run=workflow_run,
483+
workflow_run_id=self._workflow_run_id,
431484
start_at=graph_runtime_state.start_at,
432485
total_tokens=graph_runtime_state.total_tokens,
433486
total_steps=graph_runtime_state.node_run_steps,
@@ -436,7 +489,6 @@ def _process_stream_response(
436489
conversation_id=None,
437490
trace_manager=trace_manager,
438491
)
439-
440492
workflow_finish_resp = self._workflow_finish_to_stream_response(
441493
session=session, task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
442494
)
@@ -445,16 +497,15 @@ def _process_stream_response(
445497
yield workflow_finish_resp
446498
self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
447499
elif isinstance(event, QueueWorkflowFailedEvent):
448-
if not workflow_run:
500+
if not self._workflow_run_id:
449501
raise ValueError("workflow run not initialized.")
450-
451502
if not graph_runtime_state:
452503
raise ValueError("graph runtime state not initialized.")
453504

454505
with Session(db.engine) as session:
455506
workflow_run = self._handle_workflow_run_failed(
456507
session=session,
457-
workflow_run=workflow_run,
508+
workflow_run_id=self._workflow_run_id,
458509
start_at=graph_runtime_state.start_at,
459510
total_tokens=graph_runtime_state.total_tokens,
460511
total_steps=graph_runtime_state.node_run_steps,
@@ -470,15 +521,16 @@ def _process_stream_response(
470521
err_event = QueueErrorEvent(error=ValueError(f"Run failed: {workflow_run.error}"))
471522
err = self._handle_error(event=err_event, session=session, message_id=self._message_id)
472523
session.commit()
524+
473525
yield workflow_finish_resp
474526
yield self._error_to_stream_response(err)
475527
break
476528
elif isinstance(event, QueueStopEvent):
477-
if workflow_run and graph_runtime_state:
529+
if self._workflow_run_id and graph_runtime_state:
478530
with Session(db.engine) as session:
479531
workflow_run = self._handle_workflow_run_failed(
480532
session=session,
481-
workflow_run=workflow_run,
533+
workflow_run_id=self._workflow_run_id,
482534
start_at=graph_runtime_state.start_at,
483535
total_tokens=graph_runtime_state.total_tokens,
484536
total_steps=graph_runtime_state.node_run_steps,
@@ -487,7 +539,6 @@ def _process_stream_response(
487539
conversation_id=self._conversation_id,
488540
trace_manager=trace_manager,
489541
)
490-
491542
workflow_finish_resp = self._workflow_finish_to_stream_response(
492543
session=session,
493544
task_id=self._application_generate_entity.task_id,
@@ -496,6 +547,7 @@ def _process_stream_response(
496547
# Save message
497548
self._save_message(session=session, graph_runtime_state=graph_runtime_state)
498549
session.commit()
550+
499551
yield workflow_finish_resp
500552

501553
yield self._message_end_to_stream_response()

0 commit comments

Comments
 (0)