Skip to content

Commit 0a397e1

Browse files
style95michele-sciabarra
authored andcommitted
Revert cycle handling. (apache#5300)
* Revert cycle handling. * Remove the RecoverQueue reference.
1 parent c66b9d4 commit 0a397e1

File tree

3 files changed

+22
-180
lines changed

3 files changed

+22
-180
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,6 @@ object LoggingMarkers {
601601
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
602602
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
603603
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
604-
def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
605604
def SCHEDULER_QUEUE_UPDATE(reason: String) =
606605
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
607606
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 22 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
5454
case class CreateNewQueue(activationMessage: ActivationMessage,
5555
action: FullyQualifiedEntityName,
5656
actionMetadata: WhiskActionMetaData)
57-
case class RecoverQueue(activationMessage: ActivationMessage,
58-
action: FullyQualifiedEntityName,
59-
actionMetadata: WhiskActionMetaData)
6057

6158
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
6259

@@ -83,7 +80,7 @@ class QueueManager(
8380

8481
private val actorSelectionMap = TrieMap[String, ActorSelection]()
8582

86-
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
83+
private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
8784

8885
private implicit val askTimeout = Timeout(5.seconds)
8986
private implicit val ec = context.dispatcher
@@ -93,8 +90,6 @@ class QueueManager(
9390
// watch leaders and register them into actorSelectionMap
9491
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
9592

96-
private var isShuttingDown = false
97-
9893
override def receive: Receive = {
9994
case request: CreateQueue if isWarmUpAction(request.fqn) =>
10095
logging.info(
@@ -119,12 +114,12 @@ class QueueManager(
119114
msg.leadership match {
120115
case Right(EtcdLeader(key, value, lease)) =>
121116
leaderElectionCallbacks.remove(key).foreach { callback =>
122-
callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
117+
callback(Right(EtcdLeader(key, value, lease)))
123118
}
124119

125120
case Left(EtcdFollower(key, value)) =>
126121
leaderElectionCallbacks.remove(key).foreach { callback =>
127-
callback(Left(EtcdFollower(key, value)), isShuttingDown)
122+
callback(Left(EtcdFollower(key, value)))
128123
}
129124
}
130125

@@ -134,11 +129,7 @@ class QueueManager(
134129
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
135130
msg.transid)
136131

137-
if (sender() == self) {
138-
handleCycle(msg)(msg.transid)
139-
} else {
140-
handleActivationMessage(msg)
141-
}
132+
handleActivationMessage(msg)
142133

143134
case UpdateMemoryQueue(oldAction, newAction, msg) =>
144135
logging.info(
@@ -173,25 +164,6 @@ class QueueManager(
173164
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
174165
queue ! msg
175166
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
176-
if (isShuttingDown) {
177-
queue ! GracefulShutdown
178-
}
179-
}
180-
181-
case RecoverQueue(msg, action, actionMetaData) =>
182-
QueuePool.keys.find(k =>
183-
k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
184-
// queue is already recovered or a newer queue is created, send msg to new queue
185-
case Some(key) if key.docInfo.rev >= msg.revision =>
186-
QueuePool.get(key) match {
187-
case Some(queue) if queue.isLeader =>
188-
queue.queue ! msg.copy(revision = key.docInfo.rev)
189-
logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
190-
case _ =>
191-
recreateQueue(action, msg, actionMetaData)
192-
}
193-
case _ =>
194-
recreateQueue(action, msg, actionMetaData)
195167
}
196168

197169
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -236,7 +208,6 @@ class QueueManager(
236208
}
237209

238210
case GracefulShutdown =>
239-
isShuttingDown = true
240211
logging.info(this, s"Gracefully shutdown the queue manager")
241212

242213
watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -346,49 +317,6 @@ class QueueManager(
346317
}
347318
}
348319

349-
private def recreateQueue(action: FullyQualifiedEntityName,
350-
msg: ActivationMessage,
351-
actionMetaData: WhiskActionMetaData): Unit = {
352-
logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
353-
val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
354-
queue ! msg
355-
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
356-
if (isShuttingDown) {
357-
queue ! GracefulShutdown
358-
}
359-
}
360-
361-
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
362-
logging.warn(
363-
this,
364-
s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
365-
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
366-
367-
logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
368-
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
369-
.map { actionMetaData: WhiskActionMetaData =>
370-
actionMetaData.toExecutableWhiskAction match {
371-
case Some(_) =>
372-
self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
373-
transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
374-
375-
case None =>
376-
val message =
377-
s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
378-
completeErrorActivation(msg, message)
379-
transid.failed(this, start, message)
380-
}
381-
}
382-
.recover {
383-
case t =>
384-
transid.failed(
385-
this,
386-
start,
387-
s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
388-
completeErrorActivation(msg, t.getMessage)
389-
}
390-
}
391-
392320
private def handleActivationMessage(msg: ActivationMessage): Any = {
393321
implicit val transid = msg.transid
394322

@@ -525,24 +453,24 @@ class QueueManager(
525453
case None =>
526454
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
527455
leaderElectionCallbacks.put(
528-
leaderKey,
529-
(electResult, isShuttingDown) => {
530-
electResult match {
531-
case Right(EtcdLeader(_, _, _)) =>
532-
val queue = createAndStartQueue(
533-
request.invocationNamespace,
534-
request.fqn,
535-
request.revision,
536-
request.whiskActionMetaData)
537-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
538-
if (isShuttingDown) {
539-
queue ! GracefulShutdown
540-
}
541-
542-
// in case of follower, do nothing
543-
case Left(EtcdFollower(_, _)) =>
544-
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
545-
}
456+
leaderKey, {
457+
case Right(EtcdLeader(_, _, _)) =>
458+
val queue = childFactory(
459+
context,
460+
request.invocationNamespace,
461+
request.fqn,
462+
request.revision,
463+
request.whiskActionMetaData)
464+
queue ! Start
465+
QueuePool.put(
466+
MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
467+
MemoryQueueValue(queue, true))
468+
updateInitRevisionMap(leaderKey, request.revision)
469+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
470+
471+
// in case of follower, do nothing
472+
case Left(EtcdFollower(_, _)) =>
473+
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
546474
})
547475

548476
// there is already a leader election for leaderKey, so skip it
@@ -562,20 +490,6 @@ class QueueManager(
562490
}
563491
}
564492

565-
private def createAndStartQueue(invocationNamespace: String,
566-
action: FullyQualifiedEntityName,
567-
revision: DocRevision,
568-
actionMetaData: WhiskActionMetaData): ActorRef = {
569-
val queue =
570-
childFactory(context, invocationNamespace, action, revision, actionMetaData)
571-
queue ! Start
572-
QueuePool.put(
573-
MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
574-
MemoryQueueValue(queue, true))
575-
updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
576-
queue
577-
}
578-
579493
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
580494
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
581495
})

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -549,65 +549,6 @@ class QueueManagerTests
549549
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
550550
}
551551

552-
it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
553-
val mockEtcdClient = mock[EtcdClient]
554-
(mockEtcdClient
555-
.get(_: String))
556-
.expects(*)
557-
.returning(Future.successful {
558-
RangeResponse
559-
.newBuilder()
560-
.addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
561-
.build()
562-
})
563-
.anyNumberOfTimes()
564-
val dataManagementService = getTestDataManagementService()
565-
val watcher = TestProbe()
566-
567-
val probe = TestProbe()
568-
569-
val childFactory =
570-
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
571-
572-
val queueManager =
573-
TestActorRef(
574-
QueueManager
575-
.props(
576-
entityStore,
577-
get,
578-
mockEtcdClient,
579-
schedulerEndpoint,
580-
schedulerId,
581-
dataManagementService.ref,
582-
watcher.ref,
583-
ack,
584-
store,
585-
childFactory,
586-
mockConsumer))
587-
588-
watcher.expectMsg(watchEndpoint)
589-
//current queue's revision is `1-test-revision`
590-
(queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
591-
testInvocationNamespace,
592-
testFQN,
593-
true)
594-
595-
probe.expectMsg(Start)
596-
597-
// simulate queue superseded, the queue will be removed but leader key won't be deleted
598-
queueManager ! QueueRemoved(
599-
testInvocationNamespace,
600-
testFQN.toDocId.asDocInfo(testDocRevision),
601-
Some(testLeaderKey))
602-
603-
queueManager.!(activationMessage)(queueManager)
604-
val msg2 = activationMessage.copy(activationId = ActivationId.generate())
605-
queueManager.!(msg2)(queueManager) // even send two requests, we should only recreate one queue
606-
probe.expectMsg(Start)
607-
probe.expectMsg(activationMessage)
608-
probe.expectMsg(msg2)
609-
}
610-
611552
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
612553
stream.reset()
613554
val mockEtcdClient = mock[EtcdClient]
@@ -1141,9 +1082,6 @@ class QueueManagerTests
11411082
val probe = TestProbe()
11421083
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
11431084
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
1144-
val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
1145-
val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
1146-
val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))
11471085

11481086
// probe will watch all actors which are created by these factories
11491087
val childFactory =
@@ -1191,14 +1129,5 @@ class QueueManagerTests
11911129
queueManager ! GracefulShutdown
11921130

11931131
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
1194-
1195-
// after shutdown, it can still create/update/recover a queue, and new queue should be shutdown immediately too
1196-
(queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
1197-
.mapTo[CreateQueueResponse]
1198-
.futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
1199-
queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
1200-
queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)
1201-
1202-
probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
12031132
}
12041133
}

0 commit comments

Comments
 (0)