Skip to content

Commit a5a7d16

Browse files
authored
Fix singleton destruction order (#1756)
* Fix singleton destruction order (port of #1748) Signed-off-by: Iker Luengo <[email protected]> * uncrustify Signed-off-by: Iker Luengo <[email protected]>
1 parent 3da0cba commit a5a7d16

File tree

9 files changed

+119
-46
lines changed

9 files changed

+119
-46
lines changed

src/cpp/fastdds/domain/DomainParticipantFactory.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include <fastrtps/types/DynamicDataFactory.h>
3333
#include <fastrtps/types/TypeObjectFactory.h>
3434

35+
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
36+
3537
using namespace eprosima::fastrtps::xmlparser;
3638

3739
using eprosima::fastrtps::ParticipantAttributes;
@@ -97,6 +99,10 @@ DomainParticipantFactory::~DomainParticipantFactory()
9799

98100
DomainParticipantFactory* DomainParticipantFactory::get_instance()
99101
{
102+
// Keep a reference to the topic payload pool so that it is not destroyed before our own instance
103+
using pool_registry_ref = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::reference;
104+
static pool_registry_ref topic_pool_registry = eprosima::fastrtps::rtps::TopicPayloadPoolRegistry::instance();
105+
100106
static DomainParticipantFactory instance;
101107
return &instance;
102108
}

src/cpp/rtps/history/TopicPayloadPoolRegistry.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,16 @@ namespace eprosima {
2828
namespace fastrtps {
2929
namespace rtps {
3030

31+
const TopicPayloadPoolRegistry::reference& TopicPayloadPoolRegistry::instance()
32+
{
33+
return detail::TopicPayloadPoolRegistry::instance();
34+
}
35+
3136
std::shared_ptr<ITopicPayloadPool> TopicPayloadPoolRegistry::get(
3237
const std::string& topic_name,
3338
const BasicPoolConfig& config)
3439
{
35-
return detail::TopicPayloadPoolRegistry::instance().get(topic_name, config);
40+
return detail::TopicPayloadPoolRegistry::instance()->get(topic_name, config);
3641
}
3742

3843
void TopicPayloadPoolRegistry::release(
@@ -43,7 +48,7 @@ void TopicPayloadPoolRegistry::release(
4348

4449
if (topic_pool)
4550
{
46-
detail::TopicPayloadPoolRegistry::instance().release(topic_pool);
51+
detail::TopicPayloadPoolRegistry::instance()->release(topic_pool);
4752
}
4853
}
4954

src/cpp/rtps/history/TopicPayloadPoolRegistry.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,19 @@ namespace eprosima {
3030
namespace fastrtps {
3131
namespace rtps {
3232

33+
namespace detail {
34+
class TopicPayloadPoolRegistry;
35+
} // namespace detail
36+
3337
class TopicPayloadPoolRegistry
3438
{
3539

3640
public:
3741

42+
using reference = std::shared_ptr<detail::TopicPayloadPoolRegistry>;
43+
44+
static const reference& instance();
45+
3846
static std::shared_ptr<ITopicPayloadPool> get(
3947
const std::string& topic_name,
4048
const BasicPoolConfig& config);

src/cpp/rtps/history/TopicPayloadPoolRegistry_impl/TopicPayloadPoolRegistry.hpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,10 @@ class TopicPayloadPoolRegistry
3737

3838
public:
3939

40-
~TopicPayloadPoolRegistry() = default;
41-
4240
/// @return reference to singleton instance
43-
static TopicPayloadPoolRegistry& instance()
41+
static const std::shared_ptr<TopicPayloadPoolRegistry>& instance()
4442
{
45-
static TopicPayloadPoolRegistry pool_registry_instance;
43+
static std::shared_ptr<TopicPayloadPoolRegistry> pool_registry_instance(new TopicPayloadPoolRegistry());
4644
return pool_registry_instance;
4745
}
4846

src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class SharedMemGlobal
127127

128128
PortNode* node_;
129129

130-
std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor> > buffer_;
130+
std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor>> buffer_;
131131

132132
uint64_t overflows_count_;
133133

@@ -163,10 +163,10 @@ class SharedMemGlobal
163163
MultiProducerConsumerRingBuffer<BufferDescriptor>* buffer;
164164
};
165165

166-
static WatchTask& get()
166+
static const std::shared_ptr<WatchTask>& get()
167167
{
168-
static WatchTask watch_task;
169-
return watch_task;
168+
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
169+
return watch_task_instance;
170170
}
171171

172172
/**
@@ -202,19 +202,23 @@ class SharedMemGlobal
202202

203203
}
204204

205+
virtual ~WatchTask()
206+
{
207+
shared_mem_watchdog_->remove_task(this);
208+
}
209+
205210
private:
206211

207-
std::vector<std::shared_ptr<PortContext> > watched_ports_;
212+
std::vector<std::shared_ptr<PortContext>> watched_ports_;
208213
std::mutex watched_ports_mutex_;
209214

210-
WatchTask()
211-
{
212-
SharedMemWatchdog::get().add_task(this);
213-
}
215+
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
216+
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;
214217

215-
~WatchTask()
218+
WatchTask()
219+
: shared_mem_watchdog_(SharedMemWatchdog::get())
216220
{
217-
SharedMemWatchdog::get().remove_task(this);
221+
shared_mem_watchdog_->add_task(this);
218222
}
219223

220224
bool update_status_all_listeners(
@@ -339,6 +343,9 @@ class SharedMemGlobal
339343
return (listeners_found == node_->num_listeners);
340344
}
341345

346+
// Keep a reference to the WatchTask so that it is not destroyed until the last Port instance is destroyed
347+
std::shared_ptr<WatchTask> watch_task_;
348+
342349
public:
343350

344351
/**
@@ -378,26 +385,27 @@ class SharedMemGlobal
378385
, node_(node)
379386
, overflows_count_(0)
380387
, read_exclusive_lock_(std::move(read_exclusive_lock))
388+
, watch_task_(WatchTask::get())
381389
{
382390
auto buffer_base = static_cast<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell*>(
383391
port_segment_->get_address_from_offset(node_->buffer));
384392

385393
auto buffer_node = static_cast<MultiProducerConsumerRingBuffer<BufferDescriptor>::Node*>(
386394
port_segment_->get_address_from_offset(node_->buffer_node));
387395

388-
buffer_ = std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor> >(
396+
buffer_ = std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor>>(
389397
new MultiProducerConsumerRingBuffer<BufferDescriptor>(buffer_base, buffer_node));
390398

391399
node_->ref_counter.fetch_add(1);
392400

393401
auto port_context = std::make_shared<Port::WatchTask::PortContext>();
394402
*port_context = {port_segment_, node_, buffer_.get()};
395-
Port::WatchTask::get().add_port(std::move(port_context));
403+
Port::WatchTask::get()->add_port(std::move(port_context));
396404
}
397405

398406
~Port()
399407
{
400-
Port::WatchTask::get().remove_port(node_);
408+
Port::WatchTask::get()->remove_port(node_);
401409

402410
if (node_->ref_counter.fetch_sub(1) == 1)
403411
{

src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ class SharedMemManager :
229229
const std::string& domain_name)
230230
: segments_mem_(0)
231231
, global_segment_(domain_name)
232+
, watch_task_(SegmentWrapper::WatchTask::get())
232233
{
233234
static_assert(std::alignment_of<BufferNode>::value % 8 == 0, "SharedMemManager::BufferNode bad alignment");
234235

@@ -671,7 +672,7 @@ class SharedMemManager :
671672
SharedMemGlobal::PortCell* head_cell = nullptr;
672673
buffer_ref.reset();
673674

674-
while ( !is_closed_.load() && nullptr == (head_cell = global_listener_->head()) )
675+
while ( !is_closed_.load() && nullptr == (head_cell = global_listener_->head()))
675676
{
676677
// Wait until there's data to pop
677678
global_port_->wait_pop(*global_listener_, is_closed_, listener_index_);
@@ -692,7 +693,7 @@ class SharedMemManager :
692693
auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id);
693694
auto buffer_node =
694695
static_cast<BufferNode*>(segment->get_address_from_offset(buffer_descriptor.
695-
buffer_node_offset));
696+
buffer_node_offset));
696697

697698
// TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation
698699
buffer_ref = std::make_shared<SharedMemBuffer>(segment, buffer_descriptor.source_segment_id,
@@ -999,10 +1000,10 @@ class SharedMemManager :
9991000
{
10001001
public:
10011002

1002-
static WatchTask& get()
1003+
static std::shared_ptr<WatchTask>& get()
10031004
{
1004-
static WatchTask watch_task;
1005-
return watch_task;
1005+
static std::shared_ptr<WatchTask> watch_task_instance(new WatchTask());
1006+
return watch_task_instance;
10061007
}
10071008

10081009
void add_segment(
@@ -1023,24 +1024,28 @@ class SharedMemManager :
10231024
to_remove_.push_back(segment);
10241025
}
10251026

1027+
virtual ~WatchTask()
1028+
{
1029+
shared_mem_watchdog_->remove_task(this);
1030+
}
1031+
10261032
private:
10271033

10281034
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;
10291035
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;
10301036

10311037
std::mutex to_add_remove_mutex_;
1032-
std::vector<std::shared_ptr<SegmentWrapper> > to_add_;
1033-
std::vector<std::shared_ptr<SegmentWrapper> > to_remove_;
1038+
std::vector<std::shared_ptr<SegmentWrapper>> to_add_;
1039+
std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;
1040+
1041+
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyed
1042+
std::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;
10341043

10351044
WatchTask()
10361045
: watched_it_(watched_segments_.end())
1046+
, shared_mem_watchdog_(SharedMemWatchdog::get())
10371047
{
1038-
SharedMemWatchdog::get().add_task(this);
1039-
}
1040-
1041-
~WatchTask()
1042-
{
1043-
SharedMemWatchdog::get().remove_task(this);
1048+
shared_mem_watchdog_->add_task(this);
10441049
}
10451050

10461051
void update_watched_segments()
@@ -1185,12 +1190,15 @@ class SharedMemManager :
11851190
uint32_t per_allocation_extra_size_;
11861191

11871192
std::unordered_map<SharedMemSegment::Id::type, std::shared_ptr<SegmentWrapper>,
1188-
std::hash<SharedMemSegment::Id::type> > ids_segments_;
1193+
std::hash<SharedMemSegment::Id::type>> ids_segments_;
11891194
std::mutex ids_segments_mutex_;
11901195
uint64_t segments_mem_;
11911196

11921197
SharedMemGlobal global_segment_;
11931198

1199+
// Keep a reference to the WatchTask so that it is not destroyed until all Manager instances are destroyed
1200+
std::shared_ptr<SegmentWrapper::WatchTask> watch_task_;
1201+
11941202
std::shared_ptr<SharedMemSegment> find_segment(
11951203
SharedMemSegment::Id id)
11961204
{
@@ -1212,7 +1220,7 @@ class SharedMemManager :
12121220
ids_segments_[id.get()] = segment_wrapper;
12131221
segments_mem_ += segment->mem_size();
12141222

1215-
SegmentWrapper::WatchTask::get().add_segment(segment_wrapper);
1223+
SegmentWrapper::WatchTask::get()->add_segment(segment_wrapper);
12161224
}
12171225

12181226
return segment;
@@ -1243,7 +1251,7 @@ class SharedMemManager :
12431251

12441252
for (auto segment : ids_segments_)
12451253
{
1246-
SegmentWrapper::WatchTask::get().remove_segment(segment.second);
1254+
SegmentWrapper::WatchTask::get()->remove_segment(segment.second);
12471255
}
12481256
}
12491257

src/cpp/rtps/transport/shared_mem/SharedMemWatchdog.hpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ class SharedMemWatchdog
4040
virtual void run() = 0;
4141
};
4242

43-
static SharedMemWatchdog& get()
43+
static std::shared_ptr<SharedMemWatchdog>& get()
4444
{
45-
static SharedMemWatchdog watch_dog;
46-
return watch_dog;
45+
static std::shared_ptr<SharedMemWatchdog> watch_dog_instance(new SharedMemWatchdog());
46+
return watch_dog_instance;
4747
}
4848

4949
/**
@@ -79,6 +79,13 @@ class SharedMemWatchdog
7979
return std::chrono::milliseconds(1000);
8080
}
8181

82+
~SharedMemWatchdog()
83+
{
84+
exit_thread_ = true;
85+
wake_up();
86+
thread_run_.join();
87+
}
88+
8289
private:
8390

8491
std::unordered_set<Task*> tasks_;
@@ -98,13 +105,6 @@ class SharedMemWatchdog
98105
thread_run_ = std::thread(&SharedMemWatchdog::run, this);
99106
}
100107

101-
~SharedMemWatchdog()
102-
{
103-
exit_thread_ = true;
104-
wake_up();
105-
thread_run_.join();
106-
}
107-
108108
/**
109109
* Forces Wake-up of the checking thread
110110
*/

test/unittest/dds/publisher/DataWriterTests.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,25 @@ TEST(DataWriterTests, SetListener)
269269
ASSERT_TRUE(DomainParticipantFactory::get_instance()->delete_participant(participant) == ReturnCode_t::RETCODE_OK);
270270
}
271271

272+
TEST(DataWriterTests, TerminateWithoutDestroyingWriter)
273+
{
274+
DomainParticipant* participant =
275+
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
276+
ASSERT_NE(participant, nullptr);
277+
278+
Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT);
279+
ASSERT_NE(publisher, nullptr);
280+
281+
TypeSupport type(new TopicDataTypeMock());
282+
type.register_type(participant);
283+
284+
Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT);
285+
ASSERT_NE(topic, nullptr);
286+
287+
DataWriter* datawriter = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
288+
ASSERT_NE(datawriter, nullptr);
289+
}
290+
272291
} // namespace dds
273292
} // namespace fastdds
274293
} // namespace eprosima

test/unittest/dds/subscriber/DataReaderTests.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,27 @@ TEST(DataReaderTests, ReadData)
159159

160160

161161

162+
TEST(DataReaderTests, TerminateWithoutDestroyingReader)
163+
{
164+
DomainParticipant* participant =
165+
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
166+
ASSERT_NE(participant, nullptr);
167+
168+
Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
169+
ASSERT_NE(subscriber, nullptr);
170+
171+
TypeSupport type(new TopicDataTypeMock());
172+
type.register_type(participant);
173+
174+
Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT);
175+
ASSERT_NE(topic, nullptr);
176+
177+
DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
178+
ASSERT_NE(data_reader, nullptr);
179+
180+
// Do not destroy entities
181+
}
182+
162183
void set_listener_test (
163184
DataReader* reader,
164185
DataReaderListener* listener,

0 commit comments

Comments
 (0)