Skip to content

Commit 2e2e640

Browse files
committed
Prevent cycle in the QueueManager
1 parent 236ca5e commit 2e2e640

File tree

3 files changed

+195
-22
lines changed

3 files changed

+195
-22
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,7 @@ object LoggingMarkers {
601601
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
602602
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
603603
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
604+
// Marker token for recovering a scheduler queue: "queueRecover" in the "start" state, measured in milliseconds.
def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
604605
def SCHEDULER_QUEUE_UPDATE(reason: String) =
605606
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
606607
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 122 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ case class CreateNewQueue(activationMessage: ActivationMessage,
5555
action: FullyQualifiedEntityName,
5656
actionMetadata: WhiskActionMetaData)
5757

58+
/**
 * Message asking the QueueManager to recover the queue of an action.
 *
 * The QueueManager sends it to itself once the action metadata has been fetched. On receipt the
 * activation is either forwarded to an existing leader queue or a queue is recreated for it.
 *
 * @param activationMessage the activation that must be delivered to the queue
 * @param action the fully qualified name of the action the queue belongs to
 * @param actionMetadata the metadata of the action, handed over when a queue is recreated
 */
case class RecoverQueue(activationMessage: ActivationMessage,
59+
action: FullyQualifiedEntityName,
60+
actionMetadata: WhiskActionMetaData)
61+
5862
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
5963

6064
class QueueManager(
@@ -80,7 +84,7 @@ class QueueManager(
8084

8185
private val actorSelectionMap = TrieMap[String, ActorSelection]()
8286

83-
private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
87+
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
8488

8589
private implicit val askTimeout = Timeout(5.seconds)
8690
private implicit val ec = context.dispatcher
@@ -90,6 +94,8 @@ class QueueManager(
9094
// watch leaders and register them into actorSelectionMap
9195
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
9296

97+
private var isShuttingDown = false
98+
9399
override def receive: Receive = {
94100
case request: CreateQueue if isWarmUpAction(request.fqn) =>
95101
logging.info(
@@ -114,12 +120,12 @@ class QueueManager(
114120
msg.leadership match {
115121
case Right(EtcdLeader(key, value, lease)) =>
116122
leaderElectionCallbacks.remove(key).foreach { callback =>
117-
callback(Right(EtcdLeader(key, value, lease)))
123+
callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
118124
}
119125

120126
case Left(EtcdFollower(key, value)) =>
121127
leaderElectionCallbacks.remove(key).foreach { callback =>
122-
callback(Left(EtcdFollower(key, value)))
128+
callback(Left(EtcdFollower(key, value)), isShuttingDown)
123129
}
124130
}
125131

@@ -129,7 +135,11 @@ class QueueManager(
129135
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
130136
msg.transid)
131137

132-
handleActivationMessage(msg)
138+
if (sender() == self) {
139+
handleCycle(msg)(msg.transid)
140+
} else {
141+
handleActivationMessage(msg)
142+
}
133143

134144
case UpdateMemoryQueue(oldAction, newAction, msg) =>
135145
logging.info(
@@ -164,6 +174,25 @@ class QueueManager(
164174
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
165175
queue ! msg
166176
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
177+
if (isShuttingDown) {
178+
queue ! GracefulShutdown
179+
}
180+
}
181+
182+
case RecoverQueue(msg, action, actionMetaData) =>
183+
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
184+
// a newer queue is created, send msg to new queue
185+
case Some(key) if key.docInfo.rev >= msg.revision =>
186+
QueuePool.get(key) match {
187+
case Some(queue) if queue.isLeader =>
188+
queue.queue ! msg.copy(revision = key.docInfo.rev)
189+
logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
190+
case _ =>
191+
recreateQueue(action, msg, actionMetaData)
192+
}
193+
case _ =>
194+
recreateQueue(action, msg, actionMetaData)
195+
167196
}
168197

169198
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +237,7 @@ class QueueManager(
208237
}
209238

210239
case GracefulShutdown =>
240+
isShuttingDown = true
211241
logging.info(this, s"Gracefully shutdown the queue manager")
212242

213243
watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -278,6 +308,62 @@ class QueueManager(
278308
initRevisionMap.update(key, revision)
279309
}
280310

311+
/**
 * Creates and starts a new queue for the given action and hands the activation over to it.
 *
 * If the manager is already shutting down, the new queue is asked to shut down gracefully
 * right after the activation has been sent to it.
 *
 * @param action the fully qualified name of the action the queue is created for
 * @param msg the activation to deliver; its namespace and revision are used to create the queue
 * @param actionMetaData the metadata of the action, passed on to the queue
 */
private def recreateQueue(action: FullyQualifiedEntityName,
                          msg: ActivationMessage,
                          actionMetaData: WhiskActionMetaData): Unit = {
  logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)

  val invocationNamespace = msg.user.namespace.name.asString
  val recreated = createAndStartQueue(invocationNamespace, action, msg.revision, actionMetaData)

  recreated ! msg
  msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

  // a queue created during shutdown must not outlive the manager
  if (isShuttingDown) recreated ! GracefulShutdown
}
322+
323+
/**
 * Handles an activation message that this manager received from itself.
 *
 * If the pool holds a leader queue for the action whose revision is not older than the
 * revision of the message, the message is forwarded to it with that queue's revision.
 * In every other case the queue is recovered.
 *
 * @param msg the activation message that came back to this manager
 */
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
  val action = msg.action

  // a leader queue for this action that is at least as new as the message, if there is one
  val leaderQueue = for {
    key <- QueuePool.keys.find(_.docInfo.id == action.toDocId)
    if key.docInfo.rev >= msg.revision
    value <- QueuePool.get(key)
    if value.isLeader
  } yield (key, value)

  leaderQueue match {
    // a newer queue is created, send msg to new queue
    case Some((key, value)) =>
      value.queue ! msg.copy(revision = key.docInfo.rev)
      logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)

    case None =>
      recoverQueue(msg)
  }
}
339+
340+
/**
 * Starts the recovery of the queue for the action of the given activation.
 *
 * The action metadata is fetched asynchronously. For an executable action a [[RecoverQueue]]
 * message is sent to this actor; for a non-executable action, or when the fetch fails, the
 * activation is completed with an error.
 *
 * @param msg the activation whose queue needs to be recovered
 */
private def recoverQueue(msg: ActivationMessage)(implicit transid: TransactionId): Unit = {
  val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
  logging.info(this, s"Recover a queue for ${msg.action},")

  val metaDataLookup = getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)

  metaDataLookup
    .map { actionMetaData: WhiskActionMetaData =>
      if (actionMetaData.toExecutableWhiskAction.isDefined) {
        // the queue itself is created when this actor processes the RecoverQueue message
        val versionedAction = msg.action.copy(version = Some(actionMetaData.version))
        self ! RecoverQueue(msg, versionedAction, actionMetaData)
        transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
      } else {
        val message =
          s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
        completeErrorActivation(msg, message)
        transid.failed(this, start, message)
      }
    }
    .recover {
      case t =>
        val failure = s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}"
        transid.failed(this, start, failure)
        completeErrorActivation(msg, t.getMessage)
    }
}
366+
281367
private def createNewQueue(newAction: FullyQualifiedEntityName, msg: ActivationMessage)(
282368
implicit transid: TransactionId): Future[Any] = {
283369
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_UPDATE("version-mismatch"))
@@ -453,24 +539,24 @@ class QueueManager(
453539
case None =>
454540
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
455541
leaderElectionCallbacks.put(
456-
leaderKey, {
457-
case Right(EtcdLeader(_, _, _)) =>
458-
val queue = childFactory(
459-
context,
460-
request.invocationNamespace,
461-
request.fqn,
462-
request.revision,
463-
request.whiskActionMetaData)
464-
queue ! Start
465-
QueuePool.put(
466-
MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
467-
MemoryQueueValue(queue, true))
468-
updateInitRevisionMap(leaderKey, request.revision)
469-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
470-
471-
// in case of follower, do nothing
472-
case Left(EtcdFollower(_, _)) =>
473-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
542+
leaderKey,
543+
(electResult, isShuttingDown) => {
544+
electResult match {
545+
case Right(EtcdLeader(_, _, _)) =>
546+
val queue = createAndStartQueue(
547+
request.invocationNamespace,
548+
request.fqn,
549+
request.revision,
550+
request.whiskActionMetaData)
551+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
552+
if (isShuttingDown) {
553+
queue ! GracefulShutdown
554+
}
555+
556+
// in case of follower, do nothing
557+
case Left(EtcdFollower(_, _)) =>
558+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
559+
}
474560
})
475561

476562
// there is already a leader election for leaderKey, so skip it
@@ -490,6 +576,20 @@ class QueueManager(
490576
}
491577
}
492578

579+
/**
 * Creates a queue through the child factory, starts it and registers it.
 *
 * The queue is put into the QueuePool with its flag set to `true` (presumably the leader
 * flag, to be confirmed against MemoryQueueValue) and the initial revision stored for the
 * leader key of the action is updated.
 *
 * @param invocationNamespace the namespace the queue is created in
 * @param action the fully qualified name of the action
 * @param revision the revision of the action the queue is created for
 * @param actionMetaData the metadata of the action, passed on to the child factory
 * @return the reference of the newly created queue
 */
private def createAndStartQueue(invocationNamespace: String,
                                action: FullyQualifiedEntityName,
                                revision: DocRevision,
                                actionMetaData: WhiskActionMetaData): ActorRef = {
  val memoryQueue = childFactory(context, invocationNamespace, action, revision, actionMetaData)
  memoryQueue ! Start

  val poolKey = MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision))
  QueuePool.put(poolKey, MemoryQueueValue(memoryQueue, true))

  val leaderKey = getLeaderKey(invocationNamespace, action)
  updateInitRevisionMap(leaderKey, revision)

  memoryQueue
}
592+
493593
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
494594
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
495595
})

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,65 @@ class QueueManagerTests
549549
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
550550
}
551551

552+
// Verifies that a queue which was removed while its leader key is still present gets recreated
// when activation messages arrive with the queue manager itself as the sender.
it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
553+
val mockEtcdClient = mock[EtcdClient]
554+
// every etcd lookup returns one key-value pair whose value is the serialized scheduler endpoint
(mockEtcdClient
555+
.get(_: String))
556+
.expects(*)
557+
.returning(Future.successful {
558+
RangeResponse
559+
.newBuilder()
560+
.addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
561+
.build()
562+
})
563+
.anyNumberOfTimes()
564+
val dataManagementService = getTestDataManagementService()
565+
val watcher = TestProbe()
566+
567+
val probe = TestProbe()
568+
569+
// every queue created through this factory is backed by the same probe
val childFactory =
570+
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
571+
572+
val queueManager =
573+
TestActorRef(
574+
QueueManager
575+
.props(
576+
entityStore,
577+
get,
578+
mockEtcdClient,
579+
schedulerEndpoint,
580+
schedulerId,
581+
dataManagementService.ref,
582+
watcher.ref,
583+
ack,
584+
store,
585+
childFactory,
586+
mockConsumer))
587+
588+
watcher.expectMsg(watchEndpoint)
589+
// current queue's revision is `1-test-revision`
590+
(queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
591+
testInvocationNamespace,
592+
testFQN,
593+
true)
594+
595+
// the initially created queue is started
probe.expectMsg(Start)
596+
597+
// simulate a superseded queue: the queue is removed but its leader key is not deleted
598+
queueManager ! QueueRemoved(
599+
testInvocationNamespace,
600+
testFQN.toDocId.asDocInfo(testDocRevision),
601+
Some(testLeaderKey))
602+
603+
// send the activations with the queue manager itself as the sender
queueManager.!(activationMessage)(queueManager)
604+
val msg2 = activationMessage.copy(activationId = ActivationId.generate())
605+
queueManager.!(msg2)(queueManager) // even if two requests are sent, only one queue should be recreated
606+
// exactly one more Start is expected before the two activations are delivered
probe.expectMsg(Start)
607+
probe.expectMsg(activationMessage)
608+
probe.expectMsg(msg2)
609+
}
610+
552611
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
553612
stream.reset()
554613
val mockEtcdClient = mock[EtcdClient]
@@ -1082,6 +1141,9 @@ class QueueManagerTests
10821141
val probe = TestProbe()
10831142
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
10841143
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
1144+
val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
1145+
val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
1146+
val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
10851147

10861148
// probe will watch all actors which are created by these factories
10871149
val childFactory =
@@ -1129,5 +1191,15 @@ class QueueManagerTests
11291191
queueManager ! GracefulShutdown
11301192

11311193
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
1194+
1195+
// after shutdown, it can still create/update/recover a queue, and any new queue should be shut down immediately too
1196+
(queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
1197+
.mapTo[CreateQueueResponse]
1198+
.futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
1199+
queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
1200+
queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
1201+
1202+
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
1203+
11321204
}
11331205
}

0 commit comments

Comments
 (0)