Skip to content

Commit d4b8482

Browse files
authored
fix: apply gevent threading patch early and ensure unique workflow node execution IDs (#12196)
Signed-off-by: -LAN- <[email protected]>
1 parent fc29f20 commit d4b8482

File tree

4 files changed

+16
-43
lines changed

4 files changed

+16
-43
lines changed

api/app.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1-
from libs import version_utils
2-
3-
# preparation before creating app
4-
version_utils.check_supported_python_version()
1+
import os
2+
import sys
53

64

75
def is_db_command():
8-
import sys
9-
106
if len(sys.argv) > 1 and sys.argv[0].endswith("flask") and sys.argv[1] == "db":
117
return True
128
return False
@@ -18,10 +14,18 @@ def is_db_command():
1814

1915
app = create_migrations_app()
2016
else:
21-
from app_factory import create_app
22-
from libs import threadings_utils
17+
if os.environ.get("FLASK_DEBUG", "False") != "True":
18+
from gevent import monkey # type: ignore
19+
20+
# gevent
21+
monkey.patch_all()
2322

24-
threadings_utils.apply_gevent_threading_patch()
23+
from grpc.experimental import gevent as grpc_gevent # type: ignore
24+
25+
# grpc gevent
26+
grpc_gevent.init_gevent()
27+
28+
from app_factory import create_app
2529

2630
app = create_app()
2731
celery = app.extensions["celery"]

api/core/app/task_pipeline/workflow_cycle_manage.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ def _handle_node_execution_start(
274274
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
275275
) -> WorkflowNodeExecution:
276276
workflow_node_execution = WorkflowNodeExecution()
277-
workflow_node_execution.id = event.node_execution_id
277+
workflow_node_execution.id = str(uuid4())
278278
workflow_node_execution.tenant_id = workflow_run.tenant_id
279279
workflow_node_execution.app_id = workflow_run.app_id
280280
workflow_node_execution.workflow_id = workflow_run.workflow_id
@@ -391,7 +391,7 @@ def _handle_workflow_node_execution_retried(
391391
execution_metadata = json.dumps(merged_metadata)
392392

393393
workflow_node_execution = WorkflowNodeExecution()
394-
workflow_node_execution.id = event.node_execution_id
394+
workflow_node_execution.id = str(uuid4())
395395
workflow_node_execution.tenant_id = workflow_run.tenant_id
396396
workflow_node_execution.app_id = workflow_run.app_id
397397
workflow_node_execution.workflow_id = workflow_run.workflow_id
@@ -824,7 +824,7 @@ def _get_workflow_run(self, *, session: Session, workflow_run_id: str) -> Workfl
824824
return workflow_run
825825

826826
def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution:
827-
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.id == node_execution_id)
827+
stmt = select(WorkflowNodeExecution).where(WorkflowNodeExecution.node_execution_id == node_execution_id)
828828
workflow_node_execution = session.scalar(stmt)
829829
if not workflow_node_execution:
830830
raise WorkflowNodeExecutionNotFoundError(node_execution_id)

api/libs/threadings_utils.py

-19
This file was deleted.

api/libs/version_utils.py

-12
This file was deleted.

0 commit comments

Comments
 (0)