Skip to content

Commit 145a849

Browse files
committed
Create PublisherData within NodeData
Signed-off-by: Yadunund <[email protected]>
1 parent 824886a commit 145a849

14 files changed

+888
-433
lines changed

rmw_zenoh_cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ add_library(rmw_zenoh_cpp SHARED
4040
src/detail/qos.cpp
4141
src/detail/rmw_context_impl_s.cpp
4242
src/detail/rmw_data_types.cpp
43+
src/detail/rmw_publisher_data.cpp
4344
src/detail/rmw_node_data.cpp
4445
src/detail/service_type_support.cpp
4546
src/detail/type_support.cpp

rmw_zenoh_cpp/src/detail/graph_cache.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -852,25 +852,24 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
852852

853853
///=============================================================================
854854
rmw_ret_t GraphCache::publisher_count_matched_subscriptions(
855-
const rmw_publisher_t * publisher,
855+
PublisherDataConstPtr pub_data,
856856
size_t * subscription_count)
857857
{
858858
// TODO(Yadunund): Replace this logic by returning a number that is tracked once
859859
// we support matched qos events.
860860
*subscription_count = 0;
861-
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(publisher->topic_name);
861+
auto topic_info = pub_data->entity()->topic_info().value();
862+
GraphNode::TopicMap::const_iterator topic_it = graph_topics_.find(topic_info.name_);
862863
if (topic_it != graph_topics_.end()) {
863-
rmw_publisher_data_t * pub_data =
864-
static_cast<rmw_publisher_data_t *>(publisher->data);
865864
GraphNode::TopicTypeMap::const_iterator topic_data_it = topic_it->second.find(
866-
pub_data->type_support->get_name());
865+
topic_info.type_);
867866
if (topic_data_it != topic_it->second.end()) {
868867
for (const auto & [_, topic_data] : topic_data_it->second) {
869868
// If a subscription exists with compatible QoS, update the subscription count.
870869
if (!topic_data->subs_.empty()) {
871870
rmw_qos_compatibility_type_t is_compatible;
872871
rmw_ret_t ret = rmw_qos_profile_check_compatible(
873-
pub_data->adapted_qos_profile,
872+
pub_data->adapted_qos_profile(),
874873
topic_data->info_.qos_,
875874
&is_compatible,
876875
nullptr,
@@ -1348,7 +1347,7 @@ void GraphCache::set_querying_subscriber_callback(
13481347
const rmw_subscription_data_t * sub_data,
13491348
QueryingSubscriberCallback cb)
13501349
{
1351-
const std::string keyexpr = sub_data->entity->topic_info()->topic_keyexpr_;
1350+
const std::string keyexpr = sub_data->entity->topic_info().value().topic_keyexpr_;
13521351
auto cb_it = querying_subs_cbs_.find(keyexpr);
13531352
if (cb_it == querying_subs_cbs_.end()) {
13541353
querying_subs_cbs_[keyexpr] = std::move(

rmw_zenoh_cpp/src/detail/graph_cache.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "event.hpp"
2828
#include "liveliness_utils.hpp"
2929
#include "ordered_map.hpp"
30+
#include "rmw_publisher_data.hpp"
3031

3132
#include "rcutils/allocator.h"
3233
#include "rcutils/types.h"
@@ -133,7 +134,7 @@ class GraphCache final
133134
rmw_names_and_types_t * topic_names_and_types) const;
134135

135136
rmw_ret_t publisher_count_matched_subscriptions(
136-
const rmw_publisher_t * publisher,
137+
PublisherDataConstPtr pub_data,
137138
size_t * subscription_count);
138139

139140
rmw_ret_t subscription_count_matched_publishers(

rmw_zenoh_cpp/src/detail/liveliness_utils.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
#include "liveliness_utils.hpp"
1616

1717
#include <functional>
18+
#include <limits>
1819
#include <optional>
20+
#include <random>
1921
#include <sstream>
2022
#include <stdexcept>
2123
#include <string>
@@ -609,6 +611,12 @@ std::string Entity::node_enclave() const
609611
return this->node_info_.enclave_;
610612
}
611613

614+
///=============================================================================
615+
NodeInfo Entity::node_info() const
616+
{
617+
return this->node_info_;
618+
}
619+
612620
///=============================================================================
613621
std::optional<TopicInfo> Entity::topic_info() const
614622
{
@@ -657,4 +665,17 @@ std::string demangle_name(const std::string & input)
657665
return output;
658666
}
659667
} // namespace liveliness
668+
669+
void
670+
generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE])
671+
{
672+
std::random_device dev;
673+
std::mt19937 rng(dev());
674+
std::uniform_int_distribution<std::mt19937::result_type> dist(
675+
std::numeric_limits<unsigned char>::min(), std::numeric_limits<unsigned char>::max());
676+
677+
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
678+
gid[i] = dist(rng);
679+
}
680+
}
660681
} // namespace rmw_zenoh_cpp

rmw_zenoh_cpp/src/detail/liveliness_utils.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ class Entity
160160

161161
std::string node_enclave() const;
162162

163-
/// Get the topic_info.
163+
/// Get the NodeInfo.
164+
NodeInfo node_info() const;
165+
166+
/// Get the TopicInfo if present.
164167
std::optional<TopicInfo> topic_info() const;
165168

166169
/// Get the liveliness keyexpr for this entity.
@@ -230,8 +233,13 @@ std::optional<rmw_qos_profile_t> keyexpr_to_qos(const std::string & keyexpr);
230233
/// Convert a Zenoh id to a string.
231234
std::string zid_to_str(const z_id_t & id);
232235
} // namespace liveliness
236+
237+
///=============================================================================
238+
// Helper function to generate a randon GID.
239+
void generate_random_gid(uint8_t gid[RMW_GID_STORAGE_SIZE]);
233240
} // namespace rmw_zenoh_cpp
234241

242+
235243
///=============================================================================
236244
// Allow Entity to be hashed and used as a key in unordered_maps/sets
237245
namespace std

rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,17 @@ rmw_ret_t rmw_context_impl_s::Data::subscribe_to_ros_graph()
149149
rmw_ret_t rmw_context_impl_s::Data::shutdown()
150150
{
151151
std::lock_guard<std::recursive_mutex> lock(mutex_);
152+
rmw_ret_t ret = RMW_RET_OK;
152153
if (is_shutdown_) {
153-
return RMW_RET_OK;
154+
return ret;
155+
}
156+
157+
// Shutdown all the nodes in this context.
158+
for (auto node_it = nodes_.begin(); node_it != nodes_.end(); ++node_it) {
159+
ret = node_it->second->shutdown();
160+
if (ret != RMW_RET_OK) {
161+
return ret;
162+
}
154163
}
155164

156165
z_undeclare_subscriber(z_move(graph_subscriber_));
@@ -170,6 +179,7 @@ rmw_ret_t rmw_context_impl_s::Data::shutdown()
170179
rmw_context_impl_s::Data::~Data()
171180
{
172181
auto ret = this->shutdown();
182+
nodes_.clear();
173183
static_cast<void>(ret);
174184
}
175185

@@ -407,6 +417,7 @@ bool rmw_context_impl_s::create_node_data(
407417
}
408418

409419
auto node_data = rmw_zenoh_cpp::NodeData::make(
420+
node,
410421
this->get_next_entity_id(),
411422
z_loan(data_->session_),
412423
data_->domain_id_,

rmw_zenoh_cpp/src/detail/rmw_data_types.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,6 @@ saved_msg_data::~saved_msg_data()
7272
z_drop(z_move(payload));
7373
}
7474

75-
///=============================================================================
76-
size_t rmw_publisher_data_t::get_next_sequence_number()
77-
{
78-
std::lock_guard<std::mutex> lock(sequence_number_mutex_);
79-
return sequence_number_++;
80-
}
81-
8275
///=============================================================================
8376
bool rmw_subscription_data_t::queue_has_data_and_attach_condition_if_not(
8477
rmw_wait_set_data_t * wait_set_data)

rmw_zenoh_cpp/src/detail/rmw_data_types.hpp

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,45 +44,6 @@
4444

4545
namespace rmw_zenoh_cpp
4646
{
47-
///=============================================================================
48-
class rmw_publisher_data_t final
49-
{
50-
public:
51-
// The Entity generated for the publisher.
52-
std::shared_ptr<liveliness::Entity> entity;
53-
54-
// An owned publisher.
55-
z_owned_publisher_t pub;
56-
57-
// Optional publication cache when durability is transient_local.
58-
std::optional<ze_owned_publication_cache_t> pub_cache;
59-
60-
// Store the actual QoS profile used to configure this publisher.
61-
rmw_qos_profile_t adapted_qos_profile;
62-
63-
// Liveliness token for the publisher.
64-
zc_owned_liveliness_token_t token;
65-
66-
// Type support fields
67-
const void * type_support_impl;
68-
const char * typesupport_identifier;
69-
const rosidl_type_hash_t * type_hash;
70-
MessageTypeSupport * type_support;
71-
72-
// Context for memory allocation for messages.
73-
rmw_context_t * context;
74-
75-
uint8_t pub_gid[RMW_GID_STORAGE_SIZE];
76-
77-
size_t get_next_sequence_number();
78-
79-
EventsManager events_mgr;
80-
81-
private:
82-
std::mutex sequence_number_mutex_;
83-
size_t sequence_number_{1};
84-
};
85-
8647
///=============================================================================
8748
// z_owned_closure_sample_t
8849
void sub_data_handler(const z_sample_t * sample, void * sub_data);

rmw_zenoh_cpp/src/detail/rmw_node_data.cpp

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ namespace rmw_zenoh_cpp
2828
{
2929
///=============================================================================
3030
std::shared_ptr<NodeData> NodeData::make(
31+
const rmw_node_t * const node,
3132
std::size_t id,
3233
z_session_t session,
3334
std::size_t domain_id,
@@ -75,6 +76,7 @@ std::shared_ptr<NodeData> NodeData::make(
7576

7677
return std::shared_ptr<NodeData>(
7778
new NodeData{
79+
node,
7880
id,
7981
std::move(entity),
8082
std::move(token)
@@ -83,20 +85,31 @@ std::shared_ptr<NodeData> NodeData::make(
8385

8486
///=============================================================================
8587
NodeData::NodeData(
88+
const rmw_node_t * const node,
8689
std::size_t id,
8790
std::shared_ptr<liveliness::Entity> entity,
8891
zc_owned_liveliness_token_t token)
89-
: id_(std::move(id)),
92+
: node_(node),
93+
id_(std::move(id)),
9094
entity_(std::move(entity)),
91-
token_(std::move(token))
95+
token_(std::move(token)),
96+
is_shutdown_(false),
97+
pubs_({})
9298
{
9399
// Do nothing.
94100
}
95101

96102
///=============================================================================
97103
NodeData::~NodeData()
98104
{
99-
zc_liveliness_undeclare_token(z_move(token_));
105+
const rmw_ret_t ret = this->shutdown();
106+
if (ret != RMW_RET_OK) {
107+
RMW_ZENOH_LOG_ERROR_NAMED(
108+
"rmw_zenoh_cpp",
109+
"Error destructing node /%s.",
110+
entity_->node_name().c_str()
111+
);
112+
}
100113
}
101114

102115
///=============================================================================
@@ -105,4 +118,103 @@ std::size_t NodeData::id() const
105118
std::lock_guard<std::mutex> lock(mutex_);
106119
return id_;
107120
}
121+
122+
///=============================================================================
123+
bool NodeData::create_pub_data(
124+
const rmw_publisher_t * const publisher,
125+
z_session_t session,
126+
std::size_t id,
127+
const std::string & topic_name,
128+
const rosidl_message_type_support_t * type_support,
129+
const rmw_qos_profile_t * qos_profile)
130+
{
131+
std::lock_guard<std::mutex> lock_guard(mutex_);
132+
if (is_shutdown_) {
133+
RMW_ZENOH_LOG_ERROR_NAMED(
134+
"rmw_zenoh_cpp",
135+
"Unable to create PublisherData as the NodeData has been shutdown.");
136+
return false;
137+
}
138+
139+
if (pubs_.count(publisher) > 0) {
140+
RMW_ZENOH_LOG_ERROR_NAMED(
141+
"rmw_zenoh_cpp",
142+
"PublisherData already exists.");
143+
return false;
144+
}
145+
146+
auto pub_data = PublisherData::make(
147+
std::move(session),
148+
node_,
149+
entity_->node_info(),
150+
id_,
151+
std::move(id),
152+
std::move(topic_name),
153+
type_support,
154+
qos_profile);
155+
if (pub_data == nullptr) {
156+
RMW_ZENOH_LOG_ERROR_NAMED(
157+
"rmw_zenoh_cpp",
158+
"Unable to make PublisherData.");
159+
return false;
160+
}
161+
162+
auto insertion = pubs_.insert(std::make_pair(publisher, std::move(pub_data)));
163+
if (!insertion.second) {
164+
return false;
165+
}
166+
return true;
167+
}
168+
169+
///=============================================================================
170+
PublisherDataPtr NodeData::get_pub_data(const rmw_publisher_t * const publisher)
171+
{
172+
std::lock_guard<std::mutex> lock_guard(mutex_);
173+
auto it = pubs_.find(publisher);
174+
if (it == pubs_.end()) {
175+
return nullptr;
176+
}
177+
178+
return it->second;
179+
}
180+
181+
///=============================================================================
182+
void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
183+
{
184+
std::lock_guard<std::mutex> lock_guard(mutex_);
185+
pubs_.erase(publisher);
186+
}
187+
188+
///=============================================================================
189+
rmw_ret_t NodeData::shutdown()
190+
{
191+
std::lock_guard<std::mutex> lock(mutex_);
192+
rmw_ret_t ret = RMW_RET_OK;
193+
if (is_shutdown_) {
194+
return ret;
195+
}
196+
197+
// Shutdown all the entities within this node.
198+
for (auto pub_it = pubs_.begin(); pub_it != pubs_.end(); ++pub_it) {
199+
ret = pub_it->second->shutdown();
200+
if (ret != RMW_RET_OK) {
201+
return ret;
202+
}
203+
}
204+
205+
// Unregister this node from the ROS graph.
206+
zc_liveliness_undeclare_token(z_move(token_));
207+
208+
is_shutdown_ = true;
209+
return RMW_RET_OK;
210+
}
211+
212+
///=============================================================================
213+
// Check if the Node is shutdown.
214+
bool NodeData::is_shutdown() const
215+
{
216+
std::lock_guard<std::mutex> lock(mutex_);
217+
return is_shutdown_;
218+
}
219+
108220
} // namespace rmw_zenoh_cpp

0 commit comments

Comments
 (0)