@@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
 case class CreateNewQueue(activationMessage: ActivationMessage,
                           action: FullyQualifiedEntityName,
                           actionMetadata: WhiskActionMetaData)
+case class RecoverQueue(activationMessage: ActivationMessage,
+                        action: FullyQualifiedEntityName,
+                        actionMetadata: WhiskActionMetaData)
 
 case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
 
@@ -80,7 +83,7 @@ class QueueManager(
 
   private val actorSelectionMap = TrieMap[String, ActorSelection]()
 
-  private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
+  private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
 
   private implicit val askTimeout = Timeout(5.seconds)
   private implicit val ec = context.dispatcher
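
Note: the widened callback type is the crux of this hunk — every pending election callback now also receives the manager's shutdown flag at the moment the election resolves. A minimal self-contained sketch of the pattern (plain Scala; `Follower`, `Leader`, `register`, and `onElection` are hypothetical stand-ins, not the OpenWhisk API):

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for EtcdFollower/EtcdLeader, for illustration only.
final case class Follower(key: String)
final case class Leader(key: String, lease: Long)

object CallbackSketch {
  // Callbacks now take the election result AND the shutdown flag.
  private val callbacks =
    TrieMap[String, (Either[Follower, Leader], Boolean) => Unit]()

  @volatile private var isShuttingDown = false

  def register(key: String)(cb: (Either[Follower, Leader], Boolean) => Unit): Unit =
    callbacks.update(key, cb)

  // On an election outcome the callback is removed and invoked exactly once,
  // with the shutdown flag sampled at invocation time, not registration time.
  def onElection(key: String, result: Either[Follower, Leader]): Unit =
    callbacks.remove(key).foreach(cb => cb(result, isShuttingDown))

  def main(args: Array[String]): Unit = {
    register("ns/action") {
      case (Right(Leader(k, _)), shuttingDown) =>
        println(s"won leadership for $k, shuttingDown=$shuttingDown")
      case (Left(_), _) =>
        println("follower, nothing to do")
    }
    isShuttingDown = true // shutdown arrives before the election resolves
    onElection("ns/action", Right(Leader("ns/action", 42L)))
  }
}
```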
@@ -90,6 +93,8 @@ class QueueManager(
   // watch leaders and register them into actorSelectionMap
   watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))
 
+  private var isShuttingDown = false
+
   override def receive: Receive = {
     case request: CreateQueue if isWarmUpAction(request.fqn) =>
       logging.info(
@@ -114,12 +119,12 @@ class QueueManager(
       msg.leadership match {
         case Right(EtcdLeader(key, value, lease)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Right(EtcdLeader(key, value, lease)))
+            callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
           }
 
         case Left(EtcdFollower(key, value)) =>
           leaderElectionCallbacks.remove(key).foreach { callback =>
-            callback(Left(EtcdFollower(key, value)))
+            callback(Left(EtcdFollower(key, value)), isShuttingDown)
           }
       }
 
@@ -129,7 +134,11 @@ class QueueManager(
           s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
           msg.transid)
 
-        handleActivationMessage(msg)
+        if (sender() == self) {
+          handleCycle(msg)(msg.transid)
+        } else {
+          handleActivationMessage(msg)
+        }
 
     case UpdateMemoryQueue(oldAction, newAction, msg) =>
       logging.info(
@@ -164,6 +173,24 @@ class QueueManager(
         updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
         queue ! msg
         msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
+        if (isShuttingDown) {
+          queue ! GracefulShutdown
+        }
+      }
+
+    case RecoverQueue(msg, action, actionMetaData) =>
+      QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+        // queue is already recovered or a newer queue is created, send msg to new queue
+        case Some(key) if key.docInfo.rev >= msg.revision =>
+          QueuePool.get(key) match {
+            case Some(queue) if queue.isLeader =>
+              queue.queue ! msg.copy(revision = key.docInfo.rev)
+              logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
+            case _ =>
+              recreateQueue(action, msg, actionMetaData)
+          }
+        case _ =>
+          recreateQueue(action, msg, actionMetaData)
       }
 
     // leaderKey is now optional, it becomes None when the stale queue is removed
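
Aside: the guard in the new `RecoverQueue` case is what deduplicates concurrent recoveries — the message is forwarded to an existing queue only when that queue's document revision is equal or newer and this scheduler leads it; anything else falls through to recreation. A reduced sketch of just that decision (hypothetical `Rev`/`QueueEntry` types, not OpenWhisk's):

```scala
object RecoverGuardSketch {
  // Hypothetical stand-ins for DocRevision and the QueuePool entry.
  final case class Rev(n: Int)
  final case class QueueEntry(rev: Rev, isLeader: Boolean)

  def route(msgRev: Rev, existing: Option[QueueEntry]): String =
    existing match {
      // Equal-or-newer queue led by this scheduler: forward, don't recreate.
      case Some(q) if q.rev.n >= msgRev.n && q.isLeader => "forward to existing queue"
      // Missing, stale, or follower-only queue: recreate it.
      case _ => "recreate queue"
    }

  def main(args: Array[String]): Unit = {
    println(route(Rev(2), Some(QueueEntry(Rev(3), isLeader = true)))) // forward
    println(route(Rev(2), Some(QueueEntry(Rev(1), isLeader = true)))) // recreate
    println(route(Rev(2), None))                                      // recreate
  }
}
```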
@@ -208,6 +235,7 @@ class QueueManager(
     }
 
     case GracefulShutdown =>
+      isShuttingDown = true
       logging.info(this, s"Gracefully shutdown the queue manager")
 
       watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
@@ -317,6 +345,47 @@ class QueueManager(
     }
   }
 
+  private def recreateQueue(action: FullyQualifiedEntityName,
+                            msg: ActivationMessage,
+                            actionMetaData: WhiskActionMetaData): Unit = {
+    logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)
+    val queue = createAndStartQueue(msg.user.namespace.name.asString, action, msg.revision, actionMetaData)
+    queue ! msg
+    msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+    if (isShuttingDown) {
+      queue ! GracefulShutdown
+    }
+  }
+
+  private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
+    logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exists in etcd, recovering...")
+    val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
+
+    logging.info(this, s"Recover a queue for ${msg.action}")
+    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
+      .map { actionMetaData: WhiskActionMetaData =>
+        actionMetaData.toExecutableWhiskAction match {
+          case Some(_) =>
+            self ! RecoverQueue(msg, msg.action.copy(version = Some(actionMetaData.version)), actionMetaData)
+            transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
+
+          case None =>
+            val message =
+              s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
+            completeErrorActivation(msg, message)
+            transid.failed(this, start, message)
+        }
+      }
+      .recover {
+        case t =>
+          transid.failed(
+            this,
+            start,
+            s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}")
+          completeErrorActivation(msg, t.getMessage)
+      }
+  }
+
   private def handleActivationMessage(msg: ActivationMessage): Any = {
     implicit val transid = msg.transid
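
The shape of `handleCycle` above is fetch-then-branch: `map` on the metadata future to split the executable and non-executable cases, `recover` to convert fetch failures into error activations. Stripped of OpenWhisk types, the skeleton looks roughly like this (`fetch`, `onExecutable`, and `onError` are hypothetical parameters standing in for the store lookup, the self-send, and `completeErrorActivation`):

```scala
import scala.concurrent.{ExecutionContext, Future}

object RecoverFlowSketch {
  final case class Meta(executable: Boolean, version: String)

  def recoverFlow(fetch: () => Future[Meta])(onExecutable: Meta => Unit, onError: String => Unit)(
    implicit ec: ExecutionContext): Future[Unit] =
    fetch()
      .map { meta =>
        if (meta.executable) onExecutable(meta) // e.g. re-enqueue via a self-send
        else onError(s"non-executable action at version ${meta.version}")
      }
      .recover { case t => onError(t.getMessage) } // fetch failure -> error activation
}
```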
@@ -451,24 +520,24 @@ class QueueManager(
       case None =>
         dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
         leaderElectionCallbacks.put(
-          leaderKey, {
-            case Right(EtcdLeader(_, _, _)) =>
-              val queue = childFactory(
-                context,
-                request.invocationNamespace,
-                request.fqn,
-                request.revision,
-                request.whiskActionMetaData)
-              queue ! Start
-              QueuePool.put(
-                MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
-                MemoryQueueValue(queue, true))
-              updateInitRevisionMap(leaderKey, request.revision)
-              receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
-
-            // in case of follower, do nothing
-            case Left(EtcdFollower(_, _)) =>
-              receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+          leaderKey,
+          (electResult, isShuttingDown) => {
+            electResult match {
+              case Right(EtcdLeader(_, _, _)) =>
+                val queue = createAndStartQueue(
+                  request.invocationNamespace,
+                  request.fqn,
+                  request.revision,
+                  request.whiskActionMetaData)
+                receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+                if (isShuttingDown) {
+                  queue ! GracefulShutdown
+                }
+
+              // in case of follower, do nothing
+              case Left(EtcdFollower(_, _)) =>
+                receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
+            }
           })
 
       // there is already a leader election for leaderKey, so skip it
@@ -488,6 +557,20 @@ class QueueManager(
       }
     }
 
+  private def createAndStartQueue(invocationNamespace: String,
+                                  action: FullyQualifiedEntityName,
+                                  revision: DocRevision,
+                                  actionMetaData: WhiskActionMetaData): ActorRef = {
+    val queue =
+      childFactory(context, invocationNamespace, action, revision, actionMetaData)
+    queue ! Start
+    QueuePool.put(
+      MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision)),
+      MemoryQueueValue(queue, true))
+    updateInitRevisionMap(getLeaderKey(invocationNamespace, action), revision)
+    queue
+  }
+
   private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
     MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
   })
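
Taken together, the `isShuttingDown` flag, the extra callback parameter, and the post-creation checks appear to close a race: a queue created by an election or recovery that resolves after `GracefulShutdown` arrives would otherwise never be told to drain. A toy Akka-classic reproduction of that pattern (actor and message names here are hypothetical, not the scheduler's real protocol):

```scala
import akka.actor.{Actor, ActorSystem, Props}

case object Start
case object GracefulShutdown
case object ElectionWon

class SketchQueue extends Actor {
  def receive: Receive = {
    case Start            => println("queue started")
    case GracefulShutdown => println("queue draining"); context.stop(self)
  }
}

class SketchManager extends Actor {
  private var isShuttingDown = false

  def receive: Receive = {
    case GracefulShutdown => isShuttingDown = true
    // An election that resolves after shutdown began still yields a queue;
    // checking the flag right after creation keeps it from being orphaned.
    case ElectionWon =>
      val queue = context.actorOf(Props(new SketchQueue))
      queue ! Start
      if (isShuttingDown) queue ! GracefulShutdown
  }
}

object ShutdownRaceSketch extends App {
  val system = ActorSystem("sketch")
  val manager = system.actorOf(Props(new SketchManager))
  manager ! GracefulShutdown // shutdown starts first
  manager ! ElectionWon      // late election win: queue still drains
  Thread.sleep(500)
  system.terminate()
}
```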