Skip to content

Commit 8d8f1ca

Browse files
Mario-DLmergify[bot]
authored andcommitted
Fix destruction data-race on participant removal in intra-process (#5034)
* Refs #21293: Add BB test Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Reinforce test to fail more frequently Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Add RefCountedPointer.hpp to utils Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Add unittests for RefCountedPointer Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: LocalReaderPointer.hpp Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: BaseReader aggregates LocalReaderPointer Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: ReaderLocator aggregates LocalReaderPointer Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr<LocalReaderPointer> and properly calls local_actions_on_reader_removed() Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Linter Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Fix windows warnings Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Address Miguel's review Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: Apply last comment Signed-off-by: Mario Dominguez <[email protected]> * Refs #21293: NIT Signed-off-by: Mario Dominguez <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> (cherry picked from commit 456e45f)
1 parent c8c8020 commit 8d8f1ca

17 files changed

+623
-43
lines changed

src/cpp/rtps/RTPSDomain.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <rtps/network/utils/external_locators.hpp>
4242
#include <rtps/participant/RTPSParticipantImpl.hpp>
4343
#include <rtps/reader/BaseReader.hpp>
44+
#include <rtps/reader/LocalReaderPointer.hpp>
4445
#include <rtps/RTPSDomainImpl.hpp>
4546
#include <rtps/transport/TCPv4Transport.h>
4647
#include <rtps/transport/TCPv6Transport.h>
@@ -690,7 +691,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
690691
return nullptr;
691692
}
692693

693-
BaseReader* RTPSDomainImpl::find_local_reader(
694+
std::shared_ptr<LocalReaderPointer> RTPSDomainImpl::find_local_reader(
694695
const GUID_t& reader_guid)
695696
{
696697
auto instance = get_instance();
@@ -704,7 +705,7 @@ BaseReader* RTPSDomainImpl::find_local_reader(
704705
}
705706
}
706707

707-
return nullptr;
708+
return std::shared_ptr<LocalReaderPointer>(nullptr);
708709
}
709710

710711
BaseWriter* RTPSDomainImpl::find_local_writer(

src/cpp/rtps/RTPSDomainImpl.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <fastdds/rtps/writer/RTPSWriter.hpp>
3131

3232
#include <rtps/reader/BaseReader.hpp>
33+
#include <rtps/reader/LocalReaderPointer.hpp>
3334
#include <rtps/writer/BaseWriter.hpp>
3435
#include <utils/shared_memory/BoostAtExitRegistry.hpp>
3536
#include <utils/SystemInfo.hpp>
@@ -175,7 +176,7 @@ class RTPSDomainImpl
175176
*
176177
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
177178
*/
178-
static BaseReader* find_local_reader(
179+
static std::shared_ptr<LocalReaderPointer> find_local_reader(
179180
const GUID_t& reader_guid);
180181

181182
/**

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

+20-5
Original file line numberDiff line numberDiff line change
@@ -1350,7 +1350,7 @@ bool RTPSParticipantImpl::createReader(
13501350
return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
13511351
}
13521352

1353-
BaseReader* RTPSParticipantImpl::find_local_reader(
1353+
std::shared_ptr<LocalReaderPointer> RTPSParticipantImpl::find_local_reader(
13541354
const GUID_t& reader_guid)
13551355
{
13561356
shared_lock<shared_mutex> _(endpoints_list_mutex);
@@ -1359,11 +1359,11 @@ BaseReader* RTPSParticipantImpl::find_local_reader(
13591359
{
13601360
if (reader->getGuid() == reader_guid)
13611361
{
1362-
return reader;
1362+
return reader->get_local_pointer();
13631363
}
13641364
}
13651365

1366-
return nullptr;
1366+
return std::shared_ptr<LocalReaderPointer>();
13671367
}
13681368

13691369
BaseWriter* RTPSParticipantImpl::find_local_writer(
@@ -1960,6 +1960,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
19601960

19611961
bool found = false, found_in_users = false;
19621962
Endpoint* p_endpoint = nullptr;
1963+
BaseReader* reader = nullptr;
19631964

19641965
if (endpoint.entityId.is_writer())
19651966
{
@@ -1994,6 +1995,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
19941995
{
19951996
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
19961997
{
1998+
reader = *rit;
19971999
m_userReaderList.erase(rit);
19982000
found_in_users = true;
19992001
break;
@@ -2004,6 +2006,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
20042006
{
20052007
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
20062008
{
2009+
reader = *rit;
20072010
p_endpoint = *rit;
20082011
m_allReaderList.erase(rit);
20092012
found = true;
@@ -2062,6 +2065,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
20622065
#endif // if HAVE_SECURITY
20632066
}
20642067

2068+
if (reader)
2069+
{
2070+
reader->local_actions_on_reader_removed();
2071+
}
20652072
delete(p_endpoint);
20662073
return true;
20672074
}
@@ -2149,6 +2156,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints()
21492156
}
21502157
#endif // if HAVE_SECURITY
21512158

2159+
if (kind == READER)
2160+
{
2161+
static_cast<BaseReader*>(endpoint)->local_actions_on_reader_removed();
2162+
}
2163+
21522164
// remove the endpoints
21532165
delete(endpoint);
21542166
}
@@ -2837,8 +2849,11 @@ bool RTPSParticipantImpl::register_in_reader(
28372849
}
28382850
else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId))
28392851
{
2840-
BaseReader* reader = find_local_reader(reader_guid);
2841-
res = reader->add_statistics_listener(listener);
2852+
LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid));
2853+
if (local_reader)
2854+
{
2855+
res = local_reader->add_statistics_listener(listener);
2856+
}
28422857
}
28432858

28442859
return res;

src/cpp/rtps/participant/RTPSParticipantImpl.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include <rtps/messages/SendBuffersManager.hpp>
5757
#include <rtps/network/NetworkFactory.hpp>
5858
#include <rtps/network/ReceiverResource.h>
59+
#include <rtps/reader/LocalReaderPointer.hpp>
5960
#include <rtps/resources/ResourceEvent.h>
6061
#include <statistics/rtps/monitor-service/interfaces/IConnectionsObserver.hpp>
6162
#include <statistics/rtps/monitor-service/interfaces/IConnectionsQueryable.hpp>
@@ -477,7 +478,7 @@ class RTPSParticipantImpl
477478
/***
478479
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
479480
*/
480-
BaseReader* find_local_reader(
481+
std::shared_ptr<LocalReaderPointer> find_local_reader(
481482
const GUID_t& reader_guid);
482483

483484
/***

src/cpp/rtps/reader/BaseReader.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ BaseReader::BaseReader(
107107
setup_datasharing(att);
108108
}
109109

110+
void BaseReader::local_actions_on_reader_removed()
111+
{
112+
local_ptr_->deactivate();
113+
}
114+
110115
BaseReader::~BaseReader()
111116
{
112117
EPROSIMA_LOG_INFO(RTPS_READER, "Removing reader " << this->getGuid().entityId);
@@ -272,6 +277,11 @@ void BaseReader::allow_unknown_writers()
272277
accept_messages_from_unkown_writers_ = true;
273278
}
274279

280+
std::shared_ptr<LocalReaderPointer> BaseReader::get_local_pointer()
281+
{
282+
return local_ptr_;
283+
}
284+
275285
bool BaseReader::reserve_cache(
276286
uint32_t cdr_payload_size,
277287
fastdds::rtps::CacheChange_t*& change)
@@ -501,6 +511,8 @@ void BaseReader::init(
501511
fixed_payload_size_ = history_->m_att.payloadMaxSize;
502512
}
503513

514+
local_ptr_ = std::make_shared<LocalReaderPointer>(this);
515+
504516
EPROSIMA_LOG_INFO(RTPS_READER, "RTPSReader created correctly");
505517
}
506518

src/cpp/rtps/reader/BaseReader.hpp

+18
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
4343
#include <fastdds/utils/TimedConditionVariable.hpp>
4444

45+
#include <rtps/reader/LocalReaderPointer.hpp>
46+
4547
namespace eprosima {
4648

4749
namespace fastdds {
@@ -163,6 +165,14 @@ class BaseReader
163165
return datasharing_listener_;
164166
}
165167

168+
/**
169+
* @brief Retrieves the local pointer to this reader
170+
* to be used by other local entities.
171+
*
172+
* @return Local pointer to this reader.
173+
*/
174+
std::shared_ptr<LocalReaderPointer> get_local_pointer();
175+
166176
/**
167177
* @brief Reserve a CacheChange_t.
168178
*
@@ -296,6 +306,11 @@ class BaseReader
296306
const fastdds::rtps::SequenceNumberSet_t& gapList,
297307
VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0;
298308

309+
/**
310+
* @brief Waits for not being referenced/used by any other entity.
311+
*/
312+
virtual void local_actions_on_reader_removed();
313+
299314
#ifdef FASTDDS_STATISTICS
300315

301316
bool add_statistics_listener(
@@ -455,6 +470,9 @@ class BaseReader
455470
/// Trusted writer (for Builtin)
456471
fastdds::rtps::EntityId_t trusted_writer_entity_id_;
457472

473+
/// RefCountedPointer of this instance.
474+
std::shared_ptr<LocalReaderPointer> local_ptr_;
475+
458476
private:
459477

460478
/**
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file LocalReaderPointer.hpp
17+
*/
18+
19+
#ifndef FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
20+
#define FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP
21+
22+
#include <utils/RefCountedPointer.hpp>
23+
24+
namespace eprosima {
25+
namespace fastdds {
26+
namespace rtps {
27+
28+
class BaseReader;
29+
30+
using LocalReaderPointer = RefCountedPointer<BaseReader>;
31+
32+
} // namespace rtps
33+
} // namespace fastdds
34+
} // namespace eprosima
35+
36+
#endif // FASTDDS_RTPS_READER__LOCALREADERPOINTER_HPP

src/cpp/rtps/writer/ReaderLocator.cpp

+10-12
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ ReaderLocator::ReaderLocator(
4545
, async_locator_info_(max_unicast_locators, max_multicast_locators)
4646
, expects_inline_qos_(false)
4747
, is_local_reader_(false)
48-
, local_reader_(nullptr)
48+
, local_reader_()
4949
, guid_prefix_as_vector_(1u)
5050
, guid_as_vector_(1u)
5151
, datasharing_notifier_(nullptr)
@@ -84,7 +84,7 @@ bool ReaderLocator::start(
8484

8585
is_local_reader_ = RTPSDomainImpl::should_intraprocess_between(owner_->getGuid(), remote_guid);
8686
is_datasharing &= !is_local_reader_;
87-
local_reader_ = nullptr;
87+
local_reader_.reset();
8888

8989
if (!is_local_reader_ && !is_datasharing)
9090
{
@@ -177,7 +177,7 @@ void ReaderLocator::stop()
177177
guid_prefix_as_vector_.at(0) = c_GuidPrefix_Unknown;
178178
expects_inline_qos_ = false;
179179
is_local_reader_ = false;
180-
local_reader_ = nullptr;
180+
local_reader_.reset();
181181
}
182182

183183
bool ReaderLocator::send(
@@ -206,13 +206,13 @@ bool ReaderLocator::send(
206206
return true;
207207
}
208208

209-
BaseReader* ReaderLocator::local_reader()
209+
LocalReaderPointer::Instance ReaderLocator::local_reader()
210210
{
211211
if (!local_reader_)
212212
{
213213
local_reader_ = RTPSDomainImpl::find_local_reader(general_locator_info_.remote_guid);
214214
}
215-
return local_reader_;
215+
return LocalReaderPointer::Instance(local_reader_);
216216
}
217217

218218
bool ReaderLocator::is_datasharing_reader() const
@@ -222,15 +222,13 @@ bool ReaderLocator::is_datasharing_reader() const
222222

223223
void ReaderLocator::datasharing_notify()
224224
{
225-
RTPSReader* reader = nullptr;
226225
if (is_local_reader())
227226
{
228-
reader = local_reader();
229-
}
230-
231-
if (reader)
232-
{
233-
BaseReader::downcast(reader)->datasharing_listener()->notify(true);
227+
LocalReaderPointer::Instance reader = local_reader();
228+
if (reader)
229+
{
230+
reader->datasharing_listener()->notify(true);
231+
}
234232
}
235233
else
236234
{

src/cpp/rtps/writer/ReaderLocator.hpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
2626
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>
2727

28+
#include <rtps/reader/LocalReaderPointer.hpp>
29+
2830
namespace eprosima {
2931
namespace fastdds {
3032
namespace rtps {
@@ -67,10 +69,10 @@ class ReaderLocator : public RTPSMessageSenderInterface
6769
return is_local_reader_;
6870
}
6971

70-
BaseReader* local_reader();
72+
LocalReaderPointer::Instance local_reader();
7173

7274
void local_reader(
73-
BaseReader* local_reader)
75+
std::shared_ptr<LocalReaderPointer> local_reader)
7476
{
7577
local_reader_ = local_reader;
7678
}
@@ -260,7 +262,7 @@ class ReaderLocator : public RTPSMessageSenderInterface
260262
LocatorSelectorEntry async_locator_info_;
261263
bool expects_inline_qos_;
262264
bool is_local_reader_;
263-
BaseReader* local_reader_;
265+
std::shared_ptr<LocalReaderPointer> local_reader_;
264266
std::vector<GuidPrefix_t> guid_prefix_as_vector_;
265267
std::vector<GUID_t> guid_as_vector_;
266268
IDataSharingNotifier* datasharing_notifier_;

src/cpp/rtps/writer/ReaderProxy.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ class ReaderProxy
290290
* Get the local reader on the same process (if any).
291291
* @return The local reader on the same process.
292292
*/
293-
inline BaseReader* local_reader()
293+
inline LocalReaderPointer::Instance local_reader()
294294
{
295295
return locator_info_.local_reader();
296296
}

0 commit comments

Comments
 (0)