Open
Description
Bug report
Required Info:
- Operating System:
- Ubuntu 22.04
- Installation type:
- binaries
- Version or commit hash:
- Humble
- DDS implementation:
- default
- Client library (if applicable):
- rclpy
Steps to reproduce issue
Similar to this question, I am writing some pytests to test a Node I am developing. I am using launch_pytest to setup the ROS Nodes I want to test. And within a test, I am creating a helper node to publish some messages.
However, I am getting the following error when I publish something with the helper node: The following exception was never retrieved: cannot use Destroyable because destruction was requested
. The error is triggered with this node.diagnostics_pub.publish(diag_msg)
I don' t get any errors when running the node normally, only when testing.
import launch
import launch_pytest
import launch_ros
from pathlib import Path
import pytest
import rclpy
import traceback
from threading import Event
from threading import Thread
import sys
from diagnostic_msgs.msg import DiagnosticArray
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_msgs.msg import KeyValue
from lifecycle_msgs.srv import ChangeState
from lifecycle_msgs.srv import GetState
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from ros_typedb_msgs.srv import Query
@launch_pytest.fixture
def generate_test_description():
path_to_pkg = Path(__file__).parents[1]
path_config = path_to_pkg / 'config'
path_test_data = path_to_pkg / 'test' / 'test_data'
metacontrol_kb_node = launch_ros.actions.Node(
executable=sys.executable,
arguments=[
str(path_to_pkg / 'metacontrol_kb' / 'metacontrol_kb_typedb.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='metacontrol_kb',
output='screen',
parameters=[{
'schema_path': str(path_config / 'schema.tql'),
'data_path': str(path_test_data / 'test_data.tql')
}]
)
return launch.LaunchDescription([
metacontrol_kb_node,
])
@pytest.mark.launch(fixture=generate_test_description)
def test_metacontrol_kb_diagnostics():
rclpy.init()
traceback_logger = rclpy.logging.get_logger('node_class_traceback_logger')
try:
node = MakeTestNode()
node.start_node()
node.activate_metacontrol_kb()
status_msg = DiagnosticStatus()
status_msg.level = DiagnosticStatus.OK
status_msg.name = ''
key_value = KeyValue()
key_value.key = 'ea_measurement'
key_value.value = str(1.72)
status_msg.values.append(key_value)
status_msg.message = 'QA status'
diag_msg = DiagnosticArray()
diag_msg.header.stamp = node.get_clock().now().to_msg()
diag_msg.status.append(status_msg)
node.diagnostics_pub.publish(diag_msg)
query_req = Query.Request()
query_req.query_type = 'match'
query_req.query = """
match $ea isa Attribute,
has attribute-name "ea_measurement",
has attribute-measurement $measurement;
get $measurement;
"""
query_res = node.call_service(node.query_srv, query_req)
correct_measurement = False
for r in query_res.result:
if r.attribute_name == 'attribute-measurement' \
and r.value.double_value == 1.72:
correct_measurement = True
assert correct_measurement
except Exception as exception:
traceback_logger.error(traceback.format_exc())
raise exception
finally:
rclpy.shutdown()
class MakeTestNode(Node):
def __init__(self, name='test_node'):
super().__init__(name)
self.diagnostics_pub = self.create_publisher(
DiagnosticArray,
'/diagnostics',
1)
self.change_state_srv = self.create_client(
ChangeState, '/metacontrol_kb/change_state')
self.get_state_srv = self.create_client(
GetState, '/metacontrol_kb/get_state')
self.query_srv = self.create_client(
Query, '/typedb/query')
def start_node(self):
self.ros_spin_thread = Thread(
target=lambda node: rclpy.spin(node),
args=(self,))
self.ros_spin_thread.start()
def change_node_state(self, transition_id):
change_state_req = ChangeState.Request()
change_state_req.transition.id = transition_id
return self.call_service(self.change_state_srv, change_state_req)
def get_node_state(self):
get_state_req = GetState.Request()
return self.call_service(self.get_state_srv, get_state_req)
def call_service(self, cli, request):
if cli.wait_for_service(timeout_sec=5.0) is False:
self.get_logger().error(
'service not available {}'.format(cli.srv_name))
return None
future = cli.call_async(request)
if self.executor.spin_until_future_complete(
future, timeout_sec=5.0) is False:
self.get_logger().error(
'Future not completed {}'.format(cli.srv_name))
return None
return future.result()
def activate_metacontrol_kb(self):
state_req = 1
res = self.change_node_state(state_req)
if res.success is True:
state_req = 3
res = self.change_node_state(state_req)
if res.success is False:
self.get_logger().error(
'State change req error. Requested {}'.format(state_req))
Expected behavior
No error returned
Actual behavior
Error: [python3-1] The following exception was never retrieved: cannot use Destroyable because destruction was requested
Metadata
Metadata
Assignees
Labels
No labels