Skip to content

Commit 869c043

Browse files
authored
Merge pull request OpenDDS#4928 from OpenDDS/rtps-relay-synchronous-output
RtpsRelay output queue may be ill-behaved
2 parents f2e2a64 + d5b1557 commit 869c043

File tree

7 files changed

+90
-48
lines changed

7 files changed

+90
-48
lines changed

docs/devguide/internet_enabled_rtps.rst

+4
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,10 @@ The command-line options for the RtpsRelay:
339339

340340
Use a thread pool with this many threads (default 1) to handle input/output/timer events.
341341

342+
.. option:: -SynchronousOutput 0|1
343+
344+
Send messages immediately, defaults to 0 (disabled).
345+
342346
.. _internet_enabled_rtps--deployment-considerations:
343347

344348
Deployment Considerations
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
.. news-prs: 4928
2+
3+
.. news-start-section: Additions
4+
- Added :option:`RtpsRelay -SynchronousOutput` option to RtpsRelay.
5+
.. news-end-section

tests/DCPS/RtpsRelay/Smoke/run_test.pl

+2-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ sub get_relay_args {
7575
"-VerticalAddress ${port_digit}444",
7676
"-HorizontalAddress 127.0.0.1:11${port_digit}44",
7777
"-MetaDiscoveryAddress 127.0.0.1:808${n}",
78-
"-ORBVerboseLogging 1"
78+
"-ORBVerboseLogging 1",
79+
"-SynchronousOutput 1"
7980
);
8081
}
8182

tools/rtpsrelay/Config.h

+12
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class Config {
3838
, admission_max_participants_high_water_(0)
3939
, admission_max_participants_low_water_(0)
4040
, handler_threads_(1)
41+
, synchronous_output_(false)
4142
{}
4243

4344
void relay_id(const std::string& value)
@@ -360,6 +361,16 @@ class Config {
360361
return handler_threads_;
361362
}
362363

364+
void synchronous_output(bool flag)
365+
{
366+
synchronous_output_ = flag;
367+
}
368+
369+
bool synchronous_output() const
370+
{
371+
return synchronous_output_;
372+
}
373+
363374
private:
364375
std::string relay_id_;
365376
OpenDDS::DCPS::GUID_t application_participant_guid_;
@@ -393,6 +404,7 @@ class Config {
393404
size_t admission_max_participants_high_water_;
394405
size_t admission_max_participants_low_water_;
395406
size_t handler_threads_;
407+
bool synchronous_output_;
396408
};
397409

398410
}

tools/rtpsrelay/RelayHandler.cpp

+57-42
Original file line numberDiff line numberDiff line change
@@ -175,39 +175,16 @@ int RelayHandler::handle_output(ACE_HANDLE)
175175

176176
if (!outgoing_.empty()) {
177177
const auto& out = outgoing_.front();
178+
size_t total_bytes;
178179

179-
const int BUFFERS_SIZE = 2;
180-
iovec buffers[BUFFERS_SIZE];
181-
size_t total_bytes = 0;
182-
183-
int idx = 0;
184-
for (ACE_Message_Block* block = out.message_block.get(); block && idx < BUFFERS_SIZE; block = block->cont(), ++idx) {
185-
buffers[idx].iov_base = block->rd_ptr();
186-
#ifdef _MSC_VER
187-
#pragma warning(push)
188-
// iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
189-
// since on other platforms iov_len is 64-bit
190-
#pragma warning(disable : 4267)
191-
#endif
192-
buffers[idx].iov_len = block->length();
193-
total_bytes += buffers[idx].iov_len;
194-
#ifdef _MSC_VER
195-
#pragma warning(pop)
196-
#endif
197-
}
198-
199-
const auto bytes = socket_.send(buffers, idx, out.address, 0);
200-
201-
if (bytes < 0) {
180+
if (send_i(out, total_bytes) < 0) {
202181
HANDLER_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayHandler::handle_output %C failed to send to %C: %m\n",
203182
name_.c_str(), OpenDDS::DCPS::LogAddr(out.address).c_str()));
204183
const auto new_now = OpenDDS::DCPS::MonotonicTimePoint::now();
205-
stats_reporter_.dropped_message(
206-
total_bytes, new_now - now, new_now - out.timestamp, now, out.type);
184+
stats_reporter_.dropped_message(total_bytes, new_now - now, new_now - out.timestamp, now, out.type);
207185
} else {
208186
const auto new_now = OpenDDS::DCPS::MonotonicTimePoint::now();
209-
stats_reporter_.output_message(
210-
total_bytes, new_now - now, new_now - out.timestamp, now, out.type);
187+
stats_reporter_.output_message(total_bytes, new_now - now, new_now - out.timestamp, now, out.type);
211188
}
212189

213190
outgoing_.pop();
@@ -225,15 +202,55 @@ void RelayHandler::enqueue_message(const ACE_INET_Addr& addr,
225202
const OpenDDS::DCPS::MonotonicTimePoint& now,
226203
MessageType type)
227204
{
228-
ACE_GUARD(ACE_Thread_Mutex, g, outgoing_mutex_);
205+
const Element out(addr, msg, now, type);
206+
if (config_.synchronous_output()) {
207+
size_t total_bytes;
208+
209+
if (send_i(out, total_bytes) < 0) {
210+
HANDLER_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayHandler::enqueue_message %C failed to send to %C: %m\n",
211+
name_.c_str(), OpenDDS::DCPS::LogAddr(out.address).c_str()));
212+
stats_reporter_.dropped_message(total_bytes, OpenDDS::DCPS::TimeDuration::zero_value, OpenDDS::DCPS::TimeDuration::zero_value, now, out.type);
213+
} else {
214+
stats_reporter_.output_message(total_bytes, OpenDDS::DCPS::TimeDuration::zero_value, OpenDDS::DCPS::TimeDuration::zero_value, now, out.type);
215+
}
216+
217+
} else {
218+
ACE_GUARD(ACE_Thread_Mutex, g, outgoing_mutex_);
219+
220+
const auto empty = outgoing_.empty();
221+
222+
outgoing_.push(out);
223+
stats_reporter_.max_queue_size(outgoing_.size(), now);
224+
if (empty) {
225+
reactor()->register_handler(this, WRITE_MASK);
226+
}
227+
}
228+
}
229229

230-
const auto empty = outgoing_.empty();
230+
ssize_t RelayHandler::send_i(const Element& out,
231+
size_t& total_bytes)
232+
{
233+
const int BUFFERS_SIZE = 2;
234+
iovec buffers[BUFFERS_SIZE];
235+
total_bytes = 0;
231236

232-
outgoing_.push(Element(addr, msg, now, type));
233-
stats_reporter_.max_queue_size(outgoing_.size(), now);
234-
if (empty) {
235-
reactor()->register_handler(this, WRITE_MASK);
237+
int idx = 0;
238+
for (ACE_Message_Block* block = out.message_block.get(); block && idx < BUFFERS_SIZE; block = block->cont(), ++idx) {
239+
buffers[idx].iov_base = block->rd_ptr();
240+
#ifdef _MSC_VER
241+
#pragma warning(push)
242+
// iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
243+
// since on other platforms iov_len is 64-bit
244+
#pragma warning(disable : 4267)
245+
#endif
246+
buffers[idx].iov_len = block->length();
247+
total_bytes += buffers[idx].iov_len;
248+
#ifdef _MSC_VER
249+
#pragma warning(pop)
250+
#endif
236251
}
252+
253+
return socket_.send(buffers, idx, out.address, 0);
237254
}
238255

239256
VerticalHandler::VerticalHandler(const Config& config,
@@ -595,7 +612,7 @@ CORBA::ULong VerticalHandler::send(GuidAddrSet::Proxy& proxy,
595612
CORBA::ULong sent = 0;
596613
for (const auto& addr : address_set) {
597614
if (addr != horizontal_address_) {
598-
horizontal_handler_->enqueue_message(addr, to_partitions, to_guids, msg, now);
615+
horizontal_handler_->enqueue_or_send_message(addr, to_partitions, to_guids, msg, now);
599616
++sent;
600617
} else {
601618
// Local recipients.
@@ -641,8 +658,6 @@ size_t VerticalHandler::send(const ACE_INET_Addr& addr,
641658
message.block = block.get();
642659
serializer << message;
643660
RelayHandler::enqueue_message(addr, block, now, type);
644-
const auto new_now = OpenDDS::DCPS::MonotonicTimePoint::now();
645-
stats_reporter_.output_message(length, new_now - now, new_now - now, now, type);
646661
return length;
647662
}
648663

@@ -663,11 +678,11 @@ HorizontalHandler::HorizontalHandler(const Config& config,
663678
, vertical_handler_(nullptr)
664679
{}
665680

666-
void HorizontalHandler::enqueue_message(const ACE_INET_Addr& addr,
667-
const StringSet& to_partitions,
668-
const GuidSet& to_guids,
669-
const OpenDDS::DCPS::Lockable_Message_Block_Ptr& msg,
670-
const OpenDDS::DCPS::MonotonicTimePoint& now)
681+
void HorizontalHandler::enqueue_or_send_message(const ACE_INET_Addr& addr,
682+
const StringSet& to_partitions,
683+
const GuidSet& to_guids,
684+
const OpenDDS::DCPS::Lockable_Message_Block_Ptr& msg,
685+
const OpenDDS::DCPS::MonotonicTimePoint& now)
671686
{
672687
using namespace OpenDDS::DCPS;
673688

@@ -984,7 +999,7 @@ int SpdpHandler::handle_exception(ACE_HANDLE /*fd*/)
984999
const auto pos = proxy.find(fan_in_from_guid);
9851000
if (pos != proxy.end() && pos->second.spdp_message) {
9861001
// Send the SPDP message horizontally. We may be sending to ourselves which is okay.
987-
horizontal_handler_->enqueue_message(fan_in_replay_address, StringSet(), fan_in_to_guid_set, pos->second.spdp_message, now);
1002+
horizontal_handler_->enqueue_or_send_message(fan_in_replay_address, StringSet(), fan_in_to_guid_set, pos->second.spdp_message, now);
9881003
}
9891004
}
9901005

tools/rtpsrelay/RelayHandler.h

+7-5
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ class RelayHandler : public ACE_Event_Handler {
7575
, type(type)
7676
{}
7777
};
78+
ssize_t send_i(const Element& out,
79+
size_t& total_bytes);
7880
using OutgoingType = std::queue<Element>;
7981
OutgoingType outgoing_;
8082
mutable ACE_Thread_Mutex outgoing_mutex_;
@@ -199,11 +201,11 @@ class HorizontalHandler : public RelayHandler {
199201

200202
void vertical_handler(VerticalHandler* vertical_handler) { vertical_handler_ = vertical_handler; }
201203

202-
void enqueue_message(const ACE_INET_Addr& addr,
203-
const StringSet& to_partitions,
204-
const GuidSet& to_guids,
205-
const OpenDDS::DCPS::Lockable_Message_Block_Ptr& msg,
206-
const OpenDDS::DCPS::MonotonicTimePoint& now);
204+
void enqueue_or_send_message(const ACE_INET_Addr& addr,
205+
const StringSet& to_partitions,
206+
const GuidSet& to_guids,
207+
const OpenDDS::DCPS::Lockable_Message_Block_Ptr& msg,
208+
const OpenDDS::DCPS::MonotonicTimePoint& now);
207209

208210
private:
209211
const GuidPartitionTable& guid_partition_table_;

tools/rtpsrelay/RtpsRelay.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ int run(int argc, ACE_TCHAR* argv[])
235235
} else if ((arg = args.get_the_parameter("-HandlerThreads"))) {
236236
config.handler_threads(std::atoi(arg));
237237
args.consume_arg();
238+
} else if ((arg = args.get_the_parameter("-SynchronousOutput"))) {
239+
config.synchronous_output(ACE_OS::atoi(arg));
240+
args.consume_arg();
238241
} else if ((arg = args.get_the_parameter("-MaxIpsPerClient"))) {
239242
config.max_ips_per_client(ACE_OS::atoi(arg));
240243
args.consume_arg();

0 commit comments

Comments
 (0)