@@ -92,6 +92,10 @@ void NtcChannelFactory::processListenerResult(
92
92
bslstl::SharedPtrUtil::dynamicCast (&alias, channel);
93
93
if (alias) {
94
94
int catalogHandle = d_channels.add (alias);
95
+
96
+ // Increment resource usage count for new channel
97
+ d_resourceMonitor.acquire ();
98
+
95
99
alias->setChannelId (catalogHandle);
96
100
alias->onClose (bdlf::BindUtil::bind (
97
101
&NtcChannelFactory::processChannelClosed,
@@ -122,16 +126,7 @@ void NtcChannelFactory::processListenerClosed(int handle)
122
126
<< BALL_LOG_END;
123
127
}
124
128
125
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
126
- if (d_state == e_STATE_STOPPING) {
127
- if (d_channels.length () == 0 && d_listeners.length () == 0 ) {
128
- BALL_LOG_TRACE << " NTC factory channels and listeners have closed"
129
- << BALL_LOG_END;
130
-
131
- d_state = e_STATE_STOPPED;
132
- d_stateCondition.signal ();
133
- }
134
- }
129
+ d_resourceMonitor.release (); // Decrement resource usage count
135
130
}
136
131
137
132
void NtcChannelFactory::processChannelResult (
@@ -166,16 +161,7 @@ void NtcChannelFactory::processChannelClosed(int handle)
166
161
<< BALL_LOG_END;
167
162
}
168
163
169
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
170
- if (d_state == e_STATE_STOPPING) {
171
- if (d_channels.length () == 0 && d_listeners.length () == 0 ) {
172
- BALL_LOG_TRACE << " NTC factory channels and listeners have closed"
173
- << BALL_LOG_END;
174
-
175
- d_state = e_STATE_STOPPED;
176
- d_stateCondition.signal ();
177
- }
178
- }
164
+ d_resourceMonitor.release (); // Decrement resource usage count
179
165
}
180
166
181
167
// CREATORS
@@ -189,9 +175,9 @@ NtcChannelFactory::NtcChannelFactory(
189
175
, d_createSignaler(basicAllocator)
190
176
, d_limitSignaler(basicAllocator)
191
177
, d_owned(false )
192
- , d_stateMutex( )
193
- , d_stateCondition( )
194
- , d_state(e_STATE_DEFAULT )
178
+ , d_validator( false )
179
+ , d_resourceMonitor( false )
180
+ , d_isInterfaceStarted( false )
195
181
, d_allocator_p(bslma::Default::allocator(basicAllocator))
196
182
{
197
183
}
@@ -206,9 +192,9 @@ NtcChannelFactory::NtcChannelFactory(
206
192
, d_createSignaler(basicAllocator)
207
193
, d_limitSignaler(basicAllocator)
208
194
, d_owned(true )
209
- , d_stateMutex( )
210
- , d_stateCondition( )
211
- , d_state(e_STATE_DEFAULT )
195
+ , d_validator( false )
196
+ , d_resourceMonitor( false )
197
+ , d_isInterfaceStarted( false )
212
198
, d_allocator_p(bslma::Default::allocator(basicAllocator))
213
199
{
214
200
bsl::shared_ptr<bdlbb::BlobBufferFactory> blobBufferFactory_sp (
@@ -232,7 +218,6 @@ NtcChannelFactory::~NtcChannelFactory()
232
218
d_interface_sp.reset ();
233
219
}
234
220
235
- BSLS_ASSERT_OPT (d_state == e_STATE_DEFAULT || d_state == e_STATE_STOPPED);
236
221
BSLS_ASSERT_OPT (d_listeners.length () == 0 );
237
222
BSLS_ASSERT_OPT (d_channels.length () == 0 );
238
223
BSLS_ASSERT_OPT (d_createSignaler.slotCount () == 0 );
@@ -243,65 +228,60 @@ NtcChannelFactory::~NtcChannelFactory()
243
228
// MANIPULATORS
244
229
int NtcChannelFactory::start ()
245
230
{
246
- ntsa::Error error ;
231
+ bmqu::AtomicValidatorGuard valGuard (&d_validator) ;
247
232
248
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
233
+ if (valGuard.isValid ()) {
234
+ // Already started.
235
+ return 1 ; // RETURN
236
+ }
249
237
250
- switch (d_state) {
251
- case e_STATE_DEFAULT:
252
- error = d_interface_sp->start ();
238
+ if (!d_isInterfaceStarted) {
239
+ // Make sure we don't restart the same interface if we have
240
+ // `start()`, `stop()`, `start()` sequence.
241
+ d_isInterfaceStarted = true ;
242
+ const ntsa::Error error = d_interface_sp->start ();
253
243
if (error) {
254
244
return error.number (); // RETURN
255
245
}
256
- d_state = e_STATE_STARTED;
257
- return 0 ; // RETURN
258
- case e_STATE_STOPPED: d_state = e_STATE_STARTED; return 0 ; // RETURN
259
- case e_STATE_STARTED: return 0 ; // RETURN
260
- case e_STATE_STOPPING: return 1 ; // RETURN
261
- default : return 1 ; // RETURN
262
246
}
247
+
248
+ d_resourceMonitor.reset ();
249
+ d_validator.reset ();
250
+
251
+ return 0 ;
263
252
}
264
253
265
254
void NtcChannelFactory::stop ()
266
255
{
267
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
256
+ bmqu::AtomicValidatorGuard valGuard (&d_validator);
268
257
269
- if (d_state != e_STATE_STARTED ) {
258
+ if (!valGuard. isValid () ) {
270
259
return ; // RETURN
271
260
}
272
261
273
- d_state = e_STATE_STOPPING;
262
+ valGuard.release ()->release ();
263
+ d_validator.invalidate (); // Disallow new listen/connect
274
264
275
265
BALL_LOG_TRACE << " NTC factory is stopping" << BALL_LOG_END;
276
266
277
- if (d_channels.length () == 0 && d_listeners.length () == 0 ) {
278
- d_state = e_STATE_STOPPED;
279
- }
280
- else {
281
- {
282
- ChannelIterator iterator (d_channels);
283
- while (iterator) {
284
- iterator.value ()->close (bmqio::Status ());
285
- ++iterator;
286
- }
287
- }
288
-
289
- {
290
- ListenerIterator iterator (d_listeners);
291
- while (iterator) {
292
- iterator.value ()->cancel ();
293
- ++iterator;
294
- }
267
+ {
268
+ ChannelIterator iterator (d_channels);
269
+ while (iterator) {
270
+ iterator.value ()->close (bmqio::Status ());
271
+ ++iterator;
295
272
}
273
+ }
296
274
297
- while (d_state != e_STATE_STOPPED) {
298
- d_stateCondition.wait (&d_stateMutex);
275
+ {
276
+ ListenerIterator iterator (d_listeners);
277
+ while (iterator) {
278
+ iterator.value ()->cancel ();
279
+ ++iterator;
299
280
}
300
281
}
301
282
302
- BSLS_ASSERT_OPT (d_state == e_STATE_STOPPED);
303
-
304
- lock.release ()->unlock ();
283
+ // Wait until all channels and listeners are finished
284
+ d_resourceMonitor.invalidate ();
305
285
306
286
d_createSignaler.disconnectAllSlots ();
307
287
d_limitSignaler.disconnectAllSlots ();
@@ -325,9 +305,9 @@ void NtcChannelFactory::listen(Status* status,
325
305
handle->reset ();
326
306
}
327
307
328
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
308
+ bmqu::AtomicValidatorGuard valGuard (&d_validator);
329
309
330
- if (d_state != e_STATE_STARTED ) {
310
+ if (!valGuard. isValid () ) {
331
311
bmqio::NtcListenerUtil::fail (status,
332
312
bmqio::StatusCategory::e_GENERIC_ERROR,
333
313
" state" ,
@@ -363,6 +343,9 @@ void NtcChannelFactory::listen(Status* status,
363
343
return ; // RETURN
364
344
}
365
345
346
+ // Increment resource usage count for new listener
347
+ d_resourceMonitor.acquire ();
348
+
366
349
if (handle) {
367
350
bslma::ManagedPtr<bmqio::NtcListener> alias (listener.managedPtr ());
368
351
handle->loadAlias (alias, listener.get ());
@@ -389,9 +372,9 @@ void NtcChannelFactory::connect(Status* status,
389
372
handle->reset ();
390
373
}
391
374
392
- bslmt::LockGuard<bslmt::Mutex> lock (&d_stateMutex); // LOCKED
375
+ bmqu::AtomicValidatorGuard valGuard (&d_validator);
393
376
394
- if (d_state != e_STATE_STARTED ) {
377
+ if (!valGuard. isValid () ) {
395
378
bmqio::NtcChannelUtil::fail (status,
396
379
bmqio::StatusCategory::e_GENERIC_ERROR,
397
380
" state" ,
@@ -434,6 +417,9 @@ void NtcChannelFactory::connect(Status* status,
434
417
handle->loadAlias (alias, channel.get ());
435
418
}
436
419
420
+ // Increment resource usage count for new channel
421
+ d_resourceMonitor.acquire ();
422
+
437
423
BALL_LOG_TRACE << " NTC channel " << AddressFormatter (channel.get ())
438
424
<< " to " << channel->peerUri () << " registered"
439
425
<< BALL_LOG_END;
0 commit comments