Skip to content

Commit e84fafd

Browse files
jiangpengcheng authored and JesseStutler committed
Prevent cycle sending (apache#5251)
1 parent 32ceac6 commit e84fafd

File tree

3 files changed

+177
-22
lines changed

3 files changed

+177
-22
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ object LoggingMarkers {
598598
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
599599
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
600600
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
601+
def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
601602
def SCHEDULER_QUEUE_UPDATE(reason: String) =
602603
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
603604
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 105 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
5454
case class CreateNewQueue(activationMessage: ActivationMessage,
5555
action: FullyQualifiedEntityName,
5656
actionMetadata: WhiskActionMetaData)
57+
/**
 * Request to recover (recreate) the in-memory queue of an action.
 *
 * The queue manager sends this message to itself once it has fetched the metadata of an
 * executable action whose activation message came back to the manager from itself.
 *
 * @param activationMessage the activation that triggered the recovery; it is forwarded to the queue
 * @param action the fully qualified name of the action whose queue is recovered
 * @param actionMetadata the metadata of the action, used when a new queue has to be created
 */
case class RecoverQueue(activationMessage: ActivationMessage,
58+
action: FullyQualifiedEntityName,
59+
actionMetadata: WhiskActionMetaData)
5760

5861
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
5962

@@ -80,7 +83,7 @@ class QueueManager(
8083

8184
private val actorSelectionMap = TrieMap[String, ActorSelection]()
8285

83-
private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
86+
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
8487

8588
private implicit val askTimeout = Timeout(5.seconds)
8689
private implicit val ec = context.dispatcher
@@ -90,6 +93,8 @@ class QueueManager(
9093
// watch leaders and register them into actorSelectionMap
9194
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
9295

96+
private var isShuttingDown = false
97+
9398
override def receive: Receive = {
9499
case request: CreateQueue if isWarmUpAction(request.fqn) =>
95100
logging.info(
@@ -114,12 +119,12 @@ class QueueManager(
114119
msg.leadership match {
115120
case Right(EtcdLeader(key, value, lease)) =>
116121
leaderElectionCallbacks.remove(key).foreach { callback =>
117-
callback(Right(EtcdLeader(key, value, lease)))
122+
callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
118123
}
119124

120125
case Left(EtcdFollower(key, value)) =>
121126
leaderElectionCallbacks.remove(key).foreach { callback =>
122-
callback(Left(EtcdFollower(key, value)))
127+
callback(Left(EtcdFollower(key, value)), isShuttingDown)
123128
}
124129
}
125130

@@ -129,7 +134,11 @@ class QueueManager(
129134
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
130135
msg.transid)
131136

132-
handleActivationMessage(msg)
137+
if (sender() == self) {
138+
handleCycle(msg)(msg.transid)
139+
} else {
140+
handleActivationMessage(msg)
141+
}
133142

134143
case UpdateMemoryQueue(oldAction, newAction, msg) =>
135144
logging.info(
@@ -164,6 +173,24 @@ class QueueManager(
164173
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
165174
queue ! msg
166175
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
176+
if (isShuttingDown) {
177+
queue ! GracefulShutdown
178+
}
179+
}
180+
181+
case RecoverQueue(msg, action, actionMetaData) =>
182+
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
183+
// queue is already recovered or a newer queue is created, send msg to new queue
184+
case Some(key) if key.docInfo.rev >= msg.revision =>
185+
QueuePool.get(key) match {
186+
case Some(queue) if queue.isLeader =>
187+
queue.queue ! msg.copy(revision = key.docInfo.rev)
188+
logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
189+
case _ =>
190+
recreateQueue(action, msg, actionMetaData)
191+
}
192+
case _ =>
193+
recreateQueue(action, msg, actionMetaData)
167194
}
168195

169196
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +235,7 @@ class QueueManager(
208235
}
209236

210237
case GracefulShutdown =>
238+
isShuttingDown = true
211239
logging.info(this, s"Gracefully shutdown the queue manager")
212240

213241
watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -317,6 +345,47 @@ class QueueManager(
317345
}
318346
}
319347

348+
/**
 * Creates and starts a new queue for the given action and hands it the activation message.
 *
 * The queue is created in the namespace of the message's user and with the revision carried by
 * the message. If the queue manager is already shutting down, the new queue is asked to shut
 * down gracefully right after it received the message.
 *
 * @param action the fully qualified name of the action the queue is created for
 * @param msg the activation message that is forwarded to the new queue
 * @param actionMetaData the metadata of the action passed on to the queue
 */
private def recreateQueue(action: FullyQualifiedEntityName,
349+
msg: ActivationMessage,
350+
actionMetaData: WhiskActionMetaData): Unit = {
351+
logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
352+
val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
353+
// forward the activation that triggered the recreation to the new queue
queue ! msg
354+
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
355+
// a queue that is created while the manager is shutting down must be shut down as well
if (isShuttingDown) {
356+
queue ! GracefulShutdown
357+
}
358+
}
359+
360+
/**
 * Handles an activation message that this queue manager received from itself.
 *
 * Instead of processing the message as a regular activation, the action metadata is fetched and,
 * if the action is executable, a [[RecoverQueue]] request is sent to this actor so that the queue
 * is recreated. A non-executable action or a failed metadata lookup is reported through
 * `completeErrorActivation` and marks the transaction as failed.
 *
 * @param msg the activation message that came back to this queue manager
 * @param transid the transaction id used for logging and for the recover marker
 * @return the future of the metadata lookup with its failures recovered
 */
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
361+
logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
362+
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
363+
364+
logging.info(this, s"Recover a queue for ${msg.action},")
365+
// NOTE(review): the meaning of the trailing `false` flag is not visible here - confirm against getWhiskActionMetaData
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
366+
.map { actionMetaData: WhiskActionMetaData =>
367+
actionMetaData.toExecutableWhiskAction match {
368+
case Some(_) =>
369+
// the recovery itself happens when this actor processes the RecoverQueue message;
// the action name is sent with the version taken from the fetched metadata
self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
370+
transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
371+
372+
case None =>
373+
val message =
374+
s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
375+
completeErrorActivation(msg, message)
376+
transid.failed(this, start, message)
377+
}
378+
}
379+
.recover {
380+
// the metadata lookup (or the handling above) failed: report the error for this activation
case t =>
381+
transid.failed(
382+
this,
383+
start,
384+
s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
385+
completeErrorActivation(msg, t.getMessage)
386+
}
387+
}
388+
320389
private def handleActivationMessage(msg: ActivationMessage): Any = {
321390
implicit val transid = msg.transid
322391

@@ -451,24 +520,24 @@ class QueueManager(
451520
case None =>
452521
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
453522
leaderElectionCallbacks.put(
454-
leaderKey, {
455-
case Right(EtcdLeader(_, _, _)) =>
456-
val queue = childFactory(
457-
context,
458-
request.invocationNamespace,
459-
request.fqn,
460-
request.revision,
461-
request.whiskActionMetaData)
462-
queue ! Start
463-
QueuePool.put(
464-
MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
465-
MemoryQueueValue(queue, true))
466-
updateInitRevisionMap(leaderKey, request.revision)
467-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
468-
469-
// in case of follower, do nothing
470-
case Left(EtcdFollower(_, _)) =>
471-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
523+
leaderKey,
524+
(electResult, isShuttingDown) => {
525+
electResult match {
526+
case Right(EtcdLeader(_, _, _)) =>
527+
val queue = createAndStartQueue(
528+
request.invocationNamespace,
529+
request.fqn,
530+
request.revision,
531+
request.whiskActionMetaData)
532+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
533+
if (isShuttingDown) {
534+
queue ! GracefulShutdown
535+
}
536+
537+
// in case of follower, do nothing
538+
case Left(EtcdFollower(_, _)) =>
539+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
540+
}
472541
})
473542

474543
// there is already a leader election for leaderKey, so skip it
@@ -488,6 +557,20 @@ class QueueManager(
488557
}
489558
}
490559

560+
/**
 * Creates a queue actor through `childFactory`, starts it and registers it.
 *
 * The queue receives `Start`, is put into the `QueuePool` under the key built from the
 * invocation namespace and the action's doc info at the given revision, and the revision is
 * recorded via `updateInitRevisionMap` for the leader key of the action.
 *
 * @param invocationNamespace the namespace the queue is created in
 * @param action the fully qualified name of the action the queue serves
 * @param revision the revision of the action the queue is created for
 * @param actionMetaData the metadata of the action handed to the child factory
 * @return the reference of the newly created queue actor
 */
private def createAndStartQueue(invocationNamespace: String,
561+
action: FullyQualifiedEntityName,
562+
revision: DocRevision,
563+
actionMetaData: WhiskActionMetaData): ActorRef = {
564+
val queue =
565+
childFactory(context, invocationNamespace, action, revision, actionMetaData)
566+
queue ! Start
567+
// NOTE(review): the `true` flag presumably marks this queue as the leader - confirm against MemoryQueueValue
QueuePool.put(
568+
MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
569+
MemoryQueueValue(queue, true))
570+
updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
571+
queue
572+
}
573+
491574
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
492575
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
493576
})

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,65 @@ class QueueManagerTests
549549
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
550550
}
551551

552+
// Verifies that activation messages which the queue manager receives from itself, after the
// queue was removed while its leader key was kept, lead to exactly one recreated queue that
// then receives both messages.
it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
553+
val mockEtcdClient = mock[EtcdClient]
554+
(mockEtcdClient
555+
.get(_: String))
556+
.expects(*)
557+
.returning(Future.successful {
558+
RangeResponse
559+
.newBuilder()
560+
.addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
561+
.build()
562+
})
563+
.anyNumberOfTimes()
564+
val dataManagementService = getTestDataManagementService()
565+
val watcher = TestProbe()
566+
567+
val probe = TestProbe()
568+
569+
// every queue created by the manager is backed by the same probe, so the probe sees all queue messages
val childFactory =
570+
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
571+
572+
val queueManager =
573+
TestActorRef(
574+
QueueManager
575+
.props(
576+
entityStore,
577+
get,
578+
mockEtcdClient,
579+
schedulerEndpoint,
580+
schedulerId,
581+
dataManagementService.ref,
582+
watcher.ref,
583+
ack,
584+
store,
585+
childFactory,
586+
mockConsumer))
587+
588+
watcher.expectMsg(watchEndpoint)
589+
// the current queue's revision is `1-test-revision`
590+
(queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
591+
testInvocationNamespace,
592+
testFQN,
593+
true)
594+
595+
probe.expectMsg(Start)
596+
597+
// simulate a superseded queue: the queue is removed but its leader key is not deleted
598+
queueManager ! QueueRemoved(
599+
testInvocationNamespace,
600+
testFQN.toDocId.asDocInfo(testDocRevision),
601+
Some(testLeaderKey))
602+
603+
// send the activations with the queue manager itself as the sender
queueManager.!(activationMessage)(queueManager)
604+
val msg2 = activationMessage.copy(activationId = ActivationId.generate())
605+
queueManager.!(msg2)(queueManager) // even with two requests, only one queue should be recreated
606+
probe.expectMsg(Start)
607+
probe.expectMsg(activationMessage)
608+
probe.expectMsg(msg2)
609+
}
610+
552611
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
553612
stream.reset()
554613
val mockEtcdClient = mock[EtcdClient]
@@ -1082,6 +1141,9 @@ class QueueManagerTests
10821141
val probe = TestProbe()
10831142
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
10841143
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
1144+
val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
1145+
val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
1146+
val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
10851147

10861148
// probe will watch all actors which are created by these factories
10871149
val childFactory =
@@ -1129,5 +1191,14 @@ class QueueManagerTests
11291191
queueManager ! GracefulShutdown
11301192

11311193
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
1194+
1195+
// after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
1196+
(queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
1197+
.mapTo[CreateQueueResponse]
1198+
.futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
1199+
queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
1200+
queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
1201+
1202+
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
11321203
}
11331204
}

0 commit comments

Comments
 (0)