Skip to content

Commit 9b45ea3

Browse files
committed
Fix: multi thread listen
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent 6783d0e commit 9b45ea3

File tree

4 files changed

+182
-81
lines changed

4 files changed

+182
-81
lines changed

src/groups/bmq/bmqio/bmqio_ntcchannel.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -927,6 +927,8 @@ void NtcChannel::processClose(const bmqio::Status& status)
927927
return;
928928
}
929929

930+
BALL_LOG_ERROR << "NtcChannel::processClose";
931+
930932
BMQIO_NTCCHANNEL_LOG_CLOSED(this, d_streamSocket_sp, status);
931933

932934
d_state = e_STATE_CLOSED;

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.cpp

Lines changed: 61 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ void NtcChannelFactory::processListenerResult(
9292
bslstl::SharedPtrUtil::dynamicCast(&alias, channel);
9393
if (alias) {
9494
int catalogHandle = d_channels.add(alias);
95+
96+
// Increment resource usage count for new channel
97+
BALL_LOG_ERROR << "d_resourceMonitor.acquire()";
98+
d_resourceMonitor.acquire();
99+
95100
alias->setChannelId(catalogHandle);
96101
alias->onClose(bdlf::BindUtil::bind(
97102
&NtcChannelFactory::processChannelClosed,
@@ -122,16 +127,8 @@ void NtcChannelFactory::processListenerClosed(int handle)
122127
<< BALL_LOG_END;
123128
}
124129

125-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
126-
if (d_state == e_STATE_STOPPING) {
127-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
128-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
129-
<< BALL_LOG_END;
130-
131-
d_state = e_STATE_STOPPED;
132-
d_stateCondition.signal();
133-
}
134-
}
130+
BALL_LOG_ERROR << "d_resourceMonitor.release()";
131+
d_resourceMonitor.release(); // Decrement resource usage count
135132
}
136133

137134
void NtcChannelFactory::processChannelResult(
@@ -166,16 +163,8 @@ void NtcChannelFactory::processChannelClosed(int handle)
166163
<< BALL_LOG_END;
167164
}
168165

169-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
170-
if (d_state == e_STATE_STOPPING) {
171-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
172-
BALL_LOG_TRACE << "NTC factory channels and listeners have closed"
173-
<< BALL_LOG_END;
174-
175-
d_state = e_STATE_STOPPED;
176-
d_stateCondition.signal();
177-
}
178-
}
166+
BALL_LOG_ERROR << "d_resourceMonitor.release()";
167+
d_resourceMonitor.release(); // Decrement resource usage count
179168
}
180169

181170
// CREATORS
@@ -189,9 +178,9 @@ NtcChannelFactory::NtcChannelFactory(
189178
, d_createSignaler(basicAllocator)
190179
, d_limitSignaler(basicAllocator)
191180
, d_owned(false)
192-
, d_stateMutex()
193-
, d_stateCondition()
194-
, d_state(e_STATE_DEFAULT)
181+
, d_validator(false)
182+
, d_resourceMonitor(false)
183+
, d_isInterfaceStarted(false)
195184
, d_allocator_p(bslma::Default::allocator(basicAllocator))
196185
{
197186
}
@@ -206,9 +195,9 @@ NtcChannelFactory::NtcChannelFactory(
206195
, d_createSignaler(basicAllocator)
207196
, d_limitSignaler(basicAllocator)
208197
, d_owned(true)
209-
, d_stateMutex()
210-
, d_stateCondition()
211-
, d_state(e_STATE_DEFAULT)
198+
, d_validator(false)
199+
, d_resourceMonitor(false)
200+
, d_isInterfaceStarted(false)
212201
, d_allocator_p(bslma::Default::allocator(basicAllocator))
213202
{
214203
bsl::shared_ptr<bdlbb::BlobBufferFactory> blobBufferFactory_sp(
@@ -232,7 +221,6 @@ NtcChannelFactory::~NtcChannelFactory()
232221
d_interface_sp.reset();
233222
}
234223

235-
BSLS_ASSERT_OPT(d_state == e_STATE_DEFAULT || d_state == e_STATE_STOPPED);
236224
BSLS_ASSERT_OPT(d_listeners.length() == 0);
237225
BSLS_ASSERT_OPT(d_channels.length() == 0);
238226
BSLS_ASSERT_OPT(d_createSignaler.slotCount() == 0);
@@ -243,65 +231,60 @@ NtcChannelFactory::~NtcChannelFactory()
243231
// MANIPULATORS
244232
int NtcChannelFactory::start()
245233
{
246-
ntsa::Error error;
234+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
247235

248-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
236+
if (valGuard.isValid()) {
237+
// Already started.
238+
return 1; // RETURN
239+
}
249240

250-
switch (d_state) {
251-
case e_STATE_DEFAULT:
252-
error = d_interface_sp->start();
241+
if (!d_isInterfaceStarted) {
242+
// Make sure we don't restart the same interface if we have
243+
// `start()`, `stop()`, `start()` sequence.
244+
d_isInterfaceStarted = true;
245+
const ntsa::Error error = d_interface_sp->start();
253246
if (error) {
254247
return error.number(); // RETURN
255248
}
256-
d_state = e_STATE_STARTED;
257-
return 0; // RETURN
258-
case e_STATE_STOPPED: d_state = e_STATE_STARTED; return 0; // RETURN
259-
case e_STATE_STARTED: return 0; // RETURN
260-
case e_STATE_STOPPING: return 1; // RETURN
261-
default: return 1; // RETURN
262249
}
250+
251+
d_resourceMonitor.reset();
252+
d_validator.reset();
253+
254+
return 0;
263255
}
264256

265257
void NtcChannelFactory::stop()
266258
{
267-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
259+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
268260

269-
if (d_state != e_STATE_STARTED) {
261+
if (!valGuard.isValid()) {
270262
return; // RETURN
271263
}
272264

273-
d_state = e_STATE_STOPPING;
265+
valGuard.release()->release();
266+
d_validator.invalidate(); // Disallow new listen/connect
274267

275268
BALL_LOG_TRACE << "NTC factory is stopping" << BALL_LOG_END;
276269

277-
if (d_channels.length() == 0 && d_listeners.length() == 0) {
278-
d_state = e_STATE_STOPPED;
279-
}
280-
else {
281-
{
282-
ChannelIterator iterator(d_channels);
283-
while (iterator) {
284-
iterator.value()->close(bmqio::Status());
285-
++iterator;
286-
}
287-
}
288-
289-
{
290-
ListenerIterator iterator(d_listeners);
291-
while (iterator) {
292-
iterator.value()->cancel();
293-
++iterator;
294-
}
270+
{
271+
ChannelIterator iterator(d_channels);
272+
while (iterator) {
273+
iterator.value()->close(bmqio::Status());
274+
++iterator;
295275
}
276+
}
296277

297-
while (d_state != e_STATE_STOPPED) {
298-
d_stateCondition.wait(&d_stateMutex);
278+
{
279+
ListenerIterator iterator(d_listeners);
280+
while (iterator) {
281+
iterator.value()->cancel();
282+
++iterator;
299283
}
300284
}
301285

302-
BSLS_ASSERT_OPT(d_state == e_STATE_STOPPED);
303-
304-
lock.release()->unlock();
286+
// Wait until all channels and listeners are finished
287+
d_resourceMonitor.invalidate();
305288

306289
d_createSignaler.disconnectAllSlots();
307290
d_limitSignaler.disconnectAllSlots();
@@ -325,9 +308,9 @@ void NtcChannelFactory::listen(Status* status,
325308
handle->reset();
326309
}
327310

328-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
311+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
329312

330-
if (d_state != e_STATE_STARTED) {
313+
if (!valGuard.isValid()) {
331314
bmqio::NtcListenerUtil::fail(status,
332315
bmqio::StatusCategory::e_GENERIC_ERROR,
333316
"state",
@@ -359,10 +342,15 @@ void NtcChannelFactory::listen(Status* status,
359342

360343
rc = listener->listen(status, options);
361344
if (rc != 0) {
345+
BALL_LOG_ERROR << "listener failed";
362346
d_listeners.remove(catalogHandle);
363347
return; // RETURN
364348
}
365349

350+
// Increment resource usage count for new listener
351+
BALL_LOG_ERROR << "d_resourceMonitor.acquire()";
352+
d_resourceMonitor.acquire();
353+
366354
if (handle) {
367355
bslma::ManagedPtr<bmqio::NtcListener> alias(listener.managedPtr());
368356
handle->loadAlias(alias, listener.get());
@@ -389,9 +377,9 @@ void NtcChannelFactory::connect(Status* status,
389377
handle->reset();
390378
}
391379

392-
bslmt::LockGuard<bslmt::Mutex> lock(&d_stateMutex); // LOCKED
380+
bmqu::AtomicValidatorGuard valGuard(&d_validator);
393381

394-
if (d_state != e_STATE_STARTED) {
382+
if (!valGuard.isValid()) {
395383
bmqio::NtcChannelUtil::fail(status,
396384
bmqio::StatusCategory::e_GENERIC_ERROR,
397385
"state",
@@ -425,6 +413,7 @@ void NtcChannelFactory::connect(Status* status,
425413

426414
rc = channel->connect(status, options);
427415
if (rc != 0) {
416+
BALL_LOG_ERROR << "connect failed";
428417
d_channels.remove(catalogHandle);
429418
return; // RETURN
430419
}
@@ -434,6 +423,10 @@ void NtcChannelFactory::connect(Status* status,
434423
handle->loadAlias(alias, channel.get());
435424
}
436425

426+
// Increment resource usage count for new channel
427+
BALL_LOG_ERROR << "d_resourceMonitor.acquire()";
428+
d_resourceMonitor.acquire();
429+
437430
BALL_LOG_TRACE << "NTC channel " << AddressFormatter(channel.get())
438431
<< " to " << channel->peerUri() << " registered"
439432
<< BALL_LOG_END;

src/groups/bmq/bmqio/bmqio_ntcchannelfactory.h

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <bmqio_connectoptions.h>
3434
#include <bmqio_listenoptions.h>
3535
#include <bmqio_ntcchannel.h>
36+
#include <bmqu_atomicvalidator.h>
3637

3738
// NTF
3839
#include <ntca_interfaceconfig.h>
@@ -43,9 +44,6 @@
4344
#include <bdlcc_objectcatalog.h>
4445
#include <bdlma_localsequentialallocator.h>
4546
#include <bsl_memory.h>
46-
#include <bslmt_condition.h>
47-
#include <bslmt_lockguard.h>
48-
#include <bslmt_mutex.h>
4947

5048
namespace BloombergLP {
5149
namespace bmqio {
@@ -98,23 +96,16 @@ class NtcChannelFactory : public bmqio::ChannelFactory {
9896
/// This typedef defines an iterator over a catalog of channels.
9997
typedef bdlcc::ObjectCatalogIter<ChannelEntry> ChannelIterator;
10098

101-
enum State {
102-
e_STATE_DEFAULT,
103-
e_STATE_STARTED,
104-
e_STATE_STOPPING,
105-
e_STATE_STOPPED
106-
};
107-
10899
// INSTANCE DATA
109100
bsl::shared_ptr<ntci::Interface> d_interface_sp;
110101
ListenerCatalog d_listeners;
111102
ChannelCatalog d_channels;
112103
bdlmt::Signaler<CreateFnType> d_createSignaler;
113104
bdlmt::Signaler<LimitFnType> d_limitSignaler;
114105
bool d_owned;
115-
bslmt::Mutex d_stateMutex;
116-
bslmt::Condition d_stateCondition;
117-
State d_state;
106+
bmqu::AtomicValidator d_validator;
107+
bmqu::AtomicValidator d_resourceMonitor;
108+
bsls::AtomicBool d_isInterfaceStarted;
118109
bslma::Allocator* d_allocator_p;
119110

120111
private:

0 commit comments

Comments
 (0)