-
Notifications
You must be signed in to change notification settings - Fork 7k
Separate agent controller and server via EventStream #1538
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
574ab6b
75d3a0d
428f099
9859eab
01ac7e3
1a94829
d21b17b
b0f28e7
b6b25df
04044fb
e7c4745
3765b74
39f5d1f
ae10589
a1fcba2
2873e85
0fa3806
855d8c1
3d7543e
59c6a02
71fea89
6fb0331
6473272
6296309
175ba48
0f68f47
d3325d6
a4edefa
626a97e
f8954c7
d2523b7
0eb027a
38df5ba
bd9472e
f92cc32
2a7b6e8
f5d48e8
0a41af8
14617eb
c62f6e6
f9b7eaf
5633269
963f2ab
41340ca
1e856ae
a9ef73c
3553e08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
AgentFinishAction, | ||
AgentTalkAction, | ||
ChangeAgentStateAction, | ||
MessageAction, | ||
NullAction, | ||
) | ||
from opendevin.events.event import Event | ||
|
@@ -114,7 +115,9 @@ def update_state_after_step(self): | |
async def add_error_to_history(self, message: str): | ||
await self.add_history(NullAction(), AgentErrorObservation(message)) | ||
|
||
async def add_history(self, action: Action, observation: Observation): | ||
async def add_history( | ||
self, action: Action, observation: Observation, add_to_stream=True | ||
): | ||
if self.state is None: | ||
raise ValueError('Added history while state was None') | ||
if not isinstance(action, Action): | ||
|
@@ -127,8 +130,9 @@ async def add_history(self, action: Action, observation: Observation): | |
) | ||
self.state.history.append((action, observation)) | ||
self.state.updated_info.append((action, observation)) | ||
await self.event_stream.add_event(action, EventSource.AGENT) | ||
await self.event_stream.add_event(observation, EventSource.AGENT) | ||
if add_to_stream: | ||
await self.event_stream.add_event(action, EventSource.AGENT) | ||
await self.event_stream.add_event(observation, EventSource.AGENT) | ||
|
||
async def _run(self): | ||
if self.state is None: | ||
|
@@ -183,6 +187,8 @@ async def on_event(self, event: Event): | |
await self.set_agent_state_to(AgentState.FINISHED) | ||
else: | ||
logger.warning(f'Unknown agent state: {event.agent_state}') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it seems there're several other states in AgentState, like: ERROR/INIT/AWAITING_USER_INPUT, maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call! |
||
elif isinstance(event, MessageAction) and event.source == EventSource.USER: | ||
self._await_user_message_queue.put_nowait(event) | ||
|
||
async def reset_task(self): | ||
if self.agent_task is not None: | ||
|
@@ -217,7 +223,7 @@ def get_agent_state(self): | |
"""Returns the current state of the agent task.""" | ||
return self._agent_state | ||
|
||
async def wait_for_user_input(self) -> UserMessageObservation: | ||
async def wait_for_user_input(self) -> MessageAction: | ||
await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT) | ||
# FIXME: need a way to handle CLI input | ||
user_message_observation = await self._await_user_message_queue.get() | ||
|
@@ -274,12 +280,16 @@ async def step(self, i: int) -> bool: | |
|
||
self.update_state_after_step() | ||
|
||
# whether to await for user messages | ||
if isinstance(action, AgentTalkAction): | ||
# await for the next user messages | ||
user_message_observation = await self.wait_for_user_input() | ||
logger.info(user_message_observation, extra={'msg_type': 'OBSERVATION'}) | ||
await self.add_history(action, user_message_observation) | ||
await self.event_stream.add_event(action, EventSource.AGENT) | ||
user_message = await self.wait_for_user_input() | ||
logger.info(user_message, extra={'msg_type': 'ACTION'}) | ||
# FIXME: we're hacking a message action into a user message observation, for the benefit of CodeAct | ||
await self.add_history( | ||
action, | ||
UserMessageObservation(user_message.content), | ||
add_to_stream=False, | ||
) | ||
enyst marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return False | ||
|
||
finished = isinstance(action, AgentFinishAction) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from dataclasses import dataclass | ||
|
||
from opendevin.core.schema import ActionType | ||
|
||
from .action import Action | ||
|
||
|
||
@dataclass | ||
class MessageAction(Action): | ||
content: str | ||
action: str = ActionType.MESSAGE | ||
|
||
@property | ||
def message(self) -> str: | ||
return self.content |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this if/elif more simplified? now, agent_state == StateX, set_agent_state_to = StateX. maybe an array contains all allowed states, and if event.agent_state is in the array, just set it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had this because the typing was complaining (
event.agent_state
is str instead of AgentState). But I just added a type: ignore 😄