Skip to content

Commit 6783d0e

Browse files
authored
Fix[MQB]: do not access queue handle flags from cluster thread [179311632] (#764)
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent 61e68d5 commit 6783d0e

File tree

3 files changed

+39
-56
lines changed

3 files changed

+39
-56
lines changed

src/groups/mqb/mqbblp/mqbblp_cluster.cpp

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,9 @@ void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event)
10451045
// it needs to be forwarded to the queue after appropriate checks. The
10461046
// replica source node is event.clusterNode(). This routine is similar to
10471047
// that of ClientSession.
1048+
// Note that we don't check if the queue is opened in WRITE mode here, we
1049+
// just pass the event to the corresponding *QUEUE* dispatcher thread that
1050+
// can safely access the `QueueHandleParameters` to check this.
10481051

10491052
mqbnet::ClusterNode* source = event.clusterNode();
10501053
mqbc::ClusterNodeSession* ns =
@@ -1128,28 +1131,6 @@ void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event)
11281131
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) !=
11291132
queueState.d_subQueueInfosMap.end());
11301133

1131-
// Ensure that queue is opened in WRITE mode
1132-
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
1133-
!bmqt::QueueFlagsUtil::isWriter(
1134-
queueState.d_handle_p->handleParameters().flags()))) {
1135-
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
1136-
1137-
BMQU_THROTTLEDACTION_THROTTLE(
1138-
d_throttledFailedPutMessages,
1139-
BALL_LOG_WARN << description() << ": PUT message for queue ["
1140-
<< queueState.d_handle_p->queue()->uri()
1141-
<< "] not opened in WRITE mode.";);
1142-
1143-
sendAck(bmqt::AckResult::e_REFUSED,
1144-
bmqp::AckMessage::k_NULL_CORRELATION_ID,
1145-
putIt.header().messageGUID(),
1146-
queueId.id(),
1147-
"putEvent::notWritable",
1148-
ns,
1149-
true); // isSelfGenerated
1150-
continue; // CONTINUE
1151-
}
1152-
11531134
// Ensure that final closeQueue request has not been sent.
11541135

11551136
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(

src/groups/mqb/mqbblp/mqbblp_localqueue.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -421,14 +421,18 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
421421
<< "] from client [" << source->client()->description()
422422
<< "]. Queue not opened in WRITE mode by the client.";);
423423

424-
// Note that a NACK is not sent in this case. This is a case of client
425-
// violating the contract, by attempting to post a message after
426-
// closing/reconfiguring the queue. Since this is out of contract, its
427-
// ok not to send the NACK. If it is still desired to send a NACK, it
428-
// will need some enqueuing b/w client and queue dispatcher threads to
429-
// ensure that despite NACKs being sent, closeQueue response is still
430-
// the last event to be sent to the client for the given queue.
424+
bmqp::AckMessage ackMessage;
425+
ackMessage
426+
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
427+
bmqt::AckResult::e_REFUSED))
428+
.setMessageGUID(putHeader.messageGUID());
429+
// CorrelationId & QueueId are left unset as those fields will
430+
// be filled downstream.
431431

432+
source->onAckMessage(ackMessage);
433+
434+
d_state_p->stats()
435+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);
432436
return; // RETURN
433437
}
434438

src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -904,33 +904,6 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
904904
translation.applyTo(&putHeader);
905905

906906
// Relay the PUT message via clusterProxy/cluster
907-
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!bmqt::QueueFlagsUtil::isWriter(
908-
source->handleParameters().flags()))) {
909-
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
910-
911-
// Either queue was not opened in the WRITE mode (which should have
912-
// been caught in the SDK) or client is posting a message after closing
913-
// or reconfiguring the queue (which may not be caught in the SDK).
914-
915-
if (d_throttledFailedPutMessages.requestPermission()) {
916-
BALL_LOG_WARN
917-
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR "
918-
<< "Failed PUT message for queue [" << d_state_p->uri()
919-
<< "] from client [" << source->client()->description()
920-
<< "]. Queue not opened in WRITE mode by the client.";
921-
}
922-
923-
// Note that a NACK is not sent in this case. This is a case of client
924-
// violating the contract, by attempting to post a message after
925-
// closing/reconfiguring the queue. Since this is out of contract, its
926-
// ok not to send the NACK. If it is still desired to send a NACK, it
927-
// will need some enqueuing b/w client and queue dispatcher threads to
928-
// ensure that despite NACKs being sent, closeQueue response is still
929-
// the last event to be sent to the client for the given queue.
930-
931-
return; // RETURN
932-
}
933-
934907
SubStreamContext& ctx = d_producerState;
935908

936909
if (ctx.d_state == SubStreamContext::e_NONE) {
@@ -944,6 +917,31 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
944917

945918
if (ctx.d_state == SubStreamContext::e_CLOSED) {
946919
isInvalid = true;
920+
921+
if (d_throttledFailedPutMessages.requestPermission()) {
922+
BALL_LOG_WARN << "[THROTTLED] Failed PUT message for queue ["
923+
<< d_state_p->uri()
924+
<< "]. Upstream is already closed.";
925+
}
926+
}
927+
else if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
928+
!bmqt::QueueFlagsUtil::isWriter(
929+
source->handleParameters().flags()))) {
930+
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
931+
932+
// Either queue was not opened in the WRITE mode (which should have
933+
// been caught in the SDK) or client is posting a message after closing
934+
// or reconfiguring the queue (which may not be caught in the SDK).
935+
936+
isInvalid = true;
937+
938+
if (d_throttledFailedPutMessages.requestPermission()) {
939+
BALL_LOG_WARN
940+
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR "
941+
<< "Failed PUT message for queue [" << d_state_p->uri()
942+
<< "] from client [" << source->client()->description()
943+
<< "]. Queue not opened in WRITE mode by the client.";
944+
}
947945
}
948946
else if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
949947
d_pendingMessages.find(putHeader.messageGUID()) !=

0 commit comments

Comments
 (0)