Skip to content

Commit f99e377

Browse files
committed
Reuse methods and make publish_serialized_message also thread safe
Signed-off-by: Yadunund <[email protected]>
1 parent 0712429 commit f99e377

File tree

6 files changed

+162
-154
lines changed

6 files changed

+162
-154
lines changed

rmw_zenoh_cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ add_library(rmw_zenoh_cpp SHARED
4747
src/detail/type_support_common.cpp
4848
src/detail/zenoh_config.cpp
4949
src/detail/zenoh_router_check.cpp
50+
src/detail/zenoh_utils.cpp
5051
src/rmw_event.cpp
5152
src/rmw_get_network_flow_endpoints.cpp
5253
src/rmw_get_node_info_and_types.cpp

rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp

Lines changed: 51 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "message_type_support.hpp"
2828
#include "logging_macros.hpp"
2929
#include "qos.hpp"
30+
#include "zenoh_utils.hpp"
3031

3132
#include "rcpputils/scope_exit.hpp"
3233

@@ -36,50 +37,6 @@
3637

3738
namespace rmw_zenoh_cpp
3839
{
39-
namespace
40-
{
41-
z_owned_bytes_map_t
42-
create_map_and_set_sequence_num(int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE])
43-
{
44-
z_owned_bytes_map_t map = z_bytes_map_new();
45-
if (!z_check(map)) {
46-
RMW_SET_ERROR_MSG("failed to allocate map for sequence number");
47-
return z_bytes_map_null();
48-
}
49-
auto free_attachment_map = rcpputils::make_scope_exit(
50-
[&map]() {
51-
z_bytes_map_drop(z_move(map));
52-
});
53-
54-
// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
55-
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
56-
char seq_id_str[20];
57-
if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) {
58-
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
59-
return z_bytes_map_null();
60-
}
61-
z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str));
62-
63-
auto now = std::chrono::system_clock::now().time_since_epoch();
64-
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
65-
char source_ts_str[20];
66-
if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) {
67-
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
68-
return z_bytes_map_null();
69-
}
70-
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str));
71-
72-
z_bytes_t gid_bytes;
73-
gid_bytes.len = RMW_GID_STORAGE_SIZE;
74-
gid_bytes.start = gid;
75-
76-
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes);
77-
78-
free_attachment_map.cancel();
79-
80-
return map;
81-
}
82-
} // namespace
8340
///=============================================================================
8441
std::shared_ptr<PublisherData> PublisherData::make(
8542
z_session_t session,
@@ -332,10 +289,8 @@ rmw_ret_t PublisherData::publish(
332289

333290
const size_t data_length = ser.get_serialized_data_length();
334291

335-
const int64_t sequence_number = sequence_number_++;
336-
337292
z_owned_bytes_map_t map =
338-
create_map_and_set_sequence_num(sequence_number, gid_);
293+
create_map_and_set_sequence_num(sequence_number_++, gid_);
339294
if (!z_check(map)) {
340295
// create_map_and_set_sequence_num already set the error
341296
return RMW_RET_ERROR;
@@ -373,24 +328,67 @@ rmw_ret_t PublisherData::publish(
373328
}
374329

375330
///=============================================================================
376-
std::size_t PublisherData::guid() const
331+
rmw_ret_t PublisherData::publish_serialized_message(
332+
const rmw_serialized_message_t * serialized_message,
333+
std::optional<zc_owned_shm_manager_t> & /*shm_manager*/)
377334
{
335+
eprosima::fastcdr::FastBuffer buffer(
336+
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
337+
rmw_zenoh_cpp::Cdr ser(buffer);
338+
if (!ser.get_cdr().jump(serialized_message->buffer_length)) {
339+
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
340+
return RMW_RET_ERROR;
341+
}
342+
378343
std::lock_guard<std::mutex> lock(mutex_);
379-
return entity_->guid();
344+
345+
z_owned_bytes_map_t map =
346+
rmw_zenoh_cpp::create_map_and_set_sequence_num(sequence_number_++, gid_);
347+
348+
if (!z_check(map)) {
349+
// create_map_and_set_sequence_num already set the error
350+
return RMW_RET_ERROR;
351+
}
352+
auto free_attachment_map = rcpputils::make_scope_exit(
353+
[&map]() {
354+
z_bytes_map_drop(z_move(map));
355+
});
356+
357+
const size_t data_length = ser.get_serialized_data_length();
358+
359+
// The encoding is simply forwarded and is useful when key expressions in the
360+
// session use different encoding formats. In our case, all key expressions
361+
// will be encoded with CDR so it does not really matter.
362+
z_publisher_put_options_t options = z_publisher_put_options_default();
363+
options.attachment = z_bytes_map_as_attachment(&map);
364+
365+
// Returns 0 if success.
366+
int8_t ret = z_publisher_put(
367+
z_loan(pub_),
368+
serialized_message->buffer,
369+
data_length,
370+
&options);
371+
372+
if (ret) {
373+
RMW_SET_ERROR_MSG("unable to publish message");
374+
return RMW_RET_ERROR;
375+
}
376+
377+
return RMW_RET_OK;
380378
}
381379

382380
///=============================================================================
383-
liveliness::TopicInfo PublisherData::topic_info() const
381+
std::size_t PublisherData::guid() const
384382
{
385383
std::lock_guard<std::mutex> lock(mutex_);
386-
return entity_->topic_info().value();
384+
return entity_->guid();
387385
}
388386

389387
///=============================================================================
390-
size_t PublisherData::get_next_sequence_number()
388+
liveliness::TopicInfo PublisherData::topic_info() const
391389
{
392390
std::lock_guard<std::mutex> lock(mutex_);
393-
return sequence_number_++;
391+
return entity_->topic_info().value();
394392
}
395393

396394
///=============================================================================
@@ -400,13 +398,6 @@ const uint8_t * PublisherData::gid() const
400398
return gid_;
401399
}
402400

403-
///=============================================================================
404-
z_publisher_t PublisherData::publisher() const
405-
{
406-
std::lock_guard<std::mutex> lock(mutex_);
407-
return z_loan(pub_);
408-
}
409-
410401
///=============================================================================
411402
bool PublisherData::liveliness_is_valid() const
412403
{

rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ class PublisherData final
5959
const void * ros_message,
6060
std::optional<zc_owned_shm_manager_t> & shm_manager);
6161

62-
// Get the next sequence number.
63-
size_t get_next_sequence_number();
62+
// Publish a serialized ROS message.
63+
rmw_ret_t publish_serialized_message(
64+
const rmw_serialized_message_t * serialized_message,
65+
std::optional<zc_owned_shm_manager_t> & shm_manager);
6466

6567
// Get a copy of the GUID of this PublisherData's liveliness::Entity.
6668
std::size_t guid() const;
@@ -71,9 +73,6 @@ class PublisherData final
7173
// Get the GID of this PublisherData.
7274
const uint8_t * gid() const;
7375

74-
// Get the Zenoh publisher.
75-
z_publisher_t publisher() const;
76-
7776
// Returns true if liveliness token is still valid.
7877
bool liveliness_is_valid() const;
7978

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2024 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "zenoh_utils.hpp"
16+
17+
#include <chrono>
18+
#include <cinttypes>
19+
20+
#include "rcpputils/scope_exit.hpp"
21+
22+
#include "rmw/error_handling.h"
23+
24+
namespace rmw_zenoh_cpp
25+
{
26+
///=============================================================================
27+
z_owned_bytes_map_t
28+
create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE])
29+
{
30+
z_owned_bytes_map_t map = z_bytes_map_new();
31+
if (!z_check(map)) {
32+
RMW_SET_ERROR_MSG("failed to allocate map for sequence number");
33+
return z_bytes_map_null();
34+
}
35+
auto free_attachment_map = rcpputils::make_scope_exit(
36+
[&map]() {
37+
z_bytes_map_drop(z_move(map));
38+
});
39+
40+
// The largest possible int64_t number is INT64_MAX, i.e. 9223372036854775807.
41+
// That is 19 characters long, plus one for the trailing \0, means we need 20 bytes.
42+
char seq_id_str[20];
43+
if (rcutils_snprintf(seq_id_str, sizeof(seq_id_str), "%" PRId64, sequence_number) < 0) {
44+
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
45+
return z_bytes_map_null();
46+
}
47+
z_bytes_map_insert_by_copy(&map, z_bytes_new("sequence_number"), z_bytes_new(seq_id_str));
48+
49+
auto now = std::chrono::system_clock::now().time_since_epoch();
50+
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
51+
char source_ts_str[20];
52+
if (rcutils_snprintf(source_ts_str, sizeof(source_ts_str), "%" PRId64, now_ns.count()) < 0) {
53+
RMW_SET_ERROR_MSG("failed to print sequence_number into buffer");
54+
return z_bytes_map_null();
55+
}
56+
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_timestamp"), z_bytes_new(source_ts_str));
57+
58+
z_bytes_t gid_bytes;
59+
gid_bytes.len = RMW_GID_STORAGE_SIZE;
60+
gid_bytes.start = gid;
61+
62+
z_bytes_map_insert_by_copy(&map, z_bytes_new("source_gid"), gid_bytes);
63+
64+
free_attachment_map.cancel();
65+
66+
return map;
67+
}
68+
} // namespace rmw_zenoh_cpp
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2024 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef DETAIL__ZENOH_UTILS_HPP_
16+
#define DETAIL__ZENOH_UTILS_HPP_
17+
18+
#include <zenoh.h>
19+
20+
#include "rmw/types.h"
21+
22+
namespace rmw_zenoh_cpp
23+
{
24+
///=============================================================================
25+
z_owned_bytes_map_t
26+
create_map_and_set_sequence_num(int64_t sequence_number, const uint8_t gid[RMW_GID_STORAGE_SIZE]);
27+
28+
} // namespace rmw_zenoh_cpp
29+
30+
#endif // DETAIL__ZENOH_UTILS_HPP_

0 commit comments

Comments
 (0)