Skip to content

Commit 2e41110

Browse files
Refs #12368: Included PR #2122 requested changes
Signed-off-by: Juan López Fernández <[email protected]>
1 parent 14ffbe6 commit 2e41110

File tree

8 files changed

+402
-252
lines changed

8 files changed

+402
-252
lines changed

examples/C++/DDS/HelloWorldExampleDS/HelloWorldPublisher.cpp

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@
3232
using namespace eprosima::fastdds::dds;
3333
using namespace eprosima::fastdds::rtps;
3434

35-
namespace pub_ns {
36-
bool stop;
37-
} // namespace pub_ns
38-
39-
using namespace pub_ns;
35+
std::atomic<bool> HelloWorldPublisher::stop_(false);
36+
std::mutex HelloWorldPublisher::PubListener::wait_matched_cv_mtx_;
37+
std::condition_variable HelloWorldPublisher::PubListener::wait_matched_cv_;
4038

4139
HelloWorldPublisher::HelloWorldPublisher()
4240
: participant_(nullptr)
@@ -47,14 +45,27 @@ HelloWorldPublisher::HelloWorldPublisher()
4745
{
4846
}
4947

48+
bool HelloWorldPublisher::is_stopped()
49+
{
50+
return stop_;
51+
}
52+
53+
void HelloWorldPublisher::stop()
54+
{
55+
stop_ = true;
56+
PubListener::awake();
57+
}
58+
5059
bool HelloWorldPublisher::init(
5160
const std::string& topic_name,
61+
uint32_t num_wait_matched,
5262
eprosima::fastdds::rtps::Locator server_address)
5363
{
5464
hello_.index(0);
5565
hello_.message("HelloWorld");
5666
DomainParticipantQos pqos;
5767
pqos.name("Participant_pub");
68+
listener_.set_num_wait_matched(num_wait_matched);
5869

5970
// Set participant as DS CLIENT
6071
pqos.wire_protocol().builtin.discovery_config.discoveryProtocol =
@@ -131,6 +142,10 @@ void HelloWorldPublisher::PubListener::on_publication_matched(
131142
{
132143
matched_ = info.total_count;
133144
std::cout << "Publisher matched." << std::endl;
145+
if (enough_matched())
146+
{
147+
awake();
148+
}
134149
}
135150
else if (info.current_count_change == -1)
136151
{
@@ -144,52 +159,82 @@ void HelloWorldPublisher::PubListener::on_publication_matched(
144159
}
145160
}
146161

162+
void HelloWorldPublisher::PubListener::set_num_wait_matched(
163+
uint32_t num_wait_matched)
164+
{
165+
num_wait_matched_ = num_wait_matched;
166+
}
167+
168+
bool HelloWorldPublisher::PubListener::enough_matched()
169+
{
170+
return matched_ >= num_wait_matched_;
171+
}
172+
173+
void HelloWorldPublisher::PubListener::wait()
174+
{
175+
std::unique_lock<std::mutex> lck(wait_matched_cv_mtx_);
176+
wait_matched_cv_.wait(lck, [this]
177+
{
178+
return enough_matched() || is_stopped();
179+
});
180+
}
181+
182+
void HelloWorldPublisher::PubListener::awake()
183+
{
184+
wait_matched_cv_.notify_one();
185+
}
186+
147187
void HelloWorldPublisher::runThread(
148188
uint32_t samples,
149-
uint32_t sleep,
150-
uint32_t numWaitMatched)
189+
uint32_t sleep)
151190
{
152191
if (samples == 0)
153192
{
154-
while (!stop)
193+
while (!is_stopped())
155194
{
156-
if (publish(numWaitMatched))
195+
if (listener_.enough_matched())
157196
{
197+
publish();
158198
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
159199
<< " SENT" << std::endl;
200+
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
201+
}
202+
else
203+
{
204+
listener_.wait();
160205
}
161-
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
162206
}
163207
}
164208
else
165209
{
166210
for (uint32_t i = 0; i < samples; ++i)
167211
{
168-
if (stop)
212+
if (is_stopped())
169213
{
170214
break;
171215
}
172-
if (!publish(numWaitMatched))
216+
if (listener_.enough_matched())
173217
{
174-
--i;
218+
publish();
219+
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
220+
<< " SENT" << std::endl;
221+
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
175222
}
176223
else
177224
{
178-
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
179-
<< " SENT" << std::endl;
225+
--i;
226+
listener_.wait();
180227
}
181-
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
182228
}
183229
}
184230
}
185231

186232
void HelloWorldPublisher::run(
187233
uint32_t samples,
188-
uint32_t sleep,
189-
uint32_t numWaitMatched)
234+
uint32_t sleep)
190235
{
191-
stop = false;
192-
std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep, numWaitMatched);
236+
stop_ = false;
237+
std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep);
193238
if (samples == 0)
194239
{
195240
std::cout << "Publisher running. Please press CTRL+C to stop the Publisher at any time." << std::endl;
@@ -200,19 +245,13 @@ void HelloWorldPublisher::run(
200245
}
201246
signal(SIGINT, [](int signum)
202247
{
203-
static_cast<void>(signum); stop = true;
248+
static_cast<void>(signum); HelloWorldPublisher::stop();
204249
});
205250
thread.join();
206251
}
207252

208-
bool HelloWorldPublisher::publish(
209-
uint32_t numWaitMatched)
253+
void HelloWorldPublisher::publish()
210254
{
211-
if (listener_.matched_ >= numWaitMatched)
212-
{
213-
hello_.index(hello_.index() + 1);
214-
writer_->write(&hello_);
215-
return true;
216-
}
217-
return false;
255+
hello_.index(hello_.index() + 1);
256+
writer_->write(&hello_);
218257
}

examples/C++/DDS/HelloWorldExampleDS/HelloWorldPublisher.h

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
#include <fastdds/dds/topic/TypeSupport.hpp>
2727
#include <fastdds/dds/domain/DomainParticipant.hpp>
2828

29+
#include <atomic>
30+
#include <condition_variable>
31+
#include <mutex>
32+
2933
class HelloWorldPublisher
3034
{
3135
public:
@@ -37,17 +41,20 @@ class HelloWorldPublisher
3741
//!Initialize
3842
bool init(
3943
const std::string& topic_name,
44+
uint32_t num_wait_matched,
4045
eprosima::fastdds::rtps::Locator server_address);
4146

4247
//!Publish a sample
43-
bool publish(
44-
uint32_t numWaitMatched);
48+
void publish();
4549

4650
//!Run for number samples
4751
void run(
4852
uint32_t number,
49-
uint32_t sleep,
50-
uint32_t numWaitMatched);
53+
uint32_t sleep);
54+
55+
static bool is_stopped();
56+
57+
static void stop();
5158

5259
private:
5360

@@ -67,6 +74,7 @@ class HelloWorldPublisher
6774

6875
PubListener()
6976
: matched_(0)
77+
, num_wait_matched_(0)
7078
{
7179
}
7280

@@ -78,17 +86,34 @@ class HelloWorldPublisher
7886
eprosima::fastdds::dds::DataWriter* writer,
7987
const eprosima::fastdds::dds::PublicationMatchedStatus& info) override;
8088

81-
uint32_t matched_;
89+
void set_num_wait_matched(
90+
uint32_t num_wait_matched);
91+
92+
bool enough_matched();
93+
94+
void wait();
95+
96+
static void awake();
8297

98+
private:
99+
100+
std::atomic<std::uint32_t> matched_;
101+
102+
uint32_t num_wait_matched_;
103+
104+
static std::mutex wait_matched_cv_mtx_;
105+
106+
static std::condition_variable wait_matched_cv_;
83107
}
84108
listener_;
85109

86110
void runThread(
87111
uint32_t number,
88-
uint32_t sleep,
89-
uint32_t numWaitMatched);
112+
uint32_t sleep);
90113

91114
eprosima::fastdds::dds::TypeSupport type_;
115+
116+
static std::atomic<bool> stop_;
92117
};
93118

94119

examples/C++/DDS/HelloWorldExampleDS/HelloWorldServer.cpp

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,26 @@
2828
using namespace eprosima::fastdds::dds;
2929
using namespace eprosima::fastdds::rtps;
3030

31-
namespace server_ns {
32-
bool stop;
33-
std::mutex mtx;
34-
std::condition_variable terminate_cv;
35-
} // namespace server_ns
36-
37-
using namespace server_ns;
31+
std::atomic<bool> HelloWorldServer::stop_(false);
32+
std::mutex HelloWorldServer::terminate_cv_mtx_;
33+
std::condition_variable HelloWorldServer::terminate_cv_;
3834

3935
HelloWorldServer::HelloWorldServer()
4036
: participant_(nullptr)
4137
{
4238
}
4339

40+
bool HelloWorldServer::is_stopped()
41+
{
42+
return stop_;
43+
}
44+
45+
void HelloWorldServer::stop()
46+
{
47+
stop_ = true;
48+
terminate_cv_.notify_one();
49+
}
50+
4451
bool HelloWorldServer::init(
4552
eprosima::fastdds::rtps::Locator server_address)
4653
{
@@ -74,15 +81,15 @@ HelloWorldServer::~HelloWorldServer()
7481

7582
void HelloWorldServer::run()
7683
{
77-
std::cout << "Server running. Please press CTRL+C to stop the Server at any time." << std::endl;
78-
stop = false;
84+
stop_ = false;
85+
std::cout << "Server running. Please press CTRL+C to stop the Server" << std::endl;
7986
signal(SIGINT, [](int signum)
8087
{
81-
static_cast<void>(signum); stop = true; terminate_cv.notify_one();
88+
static_cast<void>(signum); HelloWorldServer::stop();
8289
});
83-
std::unique_lock<std::mutex> lck(mtx);
84-
terminate_cv.wait(lck, []
90+
std::unique_lock<std::mutex> lck(terminate_cv_mtx_);
91+
terminate_cv_.wait(lck, []
8592
{
86-
return stop;
93+
return is_stopped();
8794
});
8895
}

examples/C++/DDS/HelloWorldExampleDS/HelloWorldServer.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
#include <fastdds/dds/domain/DomainParticipant.hpp>
2424

25+
#include <atomic>
26+
#include <condition_variable>
27+
#include <mutex>
28+
2529
class HelloWorldServer
2630
{
2731
public:
@@ -37,9 +41,19 @@ class HelloWorldServer
3741
//!Run
3842
void run();
3943

44+
static bool is_stopped();
45+
46+
static void stop();
47+
4048
private:
4149

4250
eprosima::fastdds::dds::DomainParticipant* participant_;
51+
52+
static std::atomic<bool> stop_;
53+
54+
static std::mutex terminate_cv_mtx_;
55+
56+
static std::condition_variable terminate_cv_;
4357
};
4458

4559

0 commit comments

Comments
 (0)