Skip to content

Commit 9bf5b74

Browse files
authored
Thread-safe access to PublisherData (#278)
* Create PublisherData within NodeData Signed-off-by: Yadunund <[email protected]> * Continue shutting down other nodes if one fails Signed-off-by: Yadunund <[email protected]> * includes Signed-off-by: Yadunund <[email protected]> * Expose required methods from Entity instead Signed-off-by: Yadunund <[email protected]> * Reuse methods and make publish_serialized_message also thread safe Signed-off-by: Yadunund <[email protected]> * Address alex's feedback Signed-off-by: Yadunund <[email protected]> * Safely copy GID Signed-off-by: Yadunund <[email protected]> --------- Signed-off-by: Yadunund <[email protected]>
1 parent 824886a commit 9bf5b74

16 files changed

+1047
-533
lines changed

rmw_zenoh_cpp/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ add_library(rmw_zenoh_cpp SHARED
4040
src/detail/qos.cpp
4141
src/detail/rmw_context_impl_s.cpp
4242
src/detail/rmw_data_types.cpp
43+
src/detail/rmw_publisher_data.cpp
4344
src/detail/rmw_node_data.cpp
4445
src/detail/service_type_support.cpp
4546
src/detail/type_support.cpp
4647
src/detail/type_support_common.cpp
4748
src/detail/zenoh_config.cpp
4849
src/detail/zenoh_router_check.cpp
50+
src/detail/zenoh_utils.cpp
4951
src/rmw_event.cpp
5052
src/rmw_get_network_flow_endpoints.cpp
5153
src/rmw_get_node_info_and_types.cpp

rmw_zenoh_cpp/src/detail/graph_cache.cpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ void GraphCache::take_entities_with_events(const EntityEventMap & entities_with_
319319
for (const auto & [local_entity, event_set] : entities_with_events) {
320320
// Trigger callback set for this entity for the event type.
321321
GraphEventCallbackMap::const_iterator event_callbacks_it =
322-
event_callbacks_.find(local_entity);
322+
event_callbacks_.find(local_entity->guid());
323323
if (event_callbacks_it != event_callbacks_.end()) {
324324
for (const rmw_zenoh_event_type_t & event_type : event_set) {
325325
GraphEventCallbacks::const_iterator callback_it =
@@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
852852

853853
///=============================================================================
854854
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
855-
const rmw_publisher_t * publisher,
855+
PublisherDataConstPtr pub_data,
856856
size_t * subscription_count)
857857
{
858858
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
859859
// we support matched qos events.
860860
*subscription_count = 0;
861-
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
861+
auto topic_info = pub_data->topic_info();
862+
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
862863
if (topic_it != graph_topics_.end()) {
863-
rmw_publisher_data_t * pub_data =
864-
static_cast<rmw_publisher_data_t *>(publisher->data);
865864
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
866-
pub_data->type_support->get_name());
865+
topic_info.type_);
867866
if (topic_data_it != topic_it->second.end()) {
868867
for (const auto & [_, topic_data] : topic_data_it->second) {
869868
// If a subscription exists with compatible QoS, update the subscription count.
870869
if (!topic_data->subs_.empty()) {
871870
rmw_qos_compatibility_type_t is_compatible;
872871
rmw_ret_t ret = rmw_qos_profile_check_compatible(
873-
pub_data->adapted_qos_profile,
872+
pub_data->adapted_qos_profile(),
874873
topic_data->info_.qos_,
875874
&is_compatible,
876875
nullptr,
@@ -1244,7 +1243,7 @@ rmw_ret_t GraphCache::service_server_is_available(
12441243

12451244
///=============================================================================
12461245
void GraphCache::set_qos_event_callback(
1247-
liveliness::ConstEntityPtr entity,
1246+
std::size_t entity_guid,
12481247
const rmw_zenoh_event_type_t & event_type,
12491248
GraphCacheEventCallback callback)
12501249
{
@@ -1257,20 +1256,20 @@ void GraphCache::set_qos_event_callback(
12571256
return;
12581257
}
12591258

1260-
const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity);
1259+
const GraphEventCallbackMap::iterator event_cb_it = event_callbacks_.find(entity_guid);
12611260
if (event_cb_it == event_callbacks_.end()) {
12621261
// First time a callback is being set for this entity.
1263-
event_callbacks_[entity] = {std::make_pair(event_type, std::move(callback))};
1262+
event_callbacks_[entity_guid] = {std::make_pair(event_type, std::move(callback))};
12641263
return;
12651264
}
12661265
event_cb_it->second[event_type] = std::move(callback);
12671266
}
12681267

12691268
///=============================================================================
1270-
void GraphCache::remove_qos_event_callbacks(liveliness::ConstEntityPtr entity)
1269+
void GraphCache::remove_qos_event_callbacks(std::size_t entity_guid)
12711270
{
12721271
std::lock_guard<std::mutex> lock(graph_mutex_);
1273-
event_callbacks_.erase(entity);
1272+
event_callbacks_.erase(entity_guid);
12741273
}
12751274

12761275
///=============================================================================
@@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
13481347
const rmw_subscription_data_t * sub_data,
13491348
QueryingSubscriberCallback cb)
13501349
{
1351-
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
1350+
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
13521351
auto cb_it = querying_subs_cbs_.find(keyexpr);
13531352
if (cb_it == querying_subs_cbs_.end()) {
13541353
querying_subs_cbs_[keyexpr] = std::move(

rmw_zenoh_cpp/src/detail/graph_cache.hpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef DETAIL__GRAPH_CACHE_HPP_
1616
#define DETAIL__GRAPH_CACHE_HPP_
1717

18+
#include <cstddef>
1819
#include <functional>
1920
#include <map>
2021
#include <memory>
@@ -27,6 +28,7 @@
2728
#include "event.hpp"
2829
#include "liveliness_utils.hpp"
2930
#include "ordered_map.hpp"
31+
#include "rmw_publisher_data.hpp"
3032

3133
#include "rcutils/allocator.h"
3234
#include "rcutils/types.h"
@@ -133,7 +135,7 @@ class GraphCache final
133135
rmw_names_and_types_t * topic_names_and_types) const;
134136

135137
rmw_ret_t publisher_count_matched_subscriptions(
136-
const rmw_publisher_t * publisher,
138+
PublisherDataConstPtr pub_data,
137139
size_t * subscription_count);
138140

139141
rmw_ret_t subscription_count_matched_publishers(
@@ -183,12 +185,12 @@ class GraphCache final
183185
/// Set a qos event callback for an entity from the current session.
184186
/// @note The callback will be removed when the entity is removed from the graph.
185187
void set_qos_event_callback(
186-
liveliness::ConstEntityPtr entity,
188+
std::size_t entity_guid,
187189
const rmw_zenoh_event_type_t & event_type,
188190
GraphCacheEventCallback callback);
189191

190192
/// Remove all qos event callbacks for an entity.
191-
void remove_qos_event_callbacks(liveliness::ConstEntityPtr entity);
193+
void remove_qos_event_callbacks(std::size_t entity_guid);
192194

193195
/// Returns true if the entity is a publisher or client. False otherwise.
194196
static bool is_entity_pub(const liveliness::Entity & entity);
@@ -289,13 +291,13 @@ class GraphCache final
289291
GraphNode::TopicMap graph_services_ = {};
290292

291293
using GraphEventCallbacks = std::unordered_map<rmw_zenoh_event_type_t, GraphCacheEventCallback>;
292-
// Map entity (based on uuid) to a map of event callbacks.
294+
// Map an entity's guid to a map of event callbacks.
293295
// Note: Since we use unordered_map, we will only store a single callback for an
294296
// entity string. So we do not support the case where a node create a duplicate
295297
// pub/sub with the exact same topic, type & QoS but registers a different callback
296298
// for the same event type. We could switch to a multimap here but removing the callback
297299
// will be impossible right now since entities do not have unique IDs.
298-
using GraphEventCallbackMap = std::unordered_map<liveliness::ConstEntityPtr, GraphEventCallbacks>;
300+
using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
299301
// EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
300302
GraphEventCallbackMap event_callbacks_;
301303
// Map keyexpressions to QueryingSubscriberCallback.

rmw_zenoh_cpp/src/detail/liveliness_utils.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
#include "liveliness_utils.hpp"
1616

1717
#include <functional>
18+
#include <limits>
1819
#include <optional>
20+
#include <random>
1921
#include <sstream>
2022
#include <stdexcept>
2123
#include <string>
@@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
609611
return this->node_info_.enclave_;
610612
}
611613

614+
///=============================================================================
615+
NodeInfo Entity::node_info() const
616+
{
617+
return this->node_info_;
618+
}
619+
612620
///=============================================================================
613621
std::optional<TopicInfo> Entity::topic_info() const
614622
{
@@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
657665
return output;
658666
}
659667
} // namespace liveliness
668+
669+
void
670+
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
671+
{
672+
std::random_device dev;
673+
std::mt19937 rng(dev());
674+
std::uniform_int_distribution<std::mt19937::result_type> dist(
675+
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());
676+
677+
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
678+
gid[i] = dist(rng);
679+
}
680+
}
660681
} // namespace rmw_zenoh_cpp

rmw_zenoh_cpp/src/detail/liveliness_utils.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include <zenoh.h>
1919

20+
#include <cstddef>
21+
#include <cstdint>
2022
#include <functional>
2123
#include <memory>
2224
#include <optional>
@@ -160,7 +162,10 @@ class Entity
160162

161163
std::string node_enclave() const;
162164

163-
/// Get the topic_info.
165+
/// Get the NodeInfo.
166+
NodeInfo node_info() const;
167+
168+
/// Get the TopicInfo if present.
164169
std::optional<TopicInfo> topic_info() const;
165170

166171
/// Get the liveliness keyexpr for this entity.
@@ -230,6 +235,10 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
230235
/// Convert a Zenoh id to a string.
231236
std::string zid_to_str(const z_id_t & id);
232237
} // namespace liveliness
238+
239+
///=============================================================================
240+
// Helper function to generate a randon GID.
241+
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
233242
} // namespace rmw_zenoh_cpp
234243

235244
///=============================================================================

rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,22 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
149149
rmw_ret_t rmw_context_impl_s::Data::shutdown()
150150
{
151151
std::lock_guard<std::recursive_mutex> lock(mutex_);
152+
rmw_ret_t ret = RMW_RET_OK;
152153
if (is_shutdown_) {
153-
return RMW_RET_OK;
154+
return ret;
155+
}
156+
157+
// Shutdown all the nodes in this context.
158+
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
159+
ret = node_it->second->shutdown();
160+
if (ret != RMW_RET_OK) {
161+
RMW_ZENOH_LOG_ERROR_NAMED(
162+
"rmw_zenoh_cpp",
163+
"Unable to shutdown node with id %zu. rmw_ret_t code: %zu.",
164+
node_it->second->id(),
165+
ret
166+
);
167+
}
154168
}
155169

156170
z_undeclare_subscriber(z_move(graph_subscriber_));
@@ -170,6 +184,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
170184
rmw_context_impl_s::Data::~Data()
171185
{
172186
auto ret = this->shutdown();
187+
nodes_.clear();
173188
static_cast<void>(ret);
174189
}
175190

@@ -407,6 +422,7 @@ bool rmw_context_impl_s::create_node_data(
407422
}
408423

409424
auto node_data = rmw_zenoh_cpp::NodeData::make(
425+
node,
410426
this->get_next_entity_id(),
411427
z_loan(data_->session_),
412428
data_->domain_id_,

rmw_zenoh_cpp/src/detail/rmw_data_types.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
7272
z_drop(z_move(payload));
7373
}
7474

75-
///=============================================================================
76-
size_t rmw_publisher_data_t::get_next_sequence_number()
77-
{
78-
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
79-
return sequence_number_++;
80-
}
81-
8275
///=============================================================================
8376
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
8477
rmw_wait_set_data_t * wait_set_data)

rmw_zenoh_cpp/src/detail/rmw_data_types.hpp

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,45 +44,6 @@
4444

4545
namespace rmw_zenoh_cpp
4646
{
47-
///=============================================================================
48-
class rmw_publisher_data_t final
49-
{
50-
public:
51-
// The Entity generated for the publisher.
52-
std::shared_ptr<liveliness::Entity> entity;
53-
54-
// An owned publisher.
55-
z_owned_publisher_t pub;
56-
57-
// Optional publication cache when durability is transient_local.
58-
std::optional<ze_owned_publication_cache_t> pub_cache;
59-
60-
// Store the actual QoS profile used to configure this publisher.
61-
rmw_qos_profile_t adapted_qos_profile;
62-
63-
// Liveliness token for the publisher.
64-
zc_owned_liveliness_token_t token;
65-
66-
// Type support fields
67-
const void * type_support_impl;
68-
const char * typesupport_identifier;
69-
const rosidl_type_hash_t * type_hash;
70-
MessageTypeSupport * type_support;
71-
72-
// Context for memory allocation for messages.
73-
rmw_context_t * context;
74-
75-
uint8_t pub_gid[RMW_GID_STORAGE_SIZE];
76-
77-
size_t get_next_sequence_number();
78-
79-
EventsManager events_mgr;
80-
81-
private:
82-
std::mutex sequence_number_mutex_;
83-
size_t sequence_number_{1};
84-
};
85-
8647
///=============================================================================
8748
// z_owned_closure_sample_t
8849
void sub_data_handler(const z_sample_t * sample, void * sub_data);

0 commit comments

Comments
 (0)