|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
| 15 | +from subprocess import PIPE, Popen |
| 16 | +from threading import Timer |
| 17 | +from typing import List, Literal, Optional |
15 | 18 |
|
16 |
| -import subprocess |
17 |
| -from typing import Type |
| 19 | +from langchain_core.tools import tool |
18 | 20 |
|
19 |
| -from langchain.tools import BaseTool |
20 |
| -from pydantic import BaseModel, Field |
| 21 | +FORBIDDEN_CHARACTERS = ["&", ";", "|", "&&", "||", "(", ")", "<", ">", ">>", "<<"] |
21 | 22 |
|
22 | 23 |
|
23 |
| -class Ros2TopicToolInput(BaseModel): |
24 |
| - """Input for the ros2_topic tool.""" |
| 24 | +def run_with_timeout(cmd: List[str], timeout_sec: int): |
| 25 | + proc = Popen(cmd, stdout=PIPE, stderr=PIPE) |
| 26 | + timer = Timer(timeout_sec, proc.kill) |
| 27 | + try: |
| 28 | + timer.start() |
| 29 | + stdout, stderr = proc.communicate() |
| 30 | + return stdout, stderr |
| 31 | + finally: |
| 32 | + timer.cancel() |
25 | 33 |
|
26 |
| - command: str = Field(..., description="The command to run") |
27 | 34 |
|
28 |
| - |
29 |
| -class Ros2TopicTool(BaseTool): |
30 |
| - """Tool for interacting with ROS2 topics.""" |
31 |
| - |
32 |
| - name: str = "Ros2TopicTool" |
33 |
| - description: str = """ |
34 |
| - usage: ros2 topic [-h] [--include-hidden-topics] Call `ros2 topic <command> -h` for more detailed usage. ... |
35 |
| -
|
36 |
| - Various topic related sub-commands |
37 |
| -
|
38 |
| - options: |
39 |
| - -h, --help show this help message and exit |
40 |
| - --include-hidden-topics |
41 |
| - Consider hidden topics as well |
42 |
| -
|
43 |
| - Commands: |
44 |
| - bw Display bandwidth used by topic |
45 |
| - delay Display delay of topic from timestamp in header |
46 |
| - echo Output messages from a topic |
47 |
| - find Output a list of available topics of a given type |
48 |
| - hz Print the average publishing rate to screen |
49 |
| - info Print information about a topic |
50 |
| - list Output a list of available topics |
51 |
| - pub Publish a message to a topic |
52 |
| - type Print a topic's type |
53 |
| -
|
54 |
| - Call `ros2 topic <command> -h` for more detailed usage. |
55 |
| - """ |
56 |
| - args_schema: Type[Ros2TopicToolInput] = Ros2TopicToolInput |
57 |
| - |
58 |
| - def _run(self, command: str): |
59 |
| - """Executes the specified ROS2 topic command.""" |
60 |
| - result = subprocess.run( |
61 |
| - f"ros2 topic {command}", shell=True, capture_output=True, timeout=2 |
| 35 | +def run_command(cmd: List[str], timeout: int = 5): |
| 36 | + # Validate command safety by checking for shell operators |
| 37 | + # Block potentially dangerous characters |
| 38 | + if any(char in " ".join(cmd) for char in FORBIDDEN_CHARACTERS): |
| 39 | + raise ValueError( |
| 40 | + "Command is not safe to run. The command contains forbidden characters." |
62 | 41 | )
|
63 |
| - return result |
64 |
| - |
65 |
| - |
66 |
| -class Ros2InterafaceToolInput(BaseModel): |
67 |
| - """Input for the ros2_interface tool.""" |
68 |
| - |
69 |
| - command: str = Field(..., description="The command to run") |
70 |
| - |
71 |
| - |
72 |
| -class Ros2InterfaceTool(BaseTool): |
73 |
| - |
74 |
| - name: str = "Ros2InterfaceTool" |
75 |
| - |
76 |
| - description: str = """ |
77 |
| - usage: ros2 interface [-h] Call `ros2 interface <command> -h` for more detailed usage. ... |
78 |
| -
|
79 |
| - Show information about ROS interfaces |
80 |
| -
|
81 |
| - options: |
82 |
| - -h, --help show this help message and exit |
83 |
| -
|
84 |
| - Commands: |
85 |
| - list List all interface types available |
86 |
| - package Output a list of available interface types within one package |
87 |
| - packages Output a list of packages that provide interfaces |
88 |
| - proto Output an interface prototype |
89 |
| - show Output the interface definition |
90 |
| -
|
91 |
| - Call `ros2 interface <command> -h` for more detailed usage. |
| 42 | + stdout, stderr = run_with_timeout(cmd, timeout) |
| 43 | + output = {} |
| 44 | + if stdout: |
| 45 | + output["stdout"] = stdout.decode("utf-8") |
| 46 | + else: |
| 47 | + output["stdout"] = "Command returned no stdout output" |
| 48 | + if stderr: |
| 49 | + output["stderr"] = stderr.decode("utf-8") |
| 50 | + else: |
| 51 | + output["stderr"] = "Command returned no stderr output" |
| 52 | + return str(output) |
| 53 | + |
| 54 | + |
| 55 | +@tool |
| 56 | +def ros2_action( |
| 57 | + command: Literal["info", "list", "type", "send_goal"], |
| 58 | + arguments: Optional[List[str]] = None, |
| 59 | + timeout: int = 5, |
| 60 | +): |
| 61 | + """Run a ROS2 action command |
| 62 | + Args: |
| 63 | + command: The action command to run (info/list/type) |
| 64 | + arguments: Additional arguments for the command as a list of strings |
| 65 | + timeout: Command timeout in seconds |
92 | 66 | """
|
93 |
| - |
94 |
| - args_schema: Type[Ros2InterafaceToolInput] = Ros2InterafaceToolInput |
95 |
| - |
96 |
| - def _run(self, command: str): |
97 |
| - command = f"ros2 interface {command}" |
98 |
| - result = subprocess.run(command, shell=True, capture_output=True, timeout=2) |
99 |
| - return result |
100 |
| - |
101 |
| - |
102 |
| -class Ros2ServiceToolInput(BaseModel): |
103 |
| - """Input for the ros2_service tool.""" |
104 |
| - |
105 |
| - command: str = Field(..., description="The command to run") |
106 |
| - |
107 |
| - |
108 |
| -class Ros2ServiceTool(BaseTool): |
109 |
| - name: str = "Ros2ServiceTool" |
110 |
| - |
111 |
| - description: str = """ |
112 |
| - usage: ros2 service [-h] [--include-hidden-services] Call `ros2 service <command> -h` for more detailed usage. ... |
113 |
| -
|
114 |
| - Various service related sub-commands |
115 |
| -
|
116 |
| - options: |
117 |
| - -h, --help show this help message and exit |
118 |
| - --include-hidden-services |
119 |
| - Consider hidden services as well |
120 |
| -
|
121 |
| - Commands: |
122 |
| - call Call a service |
123 |
| - find Output a list of available services of a given type |
124 |
| - list Output a list of available services |
125 |
| - type Output a service's type |
| 67 | + cmd = ["ros2", "action", command] |
| 68 | + if arguments: |
| 69 | + cmd.extend(arguments) |
| 70 | + return run_command(cmd, timeout) |
| 71 | + |
| 72 | + |
| 73 | +@tool |
| 74 | +def ros2_service( |
| 75 | + command: Literal["call", "find", "info", "list", "type"], |
| 76 | + arguments: Optional[List[str]] = None, |
| 77 | + timeout: int = 5, |
| 78 | +): |
| 79 | + """Run a ROS2 service command |
| 80 | + Args: |
| 81 | + command: The service command to run |
| 82 | + arguments: Additional arguments for the command as a list of strings |
| 83 | + timeout: Command timeout in seconds |
126 | 84 | """
|
127 |
| - |
128 |
| - args_schema: Type[Ros2ServiceToolInput] = Ros2ServiceToolInput |
129 |
| - |
130 |
| - def _run(self, command: str): |
131 |
| - command = f"ros2 service {command}" |
132 |
| - result = subprocess.run(command, shell=True, capture_output=True, timeout=2) |
133 |
| - return result |
134 |
| - |
135 |
| - |
136 |
| -class Ros2ActionToolInput(BaseModel): |
137 |
| - """Input for the ros2_action tool.""" |
138 |
| - |
139 |
| - command: str = Field(..., description="The command to run") |
140 |
| - |
141 |
| - |
142 |
| -class Ros2ActionTool(BaseTool): |
143 |
| - name: str = "Ros2ActionTool" |
144 |
| - |
145 |
| - description: str = """ |
146 |
| - usage: ros2 action [-h] Call `ros2 action <command> -h` for more detailed usage. ... |
147 |
| -
|
148 |
| - Various action related sub-commands |
149 |
| -
|
150 |
| - options: |
151 |
| - -h, --help show this help message and exit |
152 |
| -
|
153 |
| - Commands: |
154 |
| - info Print information about an action |
155 |
| - list Output a list of action names |
156 |
| - send_goal Send an action goal |
157 |
| - type Print a action's type |
158 |
| -
|
159 |
| - Call `ros2 action <command> -h` for more detailed usage. |
| 85 | + cmd = ["ros2", "service", command] |
| 86 | + if arguments: |
| 87 | + cmd.extend(arguments) |
| 88 | + return run_command(cmd, timeout) |
| 89 | + |
| 90 | + |
| 91 | +@tool |
| 92 | +def ros2_node( |
| 93 | + command: Literal["info", "list"], |
| 94 | + arguments: Optional[List[str]] = None, |
| 95 | + timeout: int = 5, |
| 96 | +): |
| 97 | + """Run a ROS2 node command |
| 98 | + Args: |
| 99 | + command: The node command to run |
| 100 | + arguments: Additional arguments for the command as a list of strings |
| 101 | + timeout: Command timeout in seconds |
160 | 102 | """
|
161 |
| - |
162 |
| - args_schema: Type[Ros2ActionToolInput] = Ros2ActionToolInput |
163 |
| - |
164 |
| - def _run(self, command: str): |
165 |
| - command = f"ros2 action {command}" |
166 |
| - result = subprocess.run(command, shell=True, capture_output=True) |
167 |
| - return result |
| 103 | + cmd = ["ros2", "node", command] |
| 104 | + if arguments: |
| 105 | + cmd.extend(arguments) |
| 106 | + return run_command(cmd, timeout) |
| 107 | + |
| 108 | + |
| 109 | +@tool |
| 110 | +def ros2_param( |
| 111 | + command: Literal["delete", "describe", "dump", "get", "list", "set"], |
| 112 | + arguments: Optional[List[str]] = None, |
| 113 | + timeout: int = 5, |
| 114 | +): |
| 115 | + """Run a ROS2 parameter command |
| 116 | + Args: |
| 117 | + command: The parameter command to run |
| 118 | + arguments: Additional arguments for the command as a list of strings |
| 119 | + timeout: Command timeout in seconds |
| 120 | + """ |
| 121 | + cmd = ["ros2", "param", command] |
| 122 | + if arguments: |
| 123 | + cmd.extend(arguments) |
| 124 | + return run_command(cmd, timeout) |
| 125 | + |
| 126 | + |
| 127 | +@tool |
| 128 | +def ros2_interface( |
| 129 | + command: Literal["list", "package", "packages", "proto", "show"], |
| 130 | + arguments: Optional[List[str]] = None, |
| 131 | + timeout: int = 5, |
| 132 | +): |
| 133 | + """Run a ROS2 interface command |
| 134 | + Args: |
| 135 | + command: The interface command to run |
| 136 | + arguments: Additional arguments for the command as a list of strings |
| 137 | + timeout: Command timeout in seconds |
| 138 | + """ |
| 139 | + cmd = ["ros2", "interface", command] |
| 140 | + if arguments: |
| 141 | + cmd.extend(arguments) |
| 142 | + return run_command(cmd, timeout) |
| 143 | + |
| 144 | + |
| 145 | +@tool |
| 146 | +def ros2_topic( |
| 147 | + command: Literal[ |
| 148 | + "bw", "delay", "echo", "find", "hz", "info", "list", "pub", "type" |
| 149 | + ], |
| 150 | + arguments: Optional[List[str]] = None, |
| 151 | + timeout: int = 5, |
| 152 | +): |
| 153 | + """Run a ROS2 topic command |
| 154 | + Args: |
| 155 | + command: The topic command to run: |
| 156 | + - bw: Display bandwidth used by topic |
| 157 | + - delay: Display delay of topic from timestamp in header |
| 158 | + - echo: Output messages from a topic |
| 159 | + - find: Output a list of available topics of a given type |
| 160 | + - hz: Print the average publishing rate to screen |
| 161 | + - info: Print information about a topic |
| 162 | + - list: Output a list of available topics |
| 163 | + - pub: Publish a message to a topic |
| 164 | + - type: Print a topic's type |
| 165 | + arguments: Additional arguments for the command as a list of strings |
| 166 | + timeout: Command timeout in seconds |
| 167 | + """ |
| 168 | + cmd = ["ros2", "topic", command] |
| 169 | + if arguments: |
| 170 | + cmd.extend(arguments) |
| 171 | + return run_command(cmd, timeout) |
0 commit comments