@@ -55,6 +55,10 @@ case class CreateNewQueue(activationMessage: ActivationMessage,
55
55
action : FullyQualifiedEntityName ,
56
56
actionMetadata : WhiskActionMetaData )
57
57
58
/**
 * Message the QueueManager sends to itself once it has fetched the action metadata needed to
 * bring a queue back for an activation.
 *
 * @param activationMessage the activation that is waiting for a queue
 * @param action            fully qualified name of the action whose queue is recovered
 * @param actionMetadata    metadata of the action, passed on when the queue is created
 */
case class RecoverQueue(
  activationMessage: ActivationMessage,
  action: FullyQualifiedEntityName,
  actionMetadata: WhiskActionMetaData)
61
+
58
62
// Configuration values for the QueueManager.
// NOTE(review): judging only by the field names, maxRetriesToGetQueue limits queue lookup retries and
// maxSchedulingTime bounds how long scheduling may take; the usage sites are not visible here, so confirm.
case class QueueManagerConfig (maxRetriesToGetQueue : Int , maxSchedulingTime : FiniteDuration )
59
63
60
64
class QueueManager (
@@ -80,7 +84,7 @@ class QueueManager(
80
84
81
85
private val actorSelectionMap = TrieMap [String , ActorSelection ]()
82
86
83
- private val leaderElectionCallbacks = TrieMap [String , Either [EtcdFollower , EtcdLeader ] => Unit ]()
87
+ private val leaderElectionCallbacks = TrieMap [String , ( Either [EtcdFollower , EtcdLeader ], Boolean ) => Unit ]()
84
88
85
89
private implicit val askTimeout = Timeout (5 .seconds)
86
90
private implicit val ec = context.dispatcher
@@ -90,6 +94,8 @@ class QueueManager(
90
94
// watch leaders and register them into actorSelectionMap
91
95
watcherService ! WatchEndpoint (QueueKeys .queuePrefix, " " , isPrefix = true , watcherName, Set (PutEvent , DeleteEvent ))
92
96
97
+ private var isShuttingDown = false
98
+
93
99
override def receive : Receive = {
94
100
case request : CreateQueue if isWarmUpAction(request.fqn) =>
95
101
logging.info(
@@ -114,12 +120,12 @@ class QueueManager(
114
120
msg.leadership match {
115
121
case Right (EtcdLeader (key, value, lease)) =>
116
122
leaderElectionCallbacks.remove(key).foreach { callback =>
117
- callback(Right (EtcdLeader (key, value, lease)))
123
+ callback(Right (EtcdLeader (key, value, lease)), isShuttingDown )
118
124
}
119
125
120
126
case Left (EtcdFollower (key, value)) =>
121
127
leaderElectionCallbacks.remove(key).foreach { callback =>
122
- callback(Left (EtcdFollower (key, value)))
128
+ callback(Left (EtcdFollower (key, value)), isShuttingDown )
123
129
}
124
130
}
125
131
@@ -129,7 +135,11 @@ class QueueManager(
129
135
s " Got activation message ${msg.activationId} for ${msg.user.namespace}/ ${msg.action} from remote queue manager. " )(
130
136
msg.transid)
131
137
132
- handleActivationMessage(msg)
138
+ if (sender() == self) {
139
+ handleCycle(msg)(msg.transid)
140
+ } else {
141
+ handleActivationMessage(msg)
142
+ }
133
143
134
144
case UpdateMemoryQueue (oldAction, newAction, msg) =>
135
145
logging.info(
@@ -164,6 +174,25 @@ class QueueManager(
164
174
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
165
175
queue ! msg
166
176
msg.transid.mark(this , LoggingMarkers .SCHEDULER_QUEUE_CREATE )
177
+ if (isShuttingDown) {
178
+ queue ! GracefulShutdown
179
+ }
180
+ }
181
+
182
+ case RecoverQueue (msg, action, actionMetaData) =>
183
+ QueuePool .keys.find(_.docInfo.id == action.toDocId) match {
184
+ // a newer queue is created, send msg to new queue
185
+ case Some (key) if key.docInfo.rev >= msg.revision =>
186
+ QueuePool .get(key) match {
187
+ case Some (queue) if queue.isLeader =>
188
+ queue.queue ! msg.copy(revision = key.docInfo.rev)
189
+ logging.info(this , s " Queue for action $action is already recovered, skip " )(msg.transid)
190
+ case _ =>
191
+ recreateQueue(action, msg, actionMetaData)
192
+ }
193
+ case _ =>
194
+ recreateQueue(action, msg, actionMetaData)
195
+
167
196
}
168
197
169
198
// leaderKey is now optional, it becomes None when the stale queue is removed
@@ -208,6 +237,7 @@ class QueueManager(
208
237
}
209
238
210
239
case GracefulShutdown =>
240
+ isShuttingDown = true
211
241
logging.info(this , s " Gracefully shutdown the queue manager " )
212
242
213
243
watcherService ! UnwatchEndpoint (QueueKeys .queuePrefix, isPrefix = true , watcherName)
@@ -278,6 +308,62 @@ class QueueManager(
278
308
initRevisionMap.update(key, revision)
279
309
}
280
310
311
/**
 * Creates and starts a queue for `action` and hands the pending activation to it.
 *
 * The queue is built through createAndStartQueue using the namespace and revision carried by `msg`.
 * When the manager is already shutting down, the new queue is immediately asked to shut down
 * gracefully as well, after it has received the activation.
 *
 * @param action         the action the queue is created for
 * @param msg            the activation forwarded to the new queue
 * @param actionMetaData metadata passed to the queue on creation
 */
+ private def recreateQueue (action : FullyQualifiedEntityName ,
312
+ msg : ActivationMessage ,
313
+ actionMetaData : WhiskActionMetaData ): Unit = {
314
+ logging.warn(this , s " recreate queue for ${msg.action}" )(msg.transid)
315
+ val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
316
// deliver the activation first so it is enqueued before any shutdown request below
+ queue ! msg
317
+ msg.transid.mark(this , LoggingMarkers .SCHEDULER_QUEUE_RECOVER )
318
// a queue created while the manager is shutting down must also be told to shut down
+ if (isShuttingDown) {
319
+ queue ! GracefulShutdown
320
+ }
321
+ }
322
+
323
/**
 * Handles an activation message that this QueueManager received from itself
 * (the receive block calls this when sender() == self).
 *
 * If QueuePool holds a leader queue for the same action document whose revision is at least
 * the message's revision, the message is forwarded to that queue with the queue's revision.
 * In every other case a queue recovery is started through recoverQueue.
 *
 * @param msg     the activation message to route
 * @param transid transaction id passed implicitly to recoverQueue
 */
+ private def handleCycle (msg : ActivationMessage )(implicit transid : TransactionId ): Unit = {
324
+ val action = msg.action
325
// look up a pooled queue by document id only; the revision is compared in the guard below
+ QueuePool .keys.find(_.docInfo.id == action.toDocId) match {
326
+ // a newer queue is created, send msg to new queue
327
+ case Some (key) if key.docInfo.rev >= msg.revision =>
328
+ QueuePool .get(key) match {
329
+ case Some (queue) if queue.isLeader =>
330
// rewrite the revision so the message matches the queue it is delivered to
+ queue.queue ! msg.copy(revision = key.docInfo.rev)
331
+ logging.info(this , s " Queue for action $action is already recovered, skip " )(msg.transid)
332
// the key exists but the entry is missing or is not a leader: recover
+ case _ =>
333
+ recoverQueue(msg)
334
+ }
335
// no queue for this action, or only one with an older revision: recover
+ case _ =>
336
+ recoverQueue(msg)
337
+ }
338
+ }
339
+
340
/**
 * Starts the recovery of a queue for the action referenced by `msg`.
 *
 * The action metadata is fetched asynchronously with getWhiskActionMetaData. When the action is
 * executable, a RecoverQueue message is sent to this actor so the queue is created inside the
 * actor's own message handling. When it is not executable, or the fetch fails, the activation is
 * completed with an error through completeErrorActivation.
 *
 * @param msg     the activation message that needs a queue
 * @param transid transaction id used for the SCHEDULER_QUEUE_RECOVER start/finish/fail markers
 */
+ private def recoverQueue (msg : ActivationMessage )(implicit transid : TransactionId ): Unit = {
341
+ val start = transid.started(this , LoggingMarkers .SCHEDULER_QUEUE_RECOVER )
342
+ logging.info(this , s " Recover a queue for ${msg.action}, " )
343
+ getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false )
344
+ .map { actionMetaData : WhiskActionMetaData =>
345
+ actionMetaData.toExecutableWhiskAction match {
346
+ case Some (_) =>
347
// hand over to the actor itself; the action name is stamped with the version found in the metadata
+ self ! RecoverQueue (msg, msg.action.copy(version = Some (actionMetaData.version)), actionMetaData)
348
+ transid.finished(this , start, s " recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}" )
349
+
350
+ case None =>
351
+ val message =
352
+ s " non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager "
353
+ completeErrorActivation(msg, message)
354
+ transid.failed(this , start, message)
355
+ }
356
+ }
357
+ .recover {
358
// NOTE(review): this matches every Throwable that reaches recover — confirm that is intended
+ case t =>
359
+ transid.failed(
360
+ this ,
361
+ start,
362
+ s " failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}" )
363
// NOTE(review): t.getMessage may be null for some exceptions — confirm completeErrorActivation tolerates that
+ completeErrorActivation(msg, t.getMessage)
364
+ }
365
+ }
366
+
281
367
private def createNewQueue (newAction : FullyQualifiedEntityName , msg : ActivationMessage )(
282
368
implicit transid : TransactionId ): Future [Any ] = {
283
369
val start = transid.started(this , LoggingMarkers .SCHEDULER_QUEUE_UPDATE (" version-mismatch" ))
@@ -453,24 +539,24 @@ class QueueManager(
453
539
case None =>
454
540
dataManagementService ! ElectLeader (leaderKey, schedulerEndpoints.serialize, self)
455
541
leaderElectionCallbacks.put(
456
- leaderKey, {
457
- case Right ( EtcdLeader (_, _, _)) =>
458
- val queue = childFactory(
459
- context,
460
- request.invocationNamespace,
461
- request.fqn ,
462
- request.revision ,
463
- request.whiskActionMetaData)
464
- queue ! Start
465
- QueuePool .put(
466
- MemoryQueueKey (request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
467
- MemoryQueueValue ( queue, true ))
468
- updateInitRevisionMap(leaderKey, request.revision)
469
- receiver.foreach(_ ! CreateQueueResponse (request.invocationNamespace, request.fqn, success = true ))
470
-
471
- // in case of follower, do nothing
472
- case Left ( EtcdFollower (_, _)) =>
473
- receiver.foreach(_ ! CreateQueueResponse (request.invocationNamespace, request.fqn, success = true ))
542
+ leaderKey,
543
+ (electResult, isShuttingDown) => {
544
+ electResult match {
545
+ case Right ( EtcdLeader (_, _, _)) =>
546
+ val queue = createAndStartQueue(
547
+ request.invocationNamespace ,
548
+ request.fqn ,
549
+ request.revision,
550
+ request.whiskActionMetaData)
551
+ receiver.foreach(_ ! CreateQueueResponse (request.invocationNamespace, request.fqn, success = true ))
552
+ if (isShuttingDown) {
553
+ queue ! GracefulShutdown
554
+ }
555
+
556
+ // in case of follower, do nothing
557
+ case Left ( EtcdFollower (_, _)) =>
558
+ receiver.foreach(_ ! CreateQueueResponse (request.invocationNamespace, request.fqn, success = true ))
559
+ }
474
560
})
475
561
476
562
// there is already a leader election for leaderKey, so skip it
@@ -490,6 +576,20 @@ class QueueManager(
490
576
}
491
577
}
492
578
579
/**
 * Builds a queue actor through childFactory, starts it and registers it in QueuePool.
 *
 * The queue is stored under a key made of the invocation namespace and the action's document
 * info at `revision`, with its second MemoryQueueValue argument set to true. The init revision
 * map entry for the action's leader key is updated to `revision`.
 *
 * @param invocationNamespace namespace the queue is created in
 * @param action              the action the queue serves
 * @param revision            document revision the queue is registered under
 * @param actionMetaData      metadata handed to the child factory
 * @return the reference of the newly created queue actor
 */
private def createAndStartQueue(invocationNamespace: String,
                                action: FullyQualifiedEntityName,
                                revision: DocRevision,
                                actionMetaData: WhiskActionMetaData): ActorRef = {
  val memoryQueue = childFactory(context, invocationNamespace, action, revision, actionMetaData)
  memoryQueue ! Start

  val poolKey = MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision))
  val poolValue = MemoryQueueValue(memoryQueue, true)
  QueuePool.put(poolKey, poolValue)

  updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
  memoryQueue
}
592
+
493
593
// Periodic task: with no initial delay and then every second, emits the value of
// QueuePool.countLeader() as a histogram metric under LoggingMarkers.SCHEDULER_QUEUE.
private val logScheduler = {
  val emitLeaderCount: Runnable = () =>
    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())

  context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(emitLeaderCount)
}
0 commit comments