Skip to content

refactor(tools): remove repeated ros2 tools #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions src/rai/rai/apps/high_level_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@
from langchain_core.messages import BaseMessage, HumanMessage

from rai.agents.conversational_agent import create_conversational_agent
from rai.tools.ros.cli import (
Ros2ActionTool,
Ros2InterfaceTool,
Ros2ServiceTool,
Ros2TopicTool,
)
from rai.tools.ros.cli import ros2_action, ros2_interface, ros2_service, ros2_topic
from rai.utils.model_initialization import get_llm_model


Expand All @@ -37,10 +32,10 @@ class ROS2Agent(Agent):
def __init__(self):
super().__init__()
self.tools = [
Ros2TopicTool(),
Ros2InterfaceTool(),
Ros2ServiceTool(),
Ros2ActionTool(),
ros2_topic,
ros2_interface,
ros2_service,
ros2_action,
]
self.agent = create_conversational_agent(
self.llm, self.tools, "You are a ROS2 expert."
Expand Down
2 changes: 1 addition & 1 deletion src/rai/rai/tools/debugging_assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from rai.agents.conversational_agent import create_conversational_agent
from rai.agents.integrations.streamlit import get_streamlit_cb, streamlit_invoke
from rai.tools.ros.debugging import (
from rai.tools.ros.cli import (
ros2_action,
ros2_interface,
ros2_node,
Expand Down
18 changes: 14 additions & 4 deletions src/rai/rai/tools/ros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
# limitations under the License.


from .cli import Ros2InterfaceTool, Ros2ServiceTool, Ros2TopicTool
from .cli import (
ros2_action,
ros2_interface,
ros2_node,
ros2_param,
ros2_service,
ros2_topic,
)
from .native import Ros2BaseInput, Ros2BaseTool
from .tools import (
AddDescribedWaypointToDatabaseTool,
Expand All @@ -22,9 +29,12 @@
)

__all__ = [
"Ros2TopicTool",
"Ros2InterfaceTool",
"Ros2ServiceTool",
"ros2_action",
"ros2_interface",
"ros2_node",
"ros2_topic",
"ros2_param",
"ros2_service",
"Ros2BaseTool",
"Ros2BaseInput",
"AddDescribedWaypointToDatabaseTool",
Expand Down
290 changes: 147 additions & 143 deletions src/rai/rai/tools/ros/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,156 +12,160 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from subprocess import PIPE, Popen
from threading import Timer
from typing import List, Literal, Optional

import subprocess
from typing import Type
from langchain_core.tools import tool

from langchain.tools import BaseTool
from pydantic import BaseModel, Field
FORBIDDEN_CHARACTERS = ["&", ";", "|", "&&", "||", "(", ")", "<", ">", ">>", "<<"]


class Ros2TopicToolInput(BaseModel):
"""Input for the ros2_topic tool."""
def run_with_timeout(cmd: List[str], timeout_sec: int):
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
timer = Timer(timeout_sec, proc.kill)
try:
timer.start()
stdout, stderr = proc.communicate()
return stdout, stderr
finally:
timer.cancel()

command: str = Field(..., description="The command to run")


class Ros2TopicTool(BaseTool):
"""Tool for interacting with ROS2 topics."""

name: str = "Ros2TopicTool"
description: str = """
usage: ros2 topic [-h] [--include-hidden-topics] Call `ros2 topic <command> -h` for more detailed usage. ...

Various topic related sub-commands

options:
-h, --help show this help message and exit
--include-hidden-topics
Consider hidden topics as well

Commands:
bw Display bandwidth used by topic
delay Display delay of topic from timestamp in header
echo Output messages from a topic
find Output a list of available topics of a given type
hz Print the average publishing rate to screen
info Print information about a topic
list Output a list of available topics
pub Publish a message to a topic
type Print a topic's type

Call `ros2 topic <command> -h` for more detailed usage.
"""
args_schema: Type[Ros2TopicToolInput] = Ros2TopicToolInput

def _run(self, command: str):
"""Executes the specified ROS2 topic command."""
result = subprocess.run(
f"ros2 topic {command}", shell=True, capture_output=True, timeout=2
def run_command(cmd: List[str], timeout: int = 5):
# Validate command safety by checking for shell operators
# Block potentially dangerous characters
if any(char in " ".join(cmd) for char in FORBIDDEN_CHARACTERS):
raise ValueError(
"Command is not safe to run. The command contains forbidden characters."
)
return result


class Ros2InterafaceToolInput(BaseModel):
"""Input for the ros2_interface tool."""

command: str = Field(..., description="The command to run")


class Ros2InterfaceTool(BaseTool):

name: str = "Ros2InterfaceTool"

description: str = """
usage: ros2 interface [-h] Call `ros2 interface <command> -h` for more detailed usage. ...

Show information about ROS interfaces

options:
-h, --help show this help message and exit

Commands:
list List all interface types available
package Output a list of available interface types within one package
packages Output a list of packages that provide interfaces
proto Output an interface prototype
show Output the interface definition

Call `ros2 interface <command> -h` for more detailed usage.
stdout, stderr = run_with_timeout(cmd, timeout)
output = {}
if stdout:
output["stdout"] = stdout.decode("utf-8")
else:
output["stdout"] = "Command returned no stdout output"
if stderr:
output["stderr"] = stderr.decode("utf-8")
else:
output["stderr"] = "Command returned no stderr output"
return str(output)


@tool
def ros2_action(
command: Literal["info", "list", "type", "send_goal"],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 action command
Args:
command: The action command to run (info/list/type)
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""

args_schema: Type[Ros2InterafaceToolInput] = Ros2InterafaceToolInput

def _run(self, command: str):
command = f"ros2 interface {command}"
result = subprocess.run(command, shell=True, capture_output=True, timeout=2)
return result


class Ros2ServiceToolInput(BaseModel):
"""Input for the ros2_service tool."""

command: str = Field(..., description="The command to run")


class Ros2ServiceTool(BaseTool):
name: str = "Ros2ServiceTool"

description: str = """
usage: ros2 service [-h] [--include-hidden-services] Call `ros2 service <command> -h` for more detailed usage. ...

Various service related sub-commands

options:
-h, --help show this help message and exit
--include-hidden-services
Consider hidden services as well

Commands:
call Call a service
find Output a list of available services of a given type
list Output a list of available services
type Output a service's type
cmd = ["ros2", "action", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)


@tool
def ros2_service(
command: Literal["call", "find", "info", "list", "type"],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 service command
Args:
command: The service command to run
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""

args_schema: Type[Ros2ServiceToolInput] = Ros2ServiceToolInput

def _run(self, command: str):
command = f"ros2 service {command}"
result = subprocess.run(command, shell=True, capture_output=True, timeout=2)
return result


class Ros2ActionToolInput(BaseModel):
"""Input for the ros2_action tool."""

command: str = Field(..., description="The command to run")


class Ros2ActionTool(BaseTool):
name: str = "Ros2ActionTool"

description: str = """
usage: ros2 action [-h] Call `ros2 action <command> -h` for more detailed usage. ...

Various action related sub-commands

options:
-h, --help show this help message and exit

Commands:
info Print information about an action
list Output a list of action names
send_goal Send an action goal
type Print a action's type

Call `ros2 action <command> -h` for more detailed usage.
cmd = ["ros2", "service", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)


@tool
def ros2_node(
command: Literal["info", "list"],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 node command
Args:
command: The node command to run
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""

args_schema: Type[Ros2ActionToolInput] = Ros2ActionToolInput

def _run(self, command: str):
command = f"ros2 action {command}"
result = subprocess.run(command, shell=True, capture_output=True)
return result
cmd = ["ros2", "node", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)


@tool
def ros2_param(
command: Literal["delete", "describe", "dump", "get", "list", "set"],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 parameter command
Args:
command: The parameter command to run
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""
cmd = ["ros2", "param", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)


@tool
def ros2_interface(
command: Literal["list", "package", "packages", "proto", "show"],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 interface command
Args:
command: The interface command to run
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""
cmd = ["ros2", "interface", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)


@tool
def ros2_topic(
command: Literal[
"bw", "delay", "echo", "find", "hz", "info", "list", "pub", "type"
],
arguments: Optional[List[str]] = None,
timeout: int = 5,
):
"""Run a ROS2 topic command
Args:
command: The topic command to run:
- bw: Display bandwidth used by topic
- delay: Display delay of topic from timestamp in header
- echo: Output messages from a topic
- find: Output a list of available topics of a given type
- hz: Print the average publishing rate to screen
- info: Print information about a topic
- list: Output a list of available topics
- pub: Publish a message to a topic
- type: Print a topic's type
arguments: Additional arguments for the command as a list of strings
timeout: Command timeout in seconds
"""
cmd = ["ros2", "topic", command]
if arguments:
cmd.extend(arguments)
return run_command(cmd, timeout)
Loading