Skip to content

Commit 3af3a74

Browse files
committed
Fix: multi thread listen
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent 7002f95 commit 3af3a74

File tree

3 files changed

+172
-78
lines changed

3 files changed

+172
-78
lines changed

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.cpp

Lines changed: 54 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ void NtcChannelFactory::processListenerResult(
9393
bslstl::SharedPtrUtil::dynamicCast(&alias, channel);
9494
if (alias) {
9595
int catalogHandle = d_channels.add(alias);
96+
97+
// Increment resource usage count for new channel
98+
d_resourceMonitor.acquire();
99+
96100
alias->setChannelId(catalogHandle);
97101
alias->onClose(bdlf::BindUtil::bind(
98102
&NtcChannelFactory::processChannelClosed,
@@ -123,16 +127,7 @@ void NtcChannelFactory::processListenerClosed(int handle)
123127
<< BALL_LOG_END;
124128
}
125129

126-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
127-
if (d_state == e_STATE_STOPPING) {
128-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
129-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
130-
<< BALL_LOG_END;
131-
132-
d_state = e_STATE_STOPPED;
133-
d_stateCondition.signal();
134-
}
135-
}
130+
d_resourceMonitor.release(); // Decrement resource usage count
136131
}
137132

138133
void NtcChannelFactory::processChannelResult(
@@ -167,16 +162,7 @@ void NtcChannelFactory::processChannelClosed(int handle)
167162
<< BALL_LOG_END;
168163
}
169164

170-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
171-
if (d_state == e_STATE_STOPPING) {
172-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
173-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
174-
<< BALL_LOG_END;
175-
176-
d_state = e_STATE_STOPPED;
177-
d_stateCondition.signal();
178-
}
179-
}
165+
d_resourceMonitor.release(); // Decrement resource usage count
180166
}
181167

182168
// CREATORS
@@ -190,9 +176,9 @@ NtcChannelFactory::NtcChannelFactory(
190176
, d_createSignaler(basicAllocator)
191177
, d_limitSignaler(basicAllocator)
192178
, d_owned(false)
193-
, d_stateMutex()
194-
, d_stateCondition()
195-
, d_state(e_STATE_DEFAULT)
179+
, d_validator(false)
180+
, d_resourceMonitor(false)
181+
, d_isInterfaceStarted(false)
196182
, d_allocator_p(bslma::Default::allocator(basicAllocator))
197183
{
198184
}
@@ -207,9 +193,9 @@ NtcChannelFactory::NtcChannelFactory(
207193
, d_createSignaler(basicAllocator)
208194
, d_limitSignaler(basicAllocator)
209195
, d_owned(true)
210-
, d_stateMutex()
211-
, d_stateCondition()
212-
, d_state(e_STATE_DEFAULT)
196+
, d_validator(false)
197+
, d_resourceMonitor(false)
198+
, d_isInterfaceStarted(false)
213199
, d_allocator_p(bslma::Default::allocator(basicAllocator))
214200
{
215201
bsl::shared_ptr<bdlbb::BlobBufferFactory> blobBufferFactory_sp(
@@ -233,7 +219,6 @@ NtcChannelFactory::~NtcChannelFactory()
233219
d_interface_sp.reset();
234220
}
235221

236-
BSLS_ASSERT_OPT(d_state == e_STATE_DEFAULT || d_state == e_STATE_STOPPED);
237222
BSLS_ASSERT_OPT(d_listeners.length() == 0);
238223
BSLS_ASSERT_OPT(d_channels.length() == 0);
239224
BSLS_ASSERT_OPT(d_createSignaler.slotCount() == 0);
@@ -244,65 +229,60 @@ NtcChannelFactory::~NtcChannelFactory()
244229
// MANIPULATORS
245230
int NtcChannelFactory::start()
246231
{
247-
ntsa::Error error;
232+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
248233

249-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
234+
if (valGuard.isValid()) {
235+
// Already started.
236+
return 1; // RETURN
237+
}
250238

251-
switch (d_state) {
252-
case e_STATE_DEFAULT:
253-
error = d_interface_sp->start();
239+
if (!d_isInterfaceStarted) {
240+
// Make sure we don't restart the same interface if we have
241+
// `start()`, `stop()`, `start()` sequence.
242+
d_isInterfaceStarted = true;
243+
const ntsa::Error error = d_interface_sp->start();
254244
if (error) {
255245
return error.number(); // RETURN
256246
}
257-
d_state = e_STATE_STARTED;
258-
return 0; // RETURN
259-
case e_STATE_STOPPED: d_state = e_STATE_STARTED; return 0; // RETURN
260-
case e_STATE_STARTED: return 0; // RETURN
261-
case e_STATE_STOPPING: return 1; // RETURN
262-
default: return 1; // RETURN
263247
}
248+
249+
d_resourceMonitor.reset();
250+
d_validator.reset();
251+
252+
return 0;
264253
}
265254

266255
void NtcChannelFactory::stop()
267256
{
268-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
257+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
269258

270-
if (d_state != e_STATE_STARTED) {
259+
if (!valGuard.isValid()) {
271260
return; // RETURN
272261
}
273262

274-
d_state = e_STATE_STOPPING;
263+
valGuard.release()->release();
264+
d_validator.invalidate(); // Disallow new listen/connect
275265

276266
BALL_LOG_TRACE << "NTC factory is stopping" << BALL_LOG_END;
277267

278-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
279-
d_state = e_STATE_STOPPED;
280-
}
281-
else {
282-
{
283-
ChannelIterator iterator(d_channels);
284-
while (iterator) {
285-
iterator.value()->close(bmqio::Status());
286-
++iterator;
287-
}
288-
}
289-
290-
{
291-
ListenerIterator iterator(d_listeners);
292-
while (iterator) {
293-
iterator.value()->cancel();
294-
++iterator;
295-
}
268+
{
269+
ChannelIterator iterator(d_channels);
270+
while (iterator) {
271+
iterator.value()->close(bmqio::Status());
272+
++iterator;
296273
}
274+
}
297275

298-
while (d_state != e_STATE_STOPPED) {
299-
d_stateCondition.wait(&d_stateMutex);
276+
{
277+
ListenerIterator iterator(d_listeners);
278+
while (iterator) {
279+
iterator.value()->cancel();
280+
++iterator;
300281
}
301282
}
302283

303-
BSLS_ASSERT_OPT(d_state == e_STATE_STOPPED);
304-
305-
lock.release()->unlock();
284+
// Wait until all channels and listeners are finished
285+
d_resourceMonitor.invalidate();
306286

307287
d_createSignaler.disconnectAllSlots();
308288
d_limitSignaler.disconnectAllSlots();
@@ -326,9 +306,9 @@ void NtcChannelFactory::listen(Status* status,
326306
handle->reset();
327307
}
328308

329-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
309+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
330310

331-
if (d_state != e_STATE_STARTED) {
311+
if (!valGuard.isValid()) {
332312
bmqio::NtcListenerUtil::fail(status,
333313
bmqio::StatusCategory::e_GENERIC_ERROR,
334314
"state",
@@ -364,6 +344,9 @@ void NtcChannelFactory::listen(Status* status,
364344
return; // RETURN
365345
}
366346

347+
// Increment resource usage count for new listener
348+
d_resourceMonitor.acquire();
349+
367350
if (handle) {
368351
bslma::ManagedPtr<bmqio::NtcListener> alias(listener.managedPtr());
369352
handle->loadAlias(alias, listener.get());
@@ -390,9 +373,9 @@ void NtcChannelFactory::connect(Status* status,
390373
handle->reset();
391374
}
392375

393-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
376+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
394377

395-
if (d_state != e_STATE_STARTED) {
378+
if (!valGuard.isValid()) {
396379
bmqio::NtcChannelUtil::fail(status,
397380
bmqio::StatusCategory::e_GENERIC_ERROR,
398381
"state",
@@ -435,6 +418,9 @@ void NtcChannelFactory::connect(Status* status,
435418
handle->loadAlias(alias, channel.get());
436419
}
437420

421+
// Increment resource usage count for new channel
422+
d_resourceMonitor.acquire();
423+
438424
BALL_LOG_TRACE << "NTC channel " << AddressFormatter(channel.get())
439425
<< " to " << channel->peerUri() << " registered"
440426
<< BALL_LOG_END;

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.h

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <bmqio_connectoptions.h>
3434
#include <bmqio_listenoptions.h>
3535
#include <bmqio_ntcchannel.h>
36+
#include <bmqu_atomicvalidator.h>
3637

3738
// NTF
3839
#include <ntca_interfaceconfig.h>
@@ -100,23 +101,16 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
100101
/// This typedef defines an iterator over a catalog of channels.
101102
typedef bdlcc::ObjectCatalogIter<ChannelEntry> ChannelIterator;
102103

103-
enum State {
104-
e_STATE_DEFAULT,
105-
e_STATE_STARTED,
106-
e_STATE_STOPPING,
107-
e_STATE_STOPPED
108-
};
109-
110104
// INSTANCE DATA
111105
bsl::shared_ptr<ntci::Interface> d_interface_sp;
112106
ListenerCatalog d_listeners;
113107
ChannelCatalog d_channels;
114108
bdlmt::Signaler<CreateFnType> d_createSignaler;
115109
bdlmt::Signaler<LimitFnType> d_limitSignaler;
116110
bool d_owned;
117-
bslmt::Mutex d_stateMutex;
118-
bslmt::Condition d_stateCondition;
119-
State d_state;
111+
bmqu::AtomicValidator d_validator;
112+
bmqu::AtomicValidator d_resourceMonitor;
113+
bsls::AtomicBool d_isInterfaceStarted;
120114
bslma::Allocator* d_allocator_p;
121115

122116
private:

0 commit comments

Comments
 (0)