12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ import os
15
16
import threading
17
+ import uuid
18
+ from typing import List
16
19
17
20
import numpy as np
21
+ import pytest
18
22
import rclpy
19
23
from rclpy .node import Node
20
- from rclpy .qos import qos_profile_sensor_data
24
+ from rclpy .qos import QoSPresetProfiles , QoSProfile
21
25
from sensor_msgs .msg import Image
22
26
from std_msgs .msg import String
23
27
24
28
from rai .node import RaiBaseNode
25
29
26
30
31
+ def get_qos_profiles () -> List [str ]:
32
+ ros_distro = os .environ .get ("ROS_DISTRO" )
33
+ match ros_distro :
34
+ case "humble" :
35
+ return [
36
+ QoSPresetProfiles .SYSTEM_DEFAULT .name ,
37
+ QoSPresetProfiles .SENSOR_DATA .name ,
38
+ QoSPresetProfiles .SERVICES_DEFAULT .name ,
39
+ QoSPresetProfiles .PARAMETERS .name ,
40
+ QoSPresetProfiles .PARAMETER_EVENTS .name ,
41
+ QoSPresetProfiles .ACTION_STATUS_DEFAULT .name ,
42
+ ]
43
+ case "jazzy" :
44
+ return [
45
+ QoSPresetProfiles .DEFAULT .name ,
46
+ QoSPresetProfiles .SYSTEM_DEFAULT .name ,
47
+ QoSPresetProfiles .SENSOR_DATA .name ,
48
+ QoSPresetProfiles .SERVICES_DEFAULT .name ,
49
+ QoSPresetProfiles .PARAMETERS .name ,
50
+ QoSPresetProfiles .PARAMETER_EVENTS .name ,
51
+ QoSPresetProfiles .ACTION_STATUS_DEFAULT .name ,
52
+ QoSPresetProfiles .BEST_AVAILABLE .name ,
53
+ ]
54
+ case _:
55
+ raise ValueError (f"Invalid ROS_DISTRO: { ros_distro } " )
56
+
57
+
27
58
class TestPublisher (Node ):
28
- def __init__ (self ):
29
- super ().__init__ ("test_publisher" )
30
- self .image_publisher = self .create_publisher (
31
- Image , "image" , qos_profile_sensor_data
32
- )
33
- self .text_publisher = self .create_publisher (String , "text" , 10 )
59
+ def __init__ (self , qos_profile : QoSProfile ):
60
+ super ().__init__ ("test_publisher_" + str (uuid .uuid4 ()).replace ("-" , "" ))
61
+
62
+ self .image_publisher = self .create_publisher (Image , "image" , qos_profile )
63
+ self .text_publisher = self .create_publisher (String , "text" , qos_profile )
34
64
35
65
self .image_timer = self .create_timer (0.1 , self .image_callback )
36
66
self .text_timer = self .create_timer (0.1 , self .text_callback )
@@ -52,18 +82,27 @@ def text_callback(self):
52
82
self .text_publisher .publish (msg )
53
83
54
84
55
- def test_transport ():
56
- rclpy .init ()
57
- publisher = TestPublisher ()
85
+ @pytest .mark .parametrize (
86
+ "qos_profile" ,
87
+ get_qos_profiles (),
88
+ )
89
+ def test_transport (qos_profile : str ):
90
+ if not rclpy .ok ():
91
+ rclpy .init ()
92
+ publisher = TestPublisher (QoSPresetProfiles .get_from_short_key (qos_profile ))
58
93
thread = threading .Thread (target = rclpy .spin , args = (publisher ,))
59
94
thread .start ()
60
95
61
- rai_base_node = RaiBaseNode (node_name = "test_transport" )
96
+ rai_base_node = RaiBaseNode (
97
+ node_name = "test_transport_" + str (uuid .uuid4 ()).replace ("-" , "" )
98
+ )
62
99
topics = ["/image" , "/text" ]
63
- for topic in topics :
64
- output = rai_base_node .get_raw_message_from_topic (topic )
65
- assert output is not None
66
-
67
- publisher .destroy_node ()
68
- rclpy .shutdown ()
69
- thread .join ()
100
+ try :
101
+ for topic in topics :
102
+ output = rai_base_node .get_raw_message_from_topic (topic , timeout_sec = 5.0 )
103
+ assert not isinstance (output , Exception ), f"Exception: { output } "
104
+ finally :
105
+ # TODO: Fix errors regarding node destruction in thread
106
+ publisher .destroy_node ()
107
+ rclpy .shutdown ()
108
+ thread .join ()
0 commit comments