|
| 1 | +from typing import List, Type |
| 2 | + |
| 3 | +from langchain_aws import ChatBedrock |
| 4 | +from langchain_core.language_models.chat_models import BaseChatModel |
| 5 | +from langchain_core.messages import SystemMessage |
| 6 | +from langchain_core.pydantic_v1 import BaseModel, Field |
| 7 | +from langchain_core.tools import BaseTool |
| 8 | + |
| 9 | +from rai.communication.ros_communication import TF2TransformFetcher |
| 10 | +from rai.scenario_engine.messages import AgentLoop, HumanMultimodalMessage |
| 11 | +from rai.scenario_engine.scenario_engine import ScenarioPartType, ScenarioRunner |
| 12 | +from rai.scenario_engine.tool_runner import run_requested_tools |
| 13 | +from rai.tools.ros.cat_demo_tools import FinishTool |
| 14 | +from rai.tools.ros.cli_tools import Ros2TopicTool, SetGoalPoseTool |
| 15 | +from rai.tools.ros.tools import ( |
| 16 | + GetCameraImageTool, |
| 17 | + GetOccupancyGridTool, |
| 18 | + SetWaypointTool, |
| 19 | +) |
| 20 | + |
| 21 | + |
| 22 | +class DescribeAreaToolInput(BaseModel): |
| 23 | + """Input for the describe_area tool.""" |
| 24 | + |
| 25 | + image_topic: str = Field(..., description="ROS2 image topic to subscribe to") |
| 26 | + |
| 27 | + |
| 28 | +class DescribeAreaTool(BaseTool): |
| 29 | + """Describe the area""" |
| 30 | + |
| 31 | + name: str = "DescribeAreaTool" |
| 32 | + description: str = "A tool for describing the area around the robot." |
| 33 | + args_schema: Type[DescribeAreaToolInput] = DescribeAreaToolInput |
| 34 | + |
| 35 | + llm: BaseChatModel # without tools |
| 36 | + system_message: SystemMessage |
| 37 | + |
| 38 | + def _run(self, image_topic: str): |
| 39 | + get_camera_image_tool = GetCameraImageTool() |
| 40 | + set_waypoint_tool = SetWaypointTool() |
| 41 | + |
| 42 | + current_position = TF2TransformFetcher().get_data() |
| 43 | + image = get_camera_image_tool.run(image_topic)["images"] |
| 44 | + llm_with_tools = self.llm.bind_tools([set_waypoint_tool]) # type: ignore |
| 45 | + human_message = HumanMultimodalMessage( |
| 46 | + content=f"Describe the area around the robot (area, not items). Reason how would you name the room you are currently in" |
| 47 | + f". Use available tooling. Your current position is: {current_position}", |
| 48 | + images=image, |
| 49 | + ) |
| 50 | + messages = [self.system_message, human_message] |
| 51 | + ai_msg = llm_with_tools.invoke(messages) |
| 52 | + messages.append(ai_msg) |
| 53 | + run_requested_tools( |
| 54 | + ai_msg, [set_waypoint_tool], messages, llm_type="bedrock" |
| 55 | + ) # TODO(@maciejmajek): fix this |
| 56 | + return "Description of the area completed." |
| 57 | + |
| 58 | + |
| 59 | +DESCRIBER_PROMPT = """ |
| 60 | +You are an expert in describing the environment around you. Your main goal is to describe the area based on what you see in the image. |
| 61 | +""" |
| 62 | + |
| 63 | + |
| 64 | +def main(): |
| 65 | + simple_llm = ChatBedrock( |
| 66 | + model_id="anthropic.claude-3-haiku-20240307-v1:0", region_name="us-west-2" # type: ignore |
| 67 | + ) |
| 68 | + tools = [ |
| 69 | + GetOccupancyGridTool(), |
| 70 | + SetGoalPoseTool(), |
| 71 | + Ros2TopicTool(), |
| 72 | + DescribeAreaTool( |
| 73 | + llm=simple_llm, system_message=SystemMessage(content=DESCRIBER_PROMPT) |
| 74 | + ), |
| 75 | + FinishTool(), |
| 76 | + ] |
| 77 | + |
| 78 | + scenario: List[ScenarioPartType] = [ |
| 79 | + SystemMessage( |
| 80 | + content="You are an autonomous agent. Your main goal is to fulfill the user's requests. " |
| 81 | + "Do not make assumptions about the environment you are currently in. " |
| 82 | + "Use the tooling provided to gather information about the environment." |
| 83 | + ), |
| 84 | + HumanMultimodalMessage( |
| 85 | + content="Describe your surroundings and gather more information as needed. " |
| 86 | + "Move to explore further, aiming for clear areas near the robot (red arrow). Make sure to describe the area during movement." |
| 87 | + "Utilize available methods to obtain the map and identify relevant data streams. " |
| 88 | + "Before starting the exploration, find out what kind of tooling is available and based on that plan your exploration." |
| 89 | + ), |
| 90 | + AgentLoop(stop_action=FinishTool().__class__.__name__, stop_iters=10), |
| 91 | + ] |
| 92 | + |
| 93 | + llm = ChatBedrock(model_id="anthropic.claude-3-5-sonnet-20240620-v1:0", region_name="us-west-2") # type: ignore |
| 94 | + runner = ScenarioRunner(scenario, llm=llm, tools=tools, llm_type="bedrock") |
| 95 | + runner.run() |
| 96 | + runner.save_to_html() |
| 97 | + |
| 98 | + |
| 99 | +if __name__ == "__main__": |
| 100 | + main() |
0 commit comments