Skip to content

Commit 0021032

Browse files
MiguelCompanyadolfomarverLaura Martin
authored
Shared Memory fixes and improvements (#1166)
* RobustInterprocessCondition implementation (#1147) * Refs #8212. RobustInterprocessCondition implementation. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. Condition tests. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8183. Bad SHM structures alignment in some platforms. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. nullptr check. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. Re-enable Liveliness tests. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. SHM ABI v3. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. Fix cmake error & set SHM_DEFAULT_TRANSPORT=OFF. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8212. FIFO strategy in condition notify. Signed-off-by: AdolfoMartinez <[email protected]> * SHM Buffer recovery mechanishm (#1159) * Refs #8219. Change BufferNode to pre-allocated pool with fixed addresses for the nodes during entire life-cycle. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. SHM Buffer invalidation implementation. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. logWarning -> logInfo when segment overflow. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. Optimization. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. Style changes. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. buffer_recover test. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8219. 'error:' string removed from log msg. Signed-off-by: AdolfoMartinez <[email protected]> * Setting shared memory on by default. Signed-off-by: Miguel Company <[email protected]> * Fix build fail in ROS-CI. Signed-off-by: AdolfoMartinez <[email protected]> * Fix SHM uncaught exceptions. Signed-off-by: AdolfoMartinez <[email protected]> * Refs #8132 Fix DDS unittests to delete entities before exiting Signed-off-by: Laura Martin <[email protected]> * Disable PSM unittests Signed-off-by: Laura Martin <[email protected]> * Fix boost::interprocess::semaphore initialization. Signed-off-by: AdolfoMartinez <[email protected]> * Fix CXX_STANDARD on boost try_compile Co-authored-by: AdolfoMartinez <[email protected]> Co-authored-by: AdolfoMartinez <[email protected]> Co-authored-by: Laura Martin <[email protected]>
1 parent 2ffac0f commit 0021032

File tree

17 files changed

+1602
-289
lines changed

17 files changed

+1602
-289
lines changed

CMakeLists.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ endif()
256256

257257
option(SQLITE3_SUPPORT "Activate SQLITE3 support" ON)
258258

259+
###############################################################################
260+
# SHM as Default transport
261+
###############################################################################
262+
option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" ON)
263+
259264
###############################################################################
260265
# Compile library.
261266
###############################################################################
@@ -281,11 +286,6 @@ if(COMPILE_EXAMPLES)
281286
add_subdirectory(examples)
282287
endif()
283288

284-
###############################################################################
285-
# SHM as Default transport
286-
###############################################################################
287-
option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" OFF)
288-
289289
###############################################################################
290290
# Documentation
291291
###############################################################################

cmake/modules/FindThirdpartyBoost.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ try_compile(IS_THIRDPARTY_BOOST_OK
2323
${CMAKE_BINARY_DIR}
2424
${PROJECT_SOURCE_DIR}/thirdparty/boost/test/ThirdpartyBoostCompile_test.cpp
2525
CMAKE_FLAGS "-DINCLUDE_DIRECTORIES=${THIRDPARTY_BOOST_INCLUDE_DIR}"
26+
CXX_STANDARD 11
2627
LINK_LIBRARIES ${THIRDPARTY_BOOST_LINK_LIBS}
2728
OUTPUT_VARIABLE OUT
2829
)
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef _FASTDDS_SHAREDMEM_ROBUST_INTERPROCESS_CONDITION_
16+
#define _FASTDDS_SHAREDMEM_ROBUST_INTERPROCESS_CONDITION_
17+
18+
#include <boost/interprocess/sync/detail/locks.hpp>
19+
#include <boost/interprocess/sync/interprocess_mutex.hpp>
20+
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
21+
#include <boost/interprocess/sync/scoped_lock.hpp>
22+
23+
namespace eprosima {
24+
namespace fastdds {
25+
namespace rtps {
26+
27+
namespace bi = boost::interprocess;
28+
29+
class RobustInterprocessCondition
30+
{
31+
public:
32+
33+
RobustInterprocessCondition()
34+
: list_listening_(SemaphoreList::LIST_NULL, SemaphoreList::LIST_NULL)
35+
, list_free_(0, MAX_LISTENERS-1)
36+
{
37+
init_sem_list();
38+
}
39+
40+
~RobustInterprocessCondition()
41+
{
42+
}
43+
44+
/**
45+
* If there is a thread waiting on *this, change that
46+
* thread's state to ready. Otherwise there is no effect.
47+
* @throw boost::interprocess::interprocess_exception on error.
48+
*/
49+
void notify_one()
50+
{
51+
bi::scoped_lock<bi::interprocess_mutex> lock(semaphore_lists_mutex_);
52+
53+
auto sem_index = list_listening_.head();
54+
55+
if (sem_index != SemaphoreList::LIST_NULL)
56+
{
57+
semaphores_pool_[sem_index].sem.post();
58+
}
59+
}
60+
61+
/**
62+
* Change the state of all threads waiting on *this to ready.
63+
* If there are no waiting threads, notify_all() has no effect.
64+
* @throw boost::interprocess::interprocess_exception on error.
65+
*/
66+
void notify_all()
67+
{
68+
bi::scoped_lock<bi::interprocess_mutex> lock(semaphore_lists_mutex_);
69+
70+
auto sem_index = list_listening_.head();
71+
72+
while (sem_index != SemaphoreList::LIST_NULL)
73+
{
74+
semaphores_pool_[sem_index].sem.post();
75+
sem_index = semaphores_pool_[sem_index].next;
76+
}
77+
}
78+
79+
/**
80+
* Releases the lock on the interprocess_mutex object associated with lock, blocks
81+
* the current thread of execution until readied by a call to
82+
* this->notify_one() or this->notify_all(), and then reacquires the lock.
83+
* @throw boost::interprocess::interprocess_exception on error.
84+
*/
85+
template <typename L>
86+
void wait(
87+
L& lock)
88+
{
89+
do_wait(*lock.mutex());
90+
}
91+
92+
/**
93+
* The same as:
94+
* while (!pred()) wait(lock)
95+
* @throw boost::interprocess::interprocess_exception on error.
96+
*/
97+
template <typename L, typename Pr>
98+
void wait(
99+
L& lock,
100+
Pr pred)
101+
{
102+
while (!pred())
103+
{
104+
do_wait(*lock.mutex());
105+
}
106+
}
107+
108+
/**
109+
* Releases the lock on the interprocess_mutex object associated with lock, blocks
110+
* the current thread of execution until readied by a call to
111+
* this->notify_one() or this->notify_all(), or until time abs_time is reached,
112+
* and then reacquires the lock.
113+
* @return false if time abs_time is reached, otherwise true.
114+
* @throw boost::interprocess::interprocess_exception on error.
115+
*/
116+
template <typename L>
117+
bool timed_wait(
118+
L& lock,
119+
const boost::posix_time::ptime& abs_time)
120+
{
121+
//Handle infinity absolute time here to avoid complications in do_timed_wait
122+
if (abs_time == boost::posix_time::pos_infin)
123+
{
124+
this->wait(lock);
125+
return true;
126+
}
127+
return this->do_timed_wait(abs_time, *lock.mutex());
128+
}
129+
130+
/**
131+
* The same as:
132+
* while (!pred())
133+
* {
134+
* if (!timed_wait(lock, abs_time)) return pred();
135+
* }
136+
* return true;
137+
*/
138+
template <typename L, typename Pr>
139+
bool timed_wait(
140+
L& lock,
141+
const boost::posix_time::ptime& abs_time,
142+
Pr pred)
143+
{
144+
// Posix does not support infinity absolute time so handle it here
145+
if (abs_time == boost::posix_time::pos_infin)
146+
{
147+
wait(lock, pred);
148+
return true;
149+
}
150+
while (!pred())
151+
{
152+
if (!do_timed_wait(abs_time, *lock.mutex()))
153+
{
154+
return pred();
155+
}
156+
}
157+
return true;
158+
}
159+
160+
private:
161+
162+
struct SemaphoreNode
163+
{
164+
bi::interprocess_semaphore sem {0};
165+
uint32_t next;
166+
uint32_t prev;
167+
};
168+
169+
static constexpr uint32_t MAX_LISTENERS = 512;
170+
SemaphoreNode semaphores_pool_[MAX_LISTENERS];
171+
172+
class SemaphoreList
173+
{
174+
private:
175+
176+
uint32_t head_;
177+
uint32_t tail_;
178+
179+
public:
180+
181+
static constexpr uint32_t LIST_NULL = static_cast<uint32_t>(-1);
182+
183+
SemaphoreList(
184+
uint32_t head,
185+
uint32_t tail)
186+
: head_(head)
187+
, tail_(tail)
188+
{
189+
}
190+
191+
inline void push(
192+
uint32_t sem_index,
193+
SemaphoreNode* sem_pool)
194+
{
195+
if (tail_ != LIST_NULL)
196+
{
197+
sem_pool[tail_].next = sem_index;
198+
}
199+
200+
sem_pool[sem_index].prev = tail_;
201+
sem_pool[sem_index].next = LIST_NULL;
202+
203+
tail_ = sem_index;
204+
205+
if (head_ == LIST_NULL)
206+
{
207+
head_ = sem_index;
208+
}
209+
}
210+
211+
inline uint32_t pop(
212+
SemaphoreNode* sem_pool)
213+
{
214+
if (tail_ == LIST_NULL)
215+
{
216+
throw bi::interprocess_exception("RobustInterprocessCondition: pop() on empty list!");
217+
}
218+
219+
uint32_t sem_index = tail_;
220+
tail_ = sem_pool[tail_].prev;
221+
222+
if (tail_ != LIST_NULL)
223+
{
224+
sem_pool[tail_].next = LIST_NULL;
225+
}
226+
else
227+
{
228+
head_ = LIST_NULL;
229+
}
230+
231+
return sem_index;
232+
}
233+
234+
inline uint32_t tail() const
235+
{
236+
return tail_;
237+
}
238+
239+
inline uint32_t head() const
240+
{
241+
return head_;
242+
}
243+
244+
inline void remove(
245+
uint32_t sem_index,
246+
SemaphoreNode* sem_pool)
247+
{
248+
assert(sem_index != LIST_NULL);
249+
250+
auto prev = sem_pool[sem_index].prev;
251+
auto next = sem_pool[sem_index].next;
252+
253+
if (prev != LIST_NULL)
254+
{
255+
sem_pool[prev].next = next;
256+
}
257+
258+
if (next != LIST_NULL)
259+
{
260+
sem_pool[next].prev = prev;
261+
}
262+
263+
if (head_ == sem_index)
264+
{
265+
head_ = next;
266+
}
267+
268+
if (tail_ == sem_index)
269+
{
270+
tail_ = prev;
271+
}
272+
}
273+
};
274+
275+
SemaphoreList list_listening_;
276+
SemaphoreList list_free_;
277+
bi::interprocess_mutex semaphore_lists_mutex_;
278+
279+
void init_sem_list()
280+
{
281+
semaphores_pool_[0].prev = SemaphoreList::LIST_NULL;
282+
semaphores_pool_[0].next = 1;
283+
284+
for (uint32_t i = 1; i < MAX_LISTENERS-1; i++)
285+
{
286+
semaphores_pool_[i].next = i+1;
287+
semaphores_pool_[i].prev = i-1;
288+
}
289+
290+
semaphores_pool_[MAX_LISTENERS-1].prev = MAX_LISTENERS-2;
291+
semaphores_pool_[MAX_LISTENERS-1].next = SemaphoreList::LIST_NULL;
292+
}
293+
294+
inline uint32_t enqueue_listener()
295+
{
296+
auto sem_index = list_free_.pop(semaphores_pool_);
297+
list_listening_.push(sem_index, semaphores_pool_);
298+
return sem_index;
299+
}
300+
301+
inline void dequeue_listener(
302+
uint32_t sem_index)
303+
{
304+
list_listening_.remove(sem_index, semaphores_pool_);
305+
list_free_.push(sem_index, semaphores_pool_);
306+
}
307+
308+
inline void do_wait(
309+
bi::interprocess_mutex& mut)
310+
{
311+
uint32_t sem_index;
312+
313+
{
314+
bi::scoped_lock<bi::interprocess_mutex> lock_enqueue(semaphore_lists_mutex_);
315+
sem_index = enqueue_listener();
316+
}
317+
318+
{
319+
// Release caller's lock
320+
bi::ipcdetail::lock_inverter<bi::interprocess_mutex> inverted_lock(mut);
321+
bi::scoped_lock<bi::ipcdetail::lock_inverter<bi::interprocess_mutex> > unlock(inverted_lock);
322+
323+
// timed_wait (infin) is used, instead wait, because wait on semaphores could throw when
324+
// BOOST_INTERPROCESS_ENABLE_TIMEOUT_WHEN_LOCKING is set. We don't want that for our condition_variables
325+
semaphores_pool_[sem_index].sem.timed_wait(boost::posix_time::pos_infin);
326+
}
327+
328+
{
329+
bi::scoped_lock<bi::interprocess_mutex> lock_dequeue(semaphore_lists_mutex_);
330+
dequeue_listener(sem_index);
331+
}
332+
}
333+
334+
inline bool do_timed_wait(
335+
const boost::posix_time::ptime& abs_time,
336+
bi::interprocess_mutex& mut)
337+
{
338+
bool ret;
339+
uint32_t sem_index;
340+
341+
{
342+
bi::scoped_lock<bi::interprocess_mutex> lock_enqueue(semaphore_lists_mutex_);
343+
sem_index = enqueue_listener();
344+
}
345+
346+
{
347+
// Release caller's lock
348+
bi::ipcdetail::lock_inverter<bi::interprocess_mutex> inverted_lock(mut);
349+
bi::scoped_lock<bi::ipcdetail::lock_inverter<bi::interprocess_mutex> > unlock(inverted_lock);
350+
351+
ret = semaphores_pool_[sem_index].sem.timed_wait(abs_time);
352+
}
353+
354+
{
355+
bi::scoped_lock<bi::interprocess_mutex> lock_dequeue(semaphore_lists_mutex_);
356+
dequeue_listener(sem_index);
357+
}
358+
359+
return ret;
360+
}
361+
};
362+
363+
} // namespace rtps
364+
} // namespace fastdds
365+
} // namespace eprosima
366+
367+
#endif // _FASTDDS_SHAREDMEM_ROBUST_INTERPROCESS_CONDITION_

0 commit comments

Comments
 (0)