Skip to content

Commit 3da3a1f

Browse files
committed
Merge branch 'master' of github.com:OpenDDS/OpenDDS into dcs-recorder-replayer
2 parents 2d16fdf + bba894f commit 3da3a1f

26 files changed

+460
-60
lines changed

cmake/opendds_utils.cmake

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -372,17 +372,28 @@ endfunction()
372372

373373
macro(_opendds_save_cache name type value)
374374
list(APPEND _opendds_save_cache_vars ${name})
375-
set(_opendds_save_cache_${name}_type ${type})
376-
set(_opendds_save_cache_${name}_value "${${name}}")
375+
set(_prefix _opendds_save_cache_${name})
376+
if(DEFINED ${name})
377+
set(${_prefix}_defined TRUE)
378+
set(${_prefix}_type ${type})
379+
set(${_prefix}_value "${${name}}")
380+
else()
381+
set(${_prefix}_defined FALSE)
382+
endif()
377383
set(${name} "${value}" CACHE ${type} "" FORCE)
378384
endmacro()
379385

380386
macro(_opendds_restore_cache)
381-
foreach(name ${_opendds_save_cache_vars})
382-
set(${name} "${_opendds_save_cache_${name}_value}" CACHE
383-
"${_opendds_save_cache_${name}_type}" "" FORCE)
384-
unset(_opendds_save_cache_${name}_type)
385-
unset(_opendds_save_cache_${name}_value)
387+
foreach(_name ${_opendds_save_cache_vars})
388+
set(_prefix _opendds_save_cache_${_name})
389+
if(${_prefix}_defined)
390+
set(${_name} "${${_prefix}_value}" CACHE "${${_prefix}_type}" "" FORCE)
391+
unset(${_prefix}_type)
392+
unset(${_prefix}_value)
393+
else()
394+
unset(${_name} CACHE)
395+
endif()
396+
unset(${_prefix}_defined)
386397
endforeach()
387398
unset(_opendds_save_cache_vars)
388399
endmacro()

dds/DCPS/AddressCache.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@ class AddressCache {
232232
return map_.empty() && id_map_.empty();
233233
}
234234

235+
size_t size() const
236+
{
237+
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
238+
return map_.size() + id_map_.size();
239+
}
240+
235241
private:
236242

237243
void insert_ids(const Key& key)

dds/DCPS/Dynamic_Cached_Allocator_With_Overflow_T.h

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -51,29 +51,22 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
5151
allocs_from_pool_(0),
5252
frees_to_heap_(0),
5353
frees_to_pool_(0),
54+
chunk_size_(ACE_MALLOC_ROUNDUP(chunk_size, ACE_MALLOC_ALIGN)),
55+
begin_(static_cast<unsigned char*>(ACE_Allocator::instance()->malloc(n_chunks * chunk_size_))),
56+
end_(begin_ + n_chunks * chunk_size_),
5457
free_list_(ACE_PURE_FREE_LIST)
5558
{
56-
chunk_size_ = ACE_MALLOC_ROUNDUP(chunk_size, ACE_MALLOC_ALIGN);
57-
begin_ = static_cast<unsigned char*> (ACE_Allocator::instance()->malloc(n_chunks * chunk_size_));
58-
// Remember end of the pool.
59-
end_ = begin_ + n_chunks * chunk_size_;
60-
6159
// Put into free list using placement contructor, no real memory
6260
// allocation in the <new> below.
63-
for (size_t c = 0;
64-
c < n_chunks;
65-
c++) {
66-
void* placement = begin_ + c * chunk_size_;
67-
61+
for (size_t c = 0; c < n_chunks; ++c) {
62+
void* const placement = begin_ + c * chunk_size_;
6863
free_list_.add(new(placement) ACE_Cached_Mem_Pool_Node<char>);
6964
}
7065
}
7166

72-
/// Clear things up.
73-
~Dynamic_Cached_Allocator_With_Overflow() {
67+
~Dynamic_Cached_Allocator_With_Overflow()
68+
{
7469
ACE_Allocator::instance()->free(begin_);
75-
begin_ = 0;
76-
chunk_size_ = 0;
7770
}
7871

7972
/**
@@ -82,7 +75,8 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
8275
* and is otherwise ignored since malloc() always returns a pointer to an
8376
* item of @a chunk_size size.
8477
*/
85-
void *malloc(size_t nbytes = 0) {
78+
void* malloc(size_t nbytes = 0)
79+
{
8680
// Check if size requested fits within pre-determined size.
8781
if (nbytes > chunk_size_)
8882
return 0;
@@ -93,7 +87,8 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
9387

9488
if (0 == rtn) {
9589
rtn = ACE_Allocator::instance()->malloc(chunk_size_);
96-
allocs_from_heap_++;
90+
++allocs_from_heap_;
91+
heap_allocated_ += chunk_size_;
9792

9893
if (DCPS_debug_level >= 2) {
9994
if (allocs_from_heap_ == 1 && DCPS_debug_level >= 2)
@@ -113,7 +108,7 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
113108
}
114109

115110
} else {
116-
allocs_from_pool_++;
111+
++allocs_from_pool_;
117112

118113
if (DCPS_debug_level >= 6)
119114
if (allocs_from_pool_ % 500 == 0)
@@ -134,26 +129,28 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
134129
* that it's less or equal to @a chunk_size, and is otherwise ignored
135130
* since calloc() always returns a pointer to an item of @a chunk_size.
136131
*/
137-
virtual void *calloc(size_t /* nbytes */,
138-
char /* initial_value */ = '\0') {
132+
virtual void* calloc(size_t /* nbytes */,
133+
char /* initial_value */ = '\0')
134+
{
139135
ACE_NOTSUP_RETURN(0);
140136
}
141137

142138
/// This method is a no-op and just returns 0 since the free list
143139
/// only works with fixed sized entities.
144-
virtual void *calloc(size_t /* n_elem */,
140+
virtual void* calloc(size_t /* n_elem */,
145141
size_t /* elem_size */,
146142
char /* initial_value */ = '\0') {
147143
ACE_NOTSUP_RETURN(0);
148144
}
149145

150146
/// Return a chunk of memory back to free list cache.
151-
void free(void * ptr) {
152-
unsigned char* tmp = static_cast<unsigned char*> (ptr);
153-
if (tmp < begin_ ||
154-
tmp >= end_) {
147+
void free(void* ptr)
148+
{
149+
unsigned char* tmp = static_cast<unsigned char*>(ptr);
150+
if (tmp < begin_ || tmp >= end_) {
155151
ACE_Allocator::instance()->free(tmp);
156-
frees_to_heap_ ++;
152+
++frees_to_heap_;
153+
heap_allocated_ -= chunk_size_;
157154

158155
if (frees_to_heap_ > allocs_from_heap_) {
159156
ACE_ERROR((LM_ERROR,
@@ -177,7 +174,7 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
177174
return;
178175

179176
} else if (ptr != 0) {
180-
frees_to_pool_ ++;
177+
++frees_to_pool_;
181178

182179
if (frees_to_pool_ > allocs_from_pool_) {
183180
ACE_ERROR((LM_ERROR,
@@ -188,7 +185,7 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
188185
allocs_from_pool_.load()));
189186
}
190187

191-
free_list_.add((ACE_Cached_Mem_Pool_Node<char> *) ptr) ;
188+
free_list_.add((ACE_Cached_Mem_Pool_Node<char>*) ptr);
192189

193190
if (DCPS_debug_level >= 6)
194191
if (available() % 500 == 0)
@@ -201,41 +198,42 @@ class Dynamic_Cached_Allocator_With_Overflow : public ACE_New_Allocator, public
201198
}
202199

203200
/// Return the number of chunks available in the cache.
204-
size_t pool_depth() {
205-
return free_list_.size() ;
206-
}
201+
size_t pool_depth() { return free_list_.size(); }
207202

208203
// -- for debug
209204

210205
/** How many chunks are available at this time.
211206
*/
212-
size_t available() {
213-
return free_list_.size();
214-
};
207+
size_t available() { return free_list_.size(); }
208+
209+
size_t bytes_heap_allocated() const { return heap_allocated_.value(); }
215210

216211
/// number of allocations from the heap.
217212
Atomic<unsigned long> allocs_from_heap_;
218213
/// number of allocations from the pool.
219214
Atomic<unsigned long> allocs_from_pool_;
220215
/// number of frees returned to the heap
221-
Atomic<unsigned long> frees_to_heap_ ;
216+
Atomic<unsigned long> frees_to_heap_;
222217
/// number of frees returned to the pool
223218
Atomic<unsigned long> frees_to_pool_;
219+
224220
private:
221+
/// Remember the size of our chunks.
222+
const size_t chunk_size_;
223+
225224
/// Remember how we allocate the memory in the first place so
226225
/// we can clear things up later.
227-
unsigned char* begin_;
226+
unsigned char* const begin_;
228227
/// The end of the pool.
229-
unsigned char* end_;
228+
unsigned char* const end_;
230229

231230
/// Maintain a cached memory free list. We use @c char as template
232231
/// parameter, although sizeof(char) is usually less than
233232
/// sizeof(void*). Really important is that @a chunk_size
234233
/// must be greater or equal to sizeof(void*).
235234
ACE_Locked_Free_List<ACE_Cached_Mem_Pool_Node<char>, ACE_LOCK> free_list_;
236235

237-
/// Remember the size of our chunks.
238-
size_t chunk_size_;
236+
ACE_Atomic_Op<ACE_LOCK, size_t> heap_allocated_;
239237
};
240238

241239
} // namespace DCPS

dds/DCPS/ReactorTask.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,12 @@ void ReactorTask::process_command_queue_i(ACE_Guard<ACE_Thread_Mutex>& guard,
367367
}
368368
}
369369

370+
size_t ReactorTask::command_queue_size() const
371+
{
372+
GuardType guard(lock_);
373+
return command_queue_.size();
374+
}
375+
370376
bool ReactorWrapper::open(ACE_Reactor* reactor)
371377
{
372378
reactor_ = reactor;

dds/DCPS/ReactorTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ class OpenDDS_Dcps_Export ReactorTask
129129
CommandPtr execute_or_enqueue(CommandPtr command);
130130
void wait_until_empty();
131131

132+
size_t command_queue_size() const;
133+
132134
OPENDDS_POOL_ALLOCATION_FWD
133135

134136
private:

dds/DCPS/transport/framework/DataLink.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,42 @@ DataLink::terminate_send_if_suspended()
12881288
}
12891289
}
12901290

1291+
StatisticSeq DataLink::stats_template()
1292+
{
1293+
static const DDS::UInt32 num_local_stats = 9;
1294+
StatisticSeq stats(num_local_stats);
1295+
stats.length(num_local_stats);
1296+
stats[0].name = "DataLinkSendListeners";
1297+
stats[1].name = "DataLinkRecvListeners";
1298+
stats[2].name = "DataLinkAssociationsByRemote";
1299+
stats[3].name = "DataLinkAssociationsByLocal";
1300+
stats[4].name = "DataLinkAssociationsReleasing";
1301+
stats[5].name = "DataLinkOnStartCallbacks";
1302+
stats[6].name = "DataLinkPendingOnStarts";
1303+
stats[7].name = "DataLinkMessageBlocks";
1304+
stats[8].name = "DataLinkDataBlocks";
1305+
return stats;
1306+
}
1307+
1308+
void DataLink::fill_stats(StatisticSeq& stats, DDS::UInt32& idx) const
1309+
{
1310+
{
1311+
GuardType guard(pub_sub_maps_lock_);
1312+
stats[idx++].value = send_listeners_.size();
1313+
stats[idx++].value = recv_listeners_.size();
1314+
stats[idx++].value = assoc_by_remote_.size();
1315+
stats[idx++].value = assoc_by_local_.size();
1316+
stats[idx++].value = assoc_releasing_.size();
1317+
}
1318+
{
1319+
GuardType guard(strategy_lock_);
1320+
stats[idx++].value = on_start_callbacks_.size();
1321+
stats[idx++].value = pending_on_starts_.size();
1322+
}
1323+
stats[idx++].value = mb_allocator_ ? mb_allocator_->bytes_heap_allocated() : 0;
1324+
stats[idx++].value = db_allocator_ ? db_allocator_->bytes_heap_allocated() : 0;
1325+
}
1326+
12911327
}
12921328
}
12931329

dds/DCPS/transport/framework/DataLink.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ class OpenDDS_Dcps_Export DataLink
436436

437437
/// The transport send strategy object for this DataLink.
438438
TransportSendStrategy_rch send_strategy_;
439-
LockType strategy_lock_;
439+
mutable LockType strategy_lock_;
440440

441441
TransportSendStrategy_rch get_send_strategy();
442442

@@ -463,6 +463,9 @@ class OpenDDS_Dcps_Export DataLink
463463

464464
/// Listener for TransportSendControlElements created in send_control
465465
SendResponseListener send_response_listener_;
466+
467+
static StatisticSeq stats_template();
468+
void fill_stats(StatisticSeq& stats, DDS::UInt32& idx) const;
466469
};
467470

468471
} // namespace DCPS

dds/DCPS/transport/framework/TransportImpl.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,25 @@ TransportImpl::dump_to_str()
154154
return cfg ? cfg->dump_to_str(domain_) : OPENDDS_STRING();
155155
}
156156

157+
StatisticSeq TransportImpl::stats_template()
158+
{
159+
static const DDS::UInt32 num_local_stats = 2;
160+
StatisticSeq stats(num_local_stats);
161+
stats.length(num_local_stats);
162+
stats[0].name = "TransportImplPendingConnections";
163+
stats[1].name = "TransportImplReactorTaskCmdQueue";
164+
return stats;
165+
}
166+
167+
void TransportImpl::fill_stats(StatisticSeq& stats, DDS::UInt32& idx) const
168+
{
169+
{
170+
GuardType guard(pending_connections_lock_);
171+
stats[idx++].value = pending_connections_.size();
172+
}
173+
stats[idx++].value = reactor_task_ ? reactor_task_->command_queue_size() : 0;
174+
}
175+
157176
}
158177
}
159178

dds/DCPS/transport/framework/TransportImpl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,9 @@ class OpenDDS_Dcps_Export TransportImpl : public virtual RcObject {
308308
/// Id of the last link established.
309309
AtomicBool is_shut_down_;
310310
DDS::DomainId_t domain_;
311+
312+
static StatisticSeq stats_template();
313+
void fill_stats(StatisticSeq& stats, DDS::UInt32& idx) const;
311314
};
312315

313316
} // namespace DCPS

dds/DCPS/transport/framework/TransportReassembly.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,15 @@ void TransportReassembly::check_expirations(const MonotonicTimePoint& now)
308308
}
309309
}
310310

311+
size_t TransportReassembly::total_frags() const
312+
{
313+
size_t total = 0;
314+
for (FragInfoMap::const_iterator iter = fragments_.begin(); iter != fragments_.end(); ++iter) {
315+
total += iter->second.total_frags_;
316+
}
317+
return total;
318+
}
319+
311320
TransportReassembly::FragInfo::FragInfo()
312321
: have_first_(false)
313322
, total_frags_(0)

dds/DCPS/transport/framework/TransportReassembly.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ class OpenDDS_Dcps_Export TransportReassembly : public RcObject {
105105
CORBA::Long bitmap[], CORBA::ULong length,
106106
CORBA::ULong& numBits) const;
107107

108+
size_t fragments_size() const { return fragments_.size(); }
109+
size_t queue_size() const { return expiration_queue_.size(); }
110+
size_t completed_size() const { return completed_.size(); }
111+
size_t total_frags() const;
112+
108113
private:
109114

110115
bool reassemble_i(const FragmentRange& fragRange, bool firstFrag,

0 commit comments

Comments
 (0)