Skip to content

Commit 11d0d59

Browse files
committed
Fix: multi thread listen
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent 6783d0e commit 11d0d59

File tree

3 files changed

+173
-81
lines changed

3 files changed

+173
-81
lines changed

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.cpp

Lines changed: 54 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ void NtcChannelFactory::processListenerResult(
9292
bslstl::SharedPtrUtil::dynamicCast(&alias, channel);
9393
if (alias) {
9494
int catalogHandle = d_channels.add(alias);
95+
96+
// Increment resource usage count for new channel
97+
d_resourceMonitor.acquire();
98+
9599
alias->setChannelId(catalogHandle);
96100
alias->onClose(bdlf::BindUtil::bind(
97101
&NtcChannelFactory::processChannelClosed,
@@ -122,16 +126,7 @@ void NtcChannelFactory::processListenerClosed(int handle)
122126
<< BALL_LOG_END;
123127
}
124128

125-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
126-
if (d_state == e_STATE_STOPPING) {
127-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
128-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
129-
<< BALL_LOG_END;
130-
131-
d_state = e_STATE_STOPPED;
132-
d_stateCondition.signal();
133-
}
134-
}
129+
d_resourceMonitor.release(); // Decrement resource usage count
135130
}
136131

137132
void NtcChannelFactory::processChannelResult(
@@ -166,16 +161,7 @@ void NtcChannelFactory::processChannelClosed(int handle)
166161
<< BALL_LOG_END;
167162
}
168163

169-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
170-
if (d_state == e_STATE_STOPPING) {
171-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
172-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
173-
<< BALL_LOG_END;
174-
175-
d_state = e_STATE_STOPPED;
176-
d_stateCondition.signal();
177-
}
178-
}
164+
d_resourceMonitor.release(); // Decrement resource usage count
179165
}
180166

181167
// CREATORS
@@ -189,9 +175,9 @@ NtcChannelFactory::NtcChannelFactory(
189175
, d_createSignaler(basicAllocator)
190176
, d_limitSignaler(basicAllocator)
191177
, d_owned(false)
192-
, d_stateMutex()
193-
, d_stateCondition()
194-
, d_state(e_STATE_DEFAULT)
178+
, d_validator(false)
179+
, d_resourceMonitor(false)
180+
, d_isInterfaceStarted(false)
195181
, d_allocator_p(bslma::Default::allocator(basicAllocator))
196182
{
197183
}
@@ -206,9 +192,9 @@ NtcChannelFactory::NtcChannelFactory(
206192
, d_createSignaler(basicAllocator)
207193
, d_limitSignaler(basicAllocator)
208194
, d_owned(true)
209-
, d_stateMutex()
210-
, d_stateCondition()
211-
, d_state(e_STATE_DEFAULT)
195+
, d_validator(false)
196+
, d_resourceMonitor(false)
197+
, d_isInterfaceStarted(false)
212198
, d_allocator_p(bslma::Default::allocator(basicAllocator))
213199
{
214200
bsl::shared_ptr<bdlbb::BlobBufferFactory> blobBufferFactory_sp(
@@ -232,7 +218,6 @@ NtcChannelFactory::~NtcChannelFactory()
232218
d_interface_sp.reset();
233219
}
234220

235-
BSLS_ASSERT_OPT(d_state == e_STATE_DEFAULT || d_state == e_STATE_STOPPED);
236221
BSLS_ASSERT_OPT(d_listeners.length() == 0);
237222
BSLS_ASSERT_OPT(d_channels.length() == 0);
238223
BSLS_ASSERT_OPT(d_createSignaler.slotCount() == 0);
@@ -243,65 +228,60 @@ NtcChannelFactory::~NtcChannelFactory()
243228
// MANIPULATORS
244229
int NtcChannelFactory::start()
245230
{
246-
ntsa::Error error;
231+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
247232

248-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
233+
if (valGuard.isValid()) {
234+
// Already started.
235+
return 1; // RETURN
236+
}
249237

250-
switch (d_state) {
251-
case e_STATE_DEFAULT:
252-
error = d_interface_sp->start();
238+
if (!d_isInterfaceStarted) {
239+
// Make sure we don't restart the same interface if we have
240+
// `start()`, `stop()`, `start()` sequence.
241+
d_isInterfaceStarted = true;
242+
const ntsa::Error error = d_interface_sp->start();
253243
if (error) {
254244
return error.number(); // RETURN
255245
}
256-
d_state = e_STATE_STARTED;
257-
return 0; // RETURN
258-
case e_STATE_STOPPED: d_state = e_STATE_STARTED; return 0; // RETURN
259-
case e_STATE_STARTED: return 0; // RETURN
260-
case e_STATE_STOPPING: return 1; // RETURN
261-
default: return 1; // RETURN
262246
}
247+
248+
d_resourceMonitor.reset();
249+
d_validator.reset();
250+
251+
return 0;
263252
}
264253

265254
void NtcChannelFactory::stop()
266255
{
267-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
256+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
268257

269-
if (d_state != e_STATE_STARTED) {
258+
if (!valGuard.isValid()) {
270259
return; // RETURN
271260
}
272261

273-
d_state = e_STATE_STOPPING;
262+
valGuard.release()->release();
263+
d_validator.invalidate(); // Disallow new listen/connect
274264

275265
BALL_LOG_TRACE << "NTC factory is stopping" << BALL_LOG_END;
276266

277-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
278-
d_state = e_STATE_STOPPED;
279-
}
280-
else {
281-
{
282-
ChannelIterator iterator(d_channels);
283-
while (iterator) {
284-
iterator.value()->close(bmqio::Status());
285-
++iterator;
286-
}
287-
}
288-
289-
{
290-
ListenerIterator iterator(d_listeners);
291-
while (iterator) {
292-
iterator.value()->cancel();
293-
++iterator;
294-
}
267+
{
268+
ChannelIterator iterator(d_channels);
269+
while (iterator) {
270+
iterator.value()->close(bmqio::Status());
271+
++iterator;
295272
}
273+
}
296274

297-
while (d_state != e_STATE_STOPPED) {
298-
d_stateCondition.wait(&d_stateMutex);
275+
{
276+
ListenerIterator iterator(d_listeners);
277+
while (iterator) {
278+
iterator.value()->cancel();
279+
++iterator;
299280
}
300281
}
301282

302-
BSLS_ASSERT_OPT(d_state == e_STATE_STOPPED);
303-
304-
lock.release()->unlock();
283+
// Wait until all channels and listeners are finished
284+
d_resourceMonitor.invalidate();
305285

306286
d_createSignaler.disconnectAllSlots();
307287
d_limitSignaler.disconnectAllSlots();
@@ -325,9 +305,9 @@ void NtcChannelFactory::listen(Status* status,
325305
handle->reset();
326306
}
327307

328-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
308+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
329309

330-
if (d_state != e_STATE_STARTED) {
310+
if (!valGuard.isValid()) {
331311
bmqio::NtcListenerUtil::fail(status,
332312
bmqio::StatusCategory::e_GENERIC_ERROR,
333313
"state",
@@ -363,6 +343,9 @@ void NtcChannelFactory::listen(Status* status,
363343
return; // RETURN
364344
}
365345

346+
// Increment resource usage count for new listener
347+
d_resourceMonitor.acquire();
348+
366349
if (handle) {
367350
bslma::ManagedPtr<bmqio::NtcListener> alias(listener.managedPtr());
368351
handle->loadAlias(alias, listener.get());
@@ -389,9 +372,9 @@ void NtcChannelFactory::connect(Status* status,
389372
handle->reset();
390373
}
391374

392-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
375+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
393376

394-
if (d_state != e_STATE_STARTED) {
377+
if (!valGuard.isValid()) {
395378
bmqio::NtcChannelUtil::fail(status,
396379
bmqio::StatusCategory::e_GENERIC_ERROR,
397380
"state",
@@ -434,6 +417,9 @@ void NtcChannelFactory::connect(Status* status,
434417
handle->loadAlias(alias, channel.get());
435418
}
436419

420+
// Increment resource usage count for new channel
421+
d_resourceMonitor.acquire();
422+
437423
BALL_LOG_TRACE << "NTC channel " << AddressFormatter(channel.get())
438424
<< " to " << channel->peerUri() << " registered"
439425
<< BALL_LOG_END;

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.h

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <bmqio_connectoptions.h>
3434
#include <bmqio_listenoptions.h>
3535
#include <bmqio_ntcchannel.h>
36+
#include <bmqu_atomicvalidator.h>
3637

3738
// NTF
3839
#include <ntca_interfaceconfig.h>
@@ -43,9 +44,6 @@
4344
#include <bdlcc_objectcatalog.h>
4445
#include <bdlma_localsequentialallocator.h>
4546
#include <bsl_memory.h>
46-
#include <bslmt_condition.h>
47-
#include <bslmt_lockguard.h>
48-
#include <bslmt_mutex.h>
4947

5048
namespace BloombergLP {
5149
namespace bmqio {
@@ -98,23 +96,16 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
9896
/// This typedef defines an iterator over a catalog of channels.
9997
typedef bdlcc::ObjectCatalogIter<ChannelEntry> ChannelIterator;
10098

101-
enum State {
102-
e_STATE_DEFAULT,
103-
e_STATE_STARTED,
104-
e_STATE_STOPPING,
105-
e_STATE_STOPPED
106-
};
107-
10899
// INSTANCE DATA
109100
bsl::shared_ptr<ntci::Interface> d_interface_sp;
110101
ListenerCatalog d_listeners;
111102
ChannelCatalog d_channels;
112103
bdlmt::Signaler<CreateFnType> d_createSignaler;
113104
bdlmt::Signaler<LimitFnType> d_limitSignaler;
114105
bool d_owned;
115-
bslmt::Mutex d_stateMutex;
116-
bslmt::Condition d_stateCondition;
117-
State d_state;
106+
bmqu::AtomicValidator d_validator;
107+
bmqu::AtomicValidator d_resourceMonitor;
108+
bsls::AtomicBool d_isInterfaceStarted;
118109
bslma::Allocator* d_allocator_p;
119110

120111
private:

0 commit comments

Comments
 (0)