Skip to content

Feature/newnew workflow loop node #14863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
babf826
feat: loop node
arkunzz Mar 3, 2025
8c0ffb9
feat: Loop and Sleep nodes.
Woo0ood Jan 8, 2025
733e7a3
feat: remove `Sleep` node.
Woo0ood Jan 20, 2025
f741e7a
feat: improve and fix part of `Loop` node logic.
Woo0ood Jan 20, 2025
d3a1f89
fix: temporarily hide error handling field in `Loop` node panel.
Woo0ood Jan 24, 2025
8256029
fix: correct nested level in tracking logs.
Woo0ood Jan 24, 2025
f6aa1f0
chore: remove some temporarily unused code.
Woo0ood Jan 24, 2025
5b2ac1e
fix: correct the sorting issue of Tracing information.
Woo0ood Jan 26, 2025
3e18b5d
fix: resolve issue where Loop node on-step-run could not view Loop de…
Woo0ood Jan 26, 2025
8d910c7
fix: prevent Loop and Iteration nodes from nesting within each other.
Woo0ood Jan 26, 2025
cc97abd
feat: add missing i18n items.
Woo0ood Jan 26, 2025
5b8d71c
feat: update Loop node icon.
Woo0ood Jan 26, 2025
e1df399
fix: fixed ESLint errors.
Woo0ood Feb 5, 2025
2c0e2c1
chore: merge code for version 1.0.0.
Woo0ood Mar 3, 2025
712b953
fix: correct log format issue.
Woo0ood Mar 3, 2025
1aa0d1c
fix: loop node metadata
arkunzz Mar 4, 2025
5962b55
fix: loop node import
arkunzz Mar 4, 2025
93842ee
fix: correct log format issue.
Woo0ood Mar 4, 2025
3e75912
chore: remove unnecessary yarn.lock file.
Woo0ood Mar 4, 2025
d96fee1
feat: update i18n.
Woo0ood Mar 4, 2025
c3c7dea
feat: update i18n.
Woo0ood Mar 4, 2025
93b614b
chore: reformatted.
Woo0ood Mar 4, 2025
90c89bf
feat: support re-selection of variables for loop termination condition.
Woo0ood Mar 4, 2025
6588bfd
fix: fix tracking display order issue.
Woo0ood Mar 4, 2025
d6169e0
feat: configure maximum loop count using environment variable.
Woo0ood Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions api/controllers/console/app/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,80 @@ def post(self, app_model: App, node_id: str):
raise InternalServerError()


class AdvancedChatDraftRunLoopNodeApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT])
def post(self, app_model: App, node_id: str):
"""
Run draft workflow loop node
"""
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()

if not isinstance(current_user, Account):
raise Forbidden()

parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, location="json")
args = parser.parse_args()

try:
response = AppGenerateService.generate_single_loop(
app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
)

return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except ValueError as e:
raise e
except Exception:
logging.exception("internal server error.")
raise InternalServerError()


class WorkflowDraftRunLoopNodeApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
"""
Run draft workflow loop node
"""
# The role of the current user in the ta table must be admin, owner, or editor
if not current_user.is_editor:
raise Forbidden()

if not isinstance(current_user, Account):
raise Forbidden()

parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, location="json")
args = parser.parse_args()

try:
response = AppGenerateService.generate_single_loop(
app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
)

return helper.compact_generate_response(response)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.conversation.ConversationCompletedError:
raise ConversationCompletedError()
except ValueError as e:
raise e
except Exception:
logging.exception("internal server error.")
raise InternalServerError()


class DraftWorkflowRunApi(Resource):
@setup_required
@login_required
Expand Down Expand Up @@ -512,6 +586,11 @@ def get(self, app_model: App):
api.add_resource(
WorkflowDraftRunIterationNodeApi, "/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run"
)
api.add_resource(
AdvancedChatDraftRunLoopNodeApi,
"/apps/<uuid:app_id>/advanced-chat/workflows/draft/loop/nodes/<string:node_id>/run",
)
api.add_resource(WorkflowDraftRunLoopNodeApi, "/apps/<uuid:app_id>/workflows/draft/loop/nodes/<string:node_id>/run")
api.add_resource(PublishedWorkflowApi, "/apps/<uuid:app_id>/workflows/publish")
api.add_resource(PublishedAllWorkflowApi, "/apps/<uuid:app_id>/workflows")
api.add_resource(DefaultBlockConfigsApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs")
Expand Down
55 changes: 55 additions & 0 deletions api/core/app/apps/advanced_chat/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,61 @@ def single_iteration_generate(
stream=streaming,
)

def single_loop_generate(
self,
app_model: App,
workflow: Workflow,
node_id: str,
user: Account | EndUser,
args: Mapping,
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
"""
Generate App response.

:param app_model: App
:param workflow: Workflow
:param user: account or end user
:param args: request args
:param invoke_from: invoke from source
:param stream: is stream
"""
if not node_id:
raise ValueError("node_id is required")

if args.get("inputs") is None:
raise ValueError("inputs is required")

# convert to app config
app_config = AdvancedChatAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)

# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
conversation_id=None,
inputs={},
query="",
files=[],
user_id=user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_loop_run=AdvancedChatAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
)
contexts.tenant_id.set(application_generate_entity.app_config.tenant_id)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())

return self._generate(
workflow=workflow,
user=user,
invoke_from=InvokeFrom.DEBUGGER,
application_generate_entity=application_generate_entity,
conversation=None,
stream=streaming,
)

def _generate(
self,
*,
Expand Down
7 changes: 7 additions & 0 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def run(self) -> None:
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=dict(self.application_generate_entity.single_iteration_run.inputs),
)
elif self.application_generate_entity.single_loop_run:
# if only single loop run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=workflow,
node_id=self.application_generate_entity.single_loop_run.node_id,
user_inputs=dict(self.application_generate_entity.single_loop_run.inputs),
)
else:
inputs = self.application_generate_entity.inputs
query = self.application_generate_entity.query
Expand Down
60 changes: 59 additions & 1 deletion api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
QueueIterationCompletedEvent,
QueueIterationNextEvent,
QueueIterationStartEvent,
QueueLoopCompletedEvent,
QueueLoopNextEvent,
QueueLoopStartEvent,
QueueMessageReplaceEvent,
QueueNodeExceptionEvent,
QueueNodeFailedEvent,
QueueNodeInIterationFailedEvent,
QueueNodeInLoopFailedEvent,
QueueNodeRetryEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
Expand Down Expand Up @@ -372,7 +376,13 @@ def _process_stream_response(

if node_finish_resp:
yield node_finish_resp
elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
elif isinstance(
event,
QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
session=session, event=event
Expand Down Expand Up @@ -472,6 +482,54 @@ def _process_stream_response(
)

yield iter_finish_resp
elif isinstance(event, QueueLoopStartEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
loop_start_resp = self._workflow_cycle_manager._workflow_loop_start_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield loop_start_resp
elif isinstance(event, QueueLoopNextEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
loop_next_resp = self._workflow_cycle_manager._workflow_loop_next_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield loop_next_resp
elif isinstance(event, QueueLoopCompletedEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")

with Session(db.engine, expire_on_commit=False) as session:
workflow_run = self._workflow_cycle_manager._get_workflow_run(
session=session, workflow_run_id=self._workflow_run_id
)
loop_finish_resp = self._workflow_cycle_manager._workflow_loop_completed_to_stream_response(
session=session,
task_id=self._application_generate_entity.task_id,
workflow_run=workflow_run,
event=event,
)

yield loop_finish_resp
elif isinstance(event, QueueWorkflowSucceededEvent):
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
Expand Down
54 changes: 54 additions & 0 deletions api/core/app/apps/workflow/app_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,60 @@ def single_iteration_generate(
streaming=streaming,
)

def single_loop_generate(
self,
app_model: App,
workflow: Workflow,
node_id: str,
user: Account | EndUser,
args: Mapping[str, Any],
streaming: bool = True,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], None, None]:
"""
Generate App response.

:param app_model: App
:param workflow: Workflow
:param user: account or end user
:param args: request args
:param invoke_from: invoke from source
:param stream: is stream
"""
if not node_id:
raise ValueError("node_id is required")

if args.get("inputs") is None:
raise ValueError("inputs is required")

# convert to app config
app_config = WorkflowAppConfigManager.get_app_config(app_model=app_model, workflow=workflow)

# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
task_id=str(uuid.uuid4()),
app_config=app_config,
inputs={},
files=[],
user_id=user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
single_loop_run=WorkflowAppGenerateEntity.SingleLoopRunEntity(node_id=node_id, inputs=args["inputs"]),
workflow_run_id=str(uuid.uuid4()),
)
contexts.tenant_id.set(application_generate_entity.app_config.tenant_id)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())

return self._generate(
app_model=app_model,
workflow=workflow,
user=user,
invoke_from=InvokeFrom.DEBUGGER,
application_generate_entity=application_generate_entity,
streaming=streaming,
)

def _generate_worker(
self,
flask_app: Flask,
Expand Down
7 changes: 7 additions & 0 deletions api/core/app/apps/workflow/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def run(self) -> None:
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=self.application_generate_entity.single_iteration_run.inputs,
)
elif self.application_generate_entity.single_loop_run:
# if only single loop run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=workflow,
node_id=self.application_generate_entity.single_loop_run.node_id,
user_inputs=self.application_generate_entity.single_loop_run.inputs,
)
else:
inputs = self.application_generate_entity.inputs
files = self.application_generate_entity.files
Expand Down
Loading
Loading