@@ -47,6 +47,39 @@ namespace fastrtps {
47
47
template <class TypeSupport >
48
48
class PubSubParticipant
49
49
{
50
+ class PrtListener : public ParticipantListener
51
+ {
52
+ friend class PubSubParticipant ;
53
+
54
+ public:
55
+
56
+ PrtListener (
57
+ PubSubParticipant* participant)
58
+ : participant_(participant)
59
+ {
60
+ }
61
+
62
+ ~PrtListener ()
63
+ {
64
+ }
65
+
66
+ void onParticipantDiscovery (
67
+ Participant* part,
68
+ rtps::ParticipantDiscoveryInfo&& info) override
69
+ {
70
+ (void )part;
71
+ (void )info;
72
+ participant_->prt_matched ();
73
+ }
74
+
75
+ private:
76
+
77
+ PrtListener& operator =(
78
+ const PrtListener&) = delete ;
79
+ // ! A pointer to the participant
80
+ PubSubParticipant* participant_;
81
+ };
82
+
50
83
class PubListener : public PublisherListener
51
84
{
52
85
friend class PubSubParticipant ;
@@ -151,6 +184,7 @@ class PubSubParticipant
151
184
, publisher_attr_()
152
185
, pub_listener_(this )
153
186
, sub_listener_(this )
187
+ , prt_matched_(0 )
154
188
, pub_matched_(0 )
155
189
, sub_matched_(0 )
156
190
, pub_times_liveliness_lost_(0 )
@@ -269,6 +303,59 @@ class PubSubParticipant
269
303
publishers_[index ]->assert_liveliness ();
270
304
}
271
305
306
+ bool wait_discovery (
307
+ std::chrono::seconds timeout = std::chrono::seconds::zero(),
308
+ uint8_t matched = 0,
309
+ bool exact = false)
310
+ {
311
+ // No need to wait in this case
312
+ if (exact && matched == prt_matched_)
313
+ {
314
+ return true ;
315
+ }
316
+
317
+ std::unique_lock<std::mutex> lock (prt_mutex_);
318
+ bool ret_value = true ;
319
+ std::cout << " Participant is waiting discovery..." << std::endl;
320
+
321
+ if (timeout == std::chrono::seconds::zero ())
322
+ {
323
+ prt_cv_.wait (lock, [&]()
324
+ {
325
+ if (exact)
326
+ {
327
+ return prt_matched_ == matched;
328
+ }
329
+ return prt_matched_ >= matched;
330
+ });
331
+ }
332
+ else
333
+ {
334
+ if (!prt_cv_.wait_for (lock, timeout, [&]()
335
+ {
336
+ if (exact)
337
+ {
338
+ return prt_matched_ == matched;
339
+ }
340
+ return prt_matched_ >= matched;
341
+ }))
342
+ {
343
+ ret_value = false ;
344
+ }
345
+ }
346
+
347
+ if (ret_value)
348
+ {
349
+ std::cout << " Participant discovery finished successfully..." << std::endl;
350
+ }
351
+ else
352
+ {
353
+ std::cout << " Participant discovery finished unsuccessfully..." << std::endl;
354
+ }
355
+
356
+ return ret_value;
357
+ }
358
+
272
359
void pub_wait_discovery (
273
360
std::chrono::seconds timeout = std::chrono::seconds::zero())
274
361
{
@@ -437,6 +524,13 @@ class PubSubParticipant
437
524
return *this ;
438
525
}
439
526
527
+ PubSubParticipant& wire_protocol_builtin (
528
+ const eprosima::fastrtps::rtps::BuiltinAttributes& wire_protocol_builtin)
529
+ {
530
+ participant_attr_.rtps .builtin = wire_protocol_builtin;
531
+ return *this ;
532
+ }
533
+
440
534
PubSubParticipant& initial_peers (
441
535
const eprosima::fastrtps::rtps::LocatorList_t& initial_peers)
442
536
{
@@ -601,6 +695,20 @@ class PubSubParticipant
601
695
PubSubParticipant& operator =(
602
696
const PubSubParticipant&) = delete ;
603
697
698
+ void prt_matched ()
699
+ {
700
+ std::unique_lock<std::mutex> lock (prt_mutex_);
701
+ ++prt_matched_;
702
+ prt_cv_.notify_one ();
703
+ }
704
+
705
+ void prt_unmatched ()
706
+ {
707
+ std::unique_lock<std::mutex> lock (prt_mutex_);
708
+ --prt_matched_;
709
+ prt_cv_.notify_one ();
710
+ }
711
+
604
712
void pub_matched ()
605
713
{
606
714
std::unique_lock<std::mutex> lock (pub_mutex_);
@@ -654,10 +762,13 @@ class PubSubParticipant
654
762
// ! A listener for subscribers
655
763
SubListener sub_listener_;
656
764
765
+ std::mutex prt_mutex_;
657
766
std::mutex pub_mutex_;
658
767
std::mutex sub_mutex_;
768
+ std::condition_variable prt_cv_;
659
769
std::condition_variable pub_cv_;
660
770
std::condition_variable sub_cv_;
771
+ std::atomic<unsigned int > prt_matched_;
661
772
std::atomic<unsigned int > pub_matched_;
662
773
std::atomic<unsigned int > sub_matched_;
663
774
0 commit comments