@@ -85,7 +85,6 @@ case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey:
85
85
case class QueueReactivated (invocationNamespace : String , action : FullyQualifiedEntityName , docInfo : DocInfo )
86
86
case class CancelPoll (promise : Promise [Either [MemoryQueueError , ActivationMessage ]])
87
87
case object QueueRemovedCompleted
88
- case object FlushPulse
89
88
90
89
// Events received by the actor
91
90
case object Start
@@ -125,7 +124,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
125
124
checkToDropStaleActivation : (Queue [TimeSeriesActivationEntry ],
126
125
Long ,
127
126
String ,
128
- FullyQualifiedEntityName ,
127
+ WhiskActionMetaData ,
129
128
MemoryQueueState ,
130
129
ActorRef ) => Unit ,
131
130
queueConfig : QueueConfig )(implicit logging : Logging )
@@ -151,6 +150,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
151
150
private val memory = actionMetaData.limits.memory.megabytes.MB
152
151
private val queueRemovedMsg = QueueRemoved (invocationNamespace, action.toDocId.asDocInfo(revision), Some (leaderKey))
153
152
private val staleQueueRemovedMsg = QueueRemoved (invocationNamespace, action.toDocId.asDocInfo(revision), None )
153
+ private val actionRetentionTimeout = MemoryQueue .getRetentionTimeout(actionMetaData, queueConfig)
154
154
155
155
private [queue] var containers = Set .empty[String ]
156
156
private [queue] var creationIds = Set .empty[String ]
@@ -197,7 +197,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
197
197
198
198
when(Uninitialized ) {
199
199
case Event (Start , _) =>
200
- logging.info(this , s " [ $invocationNamespace: $action: $stateName] a new queue is created. " )
200
+ logging.info(
201
+ this ,
202
+ s " [ $invocationNamespace: $action: $stateName] a new queue is created, retentionTimeout: $actionRetentionTimeout, kind: ${actionMetaData.exec.kind}. " )
201
203
val (schedulerActor, droppingActor) = startMonitoring()
202
204
initializeThrottling()
203
205
@@ -256,7 +258,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
256
258
// when there is no container, it moves to the Flushing state as no activations can be invoked
257
259
if (containers.size <= 0 ) {
258
260
val isWhiskError = ContainerCreationError .whiskErrors.contains(error)
259
- completeAllActivations(message, isWhiskError)
261
+ if (! isWhiskError) {
262
+ completeAllActivations(message, isWhiskError)
263
+ }
260
264
logging.error(
261
265
this ,
262
266
s " [ $invocationNamespace: $action: $stateName] Failed to create an initial container due to ${if (isWhiskError) " whiskError"
@@ -271,7 +275,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
271
275
// There is no timeout for this state: when no further message arrives, it moves back to the Running state.
272
276
when(NamespaceThrottled ) {
273
277
case Event (msg : ActivationMessage , _ : ThrottledData ) =>
274
- handleActivationMessage(msg)
278
+ if (containers.size + creationIds.size == 0 ) {
279
+ completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false )
280
+ } else {
281
+ handleActivationMessage(msg)
282
+ }
275
283
stay
276
284
277
285
case Event (DisableNamespaceThrottling , data : ThrottledData ) =>
@@ -328,33 +336,51 @@ class MemoryQueue(private val etcdClient: EtcdClient,
328
336
goto(Running ) using RunningData (schedulerActor, droppingActor)
329
337
330
338
// log the failed information
331
- case Event (FailedCreationJob (creationId, _, _, _, _ , message), data : FlushingData ) =>
339
+ case Event (FailedCreationJob (creationId, _, _, _, error , message), data : FlushingData ) =>
332
340
creationIds -= creationId.asString
333
341
logging.info(
334
342
this ,
335
343
s " [ $invocationNamespace: $action: $stateName][ $creationId] Failed to create a container due to $message" )
336
344
337
345
// keep updating the reason
338
- stay using data.copy(reason = message)
346
+ stay using data.copy(error = error, reason = message)
339
347
340
348
// since there is no container, activations cannot be handled.
341
349
case Event (msg : ActivationMessage , data : FlushingData ) =>
342
- completeErrorActivation(msg, data.reason, ContainerCreationError .whiskErrors.contains(data.error))
350
+ logging.info(this , s " [ $invocationNamespace: $action: $stateName] got a new activation message ${msg.activationId}" )(
351
+ msg.transid)
352
+ val whiskError = isWhiskError(data.error)
353
+ if (whiskError)
354
+ queue = queue.enqueue(TimeSeriesActivationEntry (Instant .now, msg))
355
+ else
356
+ completeErrorActivation(msg, data.reason, whiskError)
343
357
stay() using data.copy(activeDuringFlush = true )
344
358
345
359
// Since SchedulingDecisionMaker keeps sending messages to create a container, this state is not automatically timed out.
346
360
// Instead, StateTimeout message will be sent by a timer.
347
- case Event (StateTimeout , data : FlushingData ) =>
348
- completeAllActivations(data.reason, ContainerCreationError .whiskErrors.contains(data.error))
349
- if (data.activeDuringFlush)
361
+ case Event (StateTimeout | DropOld , data : FlushingData ) =>
362
+ logging.info(this , s " [ $invocationNamespace: $action: $stateName] Received StateTimeout, drop stale messages. " )
363
+ queue =
364
+ MemoryQueue .dropOld(queue, Duration .ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
365
+ if (data.activeDuringFlush || queue.nonEmpty)
350
366
stay using data.copy(activeDuringFlush = false )
351
367
else
352
368
cleanUpActorsAndGotoRemoved(data)
353
369
354
370
case Event (GracefulShutdown , data : FlushingData ) =>
355
- completeAllActivations(data.reason, ContainerCreationError .whiskErrors.contains (data.error))
371
+ completeAllActivations(data.reason, isWhiskError (data.error))
356
372
logging.info(this , s " [ $invocationNamespace: $action: $stateName] Received GracefulShutdown, stop the queue. " )
357
373
cleanUpActorsAndGotoRemoved(data)
374
+
375
+ case Event (StopSchedulingAsOutdated , data : FlushingData ) =>
376
+ logging.info(this , s " [ $invocationNamespace: $action: $stateName] stop further scheduling. " )
377
+ completeAllActivations(data.reason, isWhiskError(data.error))
378
+ // let QueueManager know this queue is no longer in charge.
379
+ context.parent ! staleQueueRemovedMsg
380
+ cleanUpActors(data)
381
+ cleanUpData()
382
+
383
+ goto(Removed ) using NoData ()
358
384
}
359
385
360
386
// in case there is any activation in the queue, it waits until all of them are handled.
@@ -399,6 +425,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
399
425
400
426
// actors and data are already wiped
401
427
case Event (QueueRemovedCompleted , _ : NoData ) =>
428
+ logging.info(this , " stop fsm" )
402
429
stop()
403
430
404
431
// This is not supposed to happen. This will ensure the queue does not run forever.
@@ -523,15 +550,19 @@ class MemoryQueue(private val etcdClient: EtcdClient,
523
550
case Event (DropOld , _) =>
524
551
if (queue.nonEmpty && Duration
525
552
.between(queue.head.timestamp, Instant .now)
526
- .compareTo(Duration .ofMillis(queueConfig.maxRetentionMs )) < 0 ) {
553
+ .compareTo(Duration .ofMillis(actionRetentionTimeout )) < 0 ) {
527
554
logging.error(
528
555
this ,
529
556
s " [ $invocationNamespace: $action: $stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}. " )
530
557
logging.error(
531
558
this ,
532
559
s " [ $invocationNamespace: $action: $stateName] the head stale message: ${queue.head.msg.activationId}" )
533
560
}
534
- queue = MemoryQueue .dropOld(queue, Duration .ofMillis(queueConfig.maxRetentionMs), completeErrorActivation)
561
+ queue = MemoryQueue .dropOld(
562
+ queue,
563
+ Duration .ofMillis(actionRetentionTimeout),
564
+ s " Activation processing is not initiated for $actionRetentionTimeout ms " ,
565
+ completeErrorActivation)
535
566
536
567
stay
537
568
@@ -861,7 +892,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
861
892
// these schedulers will run forever and stop when the memory queue stops
862
893
private def startMonitoring (): (ActorRef , ActorRef ) = {
863
894
val droppingScheduler = Scheduler .scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
864
- checkToDropStaleActivation(queue, queueConfig.maxRetentionMs , invocationNamespace, action , stateName, self)
895
+ checkToDropStaleActivation(queue, actionRetentionTimeout , invocationNamespace, actionMetaData , stateName, self)
865
896
Future .successful(())
866
897
}
867
898
@@ -1055,11 +1086,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
1055
1086
causedBy ++ limits ++ binding
1056
1087
})
1057
1088
}
1089
+
1090
/**
 * Tells whether the given container-creation error is one of
 * `ContainerCreationError.whiskErrors`.
 *
 * NOTE(review): presumably `whiskErrors` lists the errors attributed to the platform rather
 * than to the user's action — confirm against `ContainerCreationError`.
 *
 * @param error the error reported for a failed container creation
 * @return true when `error` is contained in `ContainerCreationError.whiskErrors`
 */
private def isWhiskError(error: ContainerCreationError): Boolean = {
  val knownWhiskErrors = ContainerCreationError.whiskErrors
  knownWhiskErrors.contains(error)
}
1058
1091
}
1059
1092
1060
1093
object MemoryQueue {
1061
1094
private [queue] val queueConfig = loadConfigOrThrow[QueueConfig ](ConfigKeys .schedulerQueue)
1062
- private [queue] val MaxRetentionTime = queueConfig.maxRetentionMs
1063
1095
1064
1096
def props (etcdClient : EtcdClient ,
1065
1097
durationChecker : DurationChecker ,
@@ -1105,21 +1137,27 @@ object MemoryQueue {
1105
1137
/**
 * Drops the leading activations whose age has reached `retention`, completing each dropped
 * activation with an error, and returns what is left of the queue.
 *
 * Only the head is examined on each step: dropping stops at the first entry that is younger
 * than `retention`, and entries behind it are not inspected. The age is measured against
 * `Instant.now` at the moment each entry is checked.
 *
 * @param queue the pending activations, with the entry to examine first at the head
 * @param retention the age at which a queued activation is dropped
 * @param reason the error message passed to `completeErrorActivation` for every dropped activation
 * @param completeErrorActivation callback invoked once per dropped activation; the Future it
 *                                returns is not awaited
 * @return the queue without the dropped leading entries (the same queue when nothing is dropped)
 */
// @tailrec makes the compiler reject any future edit that breaks tail position, so a long
// run of stale entries can never grow the call stack.
@scala.annotation.tailrec
def dropOld(queue: Queue[TimeSeriesActivationEntry],
            retention: Duration,
            reason: String,
            completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any])
  : Queue[TimeSeriesActivationEntry] = {
  if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
    queue
  else {
    // The third argument is always `true`; elsewhere in this file that position is passed by
    // name as `isWhiskError`, so dropped activations are always completed as whisk errors.
    completeErrorActivation(queue.head.msg, reason, true)
    dropOld(queue.tail, retention, reason, completeErrorActivation)
  }
}
1116
1149
1117
1150
def checkToDropStaleActivation (queue : Queue [TimeSeriesActivationEntry ],
1118
1151
maxRetentionMs : Long ,
1119
1152
invocationNamespace : String ,
1120
- action : FullyQualifiedEntityName ,
1153
+ actionMetaData : WhiskActionMetaData ,
1121
1154
stateName : MemoryQueueState ,
1122
1155
queueRef : ActorRef )(implicit logging : Logging ) = {
1156
+ val action = actionMetaData.fullyQualifiedName(true )
1157
+ logging.debug(
1158
+ this ,
1159
+ s " [ $invocationNamespace: $action: $stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}. " )
1160
+
1123
1161
if (queue.nonEmpty && Duration
1124
1162
.between(queue.head.timestamp, Instant .now)
1125
1163
.compareTo(Duration .ofMillis(maxRetentionMs)) >= 0 ) {
@@ -1130,6 +1168,14 @@ object MemoryQueue {
1130
1168
queueRef ! DropOld
1131
1169
}
1132
1170
}
1171
+
1172
/**
 * Selects the retention timeout, in milliseconds, for the queue of the given action.
 *
 * @param actionMetaData metadata of the action; only `exec.kind` is read
 * @param queueConfig queue configuration providing both retention values
 * @return `queueConfig.maxBlackboxRetentionMs` when the action's exec kind equals
 *         `ExecMetaDataBase.BLACKBOX`, otherwise `queueConfig.maxRetentionMs`
 */
private def getRetentionTimeout(actionMetaData: WhiskActionMetaData, queueConfig: QueueConfig): Long = {
  val isBlackbox = actionMetaData.exec.kind == ExecMetaDataBase.BLACKBOX
  if (isBlackbox) queueConfig.maxBlackboxRetentionMs else queueConfig.maxRetentionMs
}
1133
1179
}
1134
1180
1135
1181
case class QueueSnapshot (initialized : Boolean ,
@@ -1151,6 +1197,7 @@ case class QueueConfig(idleGrace: FiniteDuration,
1151
1197
gracefulShutdownTimeout : FiniteDuration ,
1152
1198
maxRetentionSize : Int ,
1153
1199
maxRetentionMs : Long ,
1200
+ maxBlackboxRetentionMs : Long ,
1154
1201
throttlingFraction : Double ,
1155
1202
durationBufferSize : Int )
1156
1203
0 commit comments