Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event synchronously #2700

Merged
merged 9 commits into from
Jul 4, 2024
8 changes: 4 additions & 4 deletions opendevin/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def report_error(self, message: str, exception: Exception | None = None):
self.state.error = message
if exception:
self.state.error += f': {exception}'
await self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)
self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)

async def add_history(self, action: Action, observation: Observation):
if isinstance(action, NullAction) and isinstance(observation, NullObservation):
Expand Down Expand Up @@ -193,7 +193,7 @@ async def set_agent_state_to(self, new_state: AgentState):
if new_state == AgentState.STOPPED or new_state == AgentState.ERROR:
self.reset_task()

await self.event_stream.add_event(
self.event_stream.add_event(
AgentStateChangedObservation('', self.state.agent_state), EventSource.AGENT
)

Expand Down Expand Up @@ -281,7 +281,7 @@ async def _step(self):
# clean up delegate status
self.delegate = None
self.delegateAction = None
await self.event_stream.add_event(obs, EventSource.AGENT)
self.event_stream.add_event(obs, EventSource.AGENT)
return

logger.info(
Expand Down Expand Up @@ -325,7 +325,7 @@ async def _step(self):
await self.add_history(action, NullObservation(''))

if not isinstance(action, NullAction):
await self.event_stream.add_event(action, EventSource.AGENT)
self.event_stream.add_event(action, EventSource.AGENT)

await self.update_state_after_step()

Expand Down
4 changes: 4 additions & 0 deletions opendevin/core/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def get_console_handler():
"""
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
if config.debug:
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(console_formatter)
return console_handler

Expand Down Expand Up @@ -157,6 +159,8 @@ def log_uncaught_exceptions(ex_cls, ex, tb):

opendevin_logger = logging.getLogger('opendevin')
opendevin_logger.setLevel(logging.INFO)
if config.debug:
opendevin_logger.setLevel(logging.DEBUG)
opendevin_logger.addHandler(get_file_handler())
opendevin_logger.addHandler(get_console_handler())
opendevin_logger.addFilter(SensitiveDataFilter(opendevin_logger.name))
Expand Down
4 changes: 2 additions & 2 deletions opendevin/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def main(
task = f.read()
logger.info(f'Dynamic Eval task: {task}')

await event_stream.add_event(MessageAction(content=task), EventSource.USER)
event_stream.add_event(MessageAction(content=task), EventSource.USER)

async def on_event(event: Event):
if isinstance(event, AgentStateChangedObservation):
Expand All @@ -122,7 +122,7 @@ async def on_event(event: Event):
else:
message = fake_user_response_fn(controller.get_state())
action = MessageAction(content=message)
await event_stream.add_event(action, EventSource.USER)
event_stream.add_event(action, EventSource.USER)

event_stream.subscribe(EventStreamSubscriber.MAIN, on_event)
while controller.get_agent_state() not in [
Expand Down
12 changes: 6 additions & 6 deletions opendevin/events/stream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import threading
from datetime import datetime
from enum import Enum
from typing import Callable, Iterable
Expand All @@ -25,15 +26,15 @@ class EventStream:
# when there are agent delegates
_subscribers: dict[str, list[Callable]]
_cur_id: int
_lock: asyncio.Lock
_lock: threading.Lock
_file_store: FileStore

def __init__(self, sid: str):
self.sid = sid
self._file_store = get_file_store()
self._subscribers = {}
self._cur_id = 0
self._lock = asyncio.Lock()
self._lock = threading.Lock()
self._reinitialize_from_file_store()

def _reinitialize_from_file_store(self):
Expand Down Expand Up @@ -93,10 +94,9 @@ def unsubscribe(self, id: EventStreamSubscriber):
if len(self._subscribers[id]) == 0:
del self._subscribers[id]

# TODO: make this not async
async def add_event(self, event: Event, source: EventSource):
def add_event(self, event: Event, source: EventSource):
logger.debug(f'Adding event {event} from {source}')
async with self._lock:
with self._lock:
event._id = self._cur_id # type: ignore [attr-defined]
self._cur_id += 1
event._timestamp = datetime.now() # type: ignore [attr-defined]
Expand All @@ -109,4 +109,4 @@ async def add_event(self, event: Event, source: EventSource):
for stack in self._subscribers.values():
callback = stack[-1]
logger.debug(f'Notifying subscriber {callback} of event {event}')
await callback(event)
asyncio.create_task(callback(event))
4 changes: 2 additions & 2 deletions opendevin/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def on_event(self, event: Event) -> None:
observation = await self.run_action(event)
observation._cause = event.id # type: ignore[attr-defined]
source = event.source if event.source else EventSource.AGENT
await self.event_stream.add_event(observation, source)
self.event_stream.add_event(observation, source)

async def run_action(self, action: Action) -> Observation:
"""
Expand Down Expand Up @@ -149,7 +149,7 @@ async def submit_background_obs(self):
for _id, cmd in self.sandbox.background_commands.items():
output = cmd.read_logs()
if output:
await self.event_stream.add_event(
self.event_stream.add_event(
CmdOutputObservation(
content=output, command_id=_id, command=cmd.command
),
Expand Down
8 changes: 4 additions & 4 deletions opendevin/server/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ async def loop_recv(self):
logger.exception('Error in loop_recv: %s', e)

async def _initialize_agent(self, data: dict):
await self.agent_session.event_stream.add_event(
self.agent_session.event_stream.add_event(
ChangeAgentStateAction(AgentState.LOADING), EventSource.USER
)
await self.agent_session.event_stream.add_event(
self.agent_session.event_stream.add_event(
AgentStateChangedObservation('', AgentState.LOADING), EventSource.AGENT
)
try:
Expand All @@ -75,7 +75,7 @@ async def _initialize_agent(self, data: dict):
f'Error creating controller. Please check Docker is running and visit `{TROUBLESHOOTING_URL}` for more debugging information..'
)
return
await self.agent_session.event_stream.add_event(
self.agent_session.event_stream.add_event(
ChangeAgentStateAction(AgentState.INIT), EventSource.USER
)

Expand All @@ -102,7 +102,7 @@ async def dispatch(self, data: dict):
await self._initialize_agent(data)
return
event = event_from_dict(data.copy())
await self.agent_session.event_stream.add_event(event, EventSource.USER)
self.agent_session.event_stream.add_event(event, EventSource.USER)

async def send(self, data: dict[str, object]) -> bool:
try:
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_event_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ def collect_events(stream):
@pytest.mark.asyncio
async def test_basic_flow():
stream = EventStream('abc')
await stream.add_event(NullAction(), EventSource.AGENT)
stream.add_event(NullAction(), EventSource.AGENT)
assert len(collect_events(stream)) == 1


@pytest.mark.asyncio
async def test_stream_storage():
stream = EventStream('def')
await stream.add_event(NullObservation(''), EventSource.AGENT)
stream.add_event(NullObservation(''), EventSource.AGENT)
assert len(collect_events(stream)) == 1
content = stream._file_store.read('sessions/def/events/0.json')
assert content is not None
Expand All @@ -41,8 +41,8 @@ async def test_stream_storage():
@pytest.mark.asyncio
async def test_rehydration():
stream1 = EventStream('es1')
await stream1.add_event(NullObservation('obs1'), EventSource.AGENT)
await stream1.add_event(NullObservation('obs2'), EventSource.AGENT)
stream1.add_event(NullObservation('obs1'), EventSource.AGENT)
stream1.add_event(NullObservation('obs2'), EventSource.AGENT)
assert len(collect_events(stream1)) == 2

stream2 = EventStream('es2')
Expand Down