4
4
from langchain .tools import BaseTool , tool
5
5
from langchain_core .pydantic_v1 import BaseModel , Field
6
6
from rclpy .node import Node
7
- from rosidl_parser .definition import NamespacedType
8
- from rosidl_runtime_py .import_message import import_message_from_namespaced_type
9
7
from rosidl_runtime_py .set_message import set_message_fields
10
- from rosidl_runtime_py .utilities import get_namespaced_type
11
8
12
9
from rai .communication .ros_communication import SingleMessageGrabber
13
10
14
11
from .utils import import_message_from_str
15
12
16
13
17
14
@tool
18
- def get_topics_names_and_types ():
15
+ def get_topics_names_and_types_tool ():
19
16
"""Call rclpy.node.Node().get_topics_names_and_types(). Return in a csv format. topic_name, serice_type"""
20
17
21
- node = Node (node_name = "rai_tool_node " )
18
+ node = Node (node_name = "rai_get_topics_names_and_types_tool " )
22
19
rclpy .spin_once (node , timeout_sec = 2 )
23
20
try :
24
21
return [
@@ -84,9 +81,8 @@ class Ros2PubMessageTool(BaseTool):
84
81
85
82
def _build_msg (
86
83
self , msg_type : str , msg_args : Dict [str , Any ]
87
- ) -> Tuple [object , object ]:
88
- msg_namespaced_type : NamespacedType = get_namespaced_type (msg_type )
89
- msg_cls = import_message_from_namespaced_type (msg_namespaced_type )
84
+ ) -> Tuple [object , Type ]:
85
+ msg_cls : Type = import_message_from_str (msg_type )
90
86
msg = msg_cls ()
91
87
set_message_fields (msg , msg_args )
92
88
return msg , msg_cls
@@ -96,7 +92,7 @@ def _run(self, topic_name: str, msg_type: str, msg_args: Dict[str, Any]):
96
92
97
93
msg , msg_cls = self ._build_msg (msg_type , msg_args )
98
94
99
- node = Node (node_name = "RAI_PubRos2MessageTool " )
95
+ node = Node (node_name = f"rai_ { self , __class__ . __name__ } " )
100
96
publisher = node .create_publisher (
101
97
msg_cls , topic_name , 10
102
98
) # TODO(boczekbartek): infer qos profile from topic info
0 commit comments