Skip to content

Commit da47d71

Browse files
committed
Consider binding action when creating or recovering queue.
1 parent 8843579 commit da47d71

File tree

8 files changed

+98
-81
lines changed

8 files changed

+98
-81
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys
128128
deleteWatchers.update(watcherKey, sender())
129129

130130
case request: UnwatchEndpoint =>
131+
logging.info(this, s"unwatch endpoint: $request")
131132
val watcherKey = WatcherKey(request.watchKey, request.watchName)
132133
if (request.isPrefix) {
133134
prefixPutWatchers.remove(watcherKey)

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ class FPCPoolBalancer(config: WhiskConfig,
174174
if (retryCount >= 0)
175175
scheduler
176176
.getRemoteRef(QueueManager.actorName)
177-
.ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
177+
.ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName, revision, actionMetaData))
178178
.mapTo[CreateQueueResponse]
179179
.onComplete {
180180
case Success(_) =>

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,13 @@ class FunctionPullingContainerPool(
680680
case Some(((proxy, data), containerState)) =>
681681
// record creationMessage so when container created failed, we can send failed message to scheduler
682682
creationMessages.getOrElseUpdate(proxy, create)
683-
proxy ! Initialize(create.invocationNamespace, executable, create.schedulerHost, create.rpcPort, create.transid)
683+
proxy ! Initialize(
684+
create.invocationNamespace,
685+
create.action,
686+
executable,
687+
create.schedulerHost,
688+
create.rpcPort,
689+
create.transid)
684690
inProgressPool = inProgressPool + (proxy -> data)
685691
logContainerStart(create, executable.toWhiskAction, containerState)
686692

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ case class InitCodeCompleted(data: WarmData)
6868

6969
// Events received by the actor
7070
case class Initialize(invocationNamespace: String,
71+
fqn: FullyQualifiedEntityName,
7172
action: ExecutableWhiskAction,
7273
schedulerHost: String,
7374
rpcPort: Int,
@@ -251,7 +252,7 @@ class FunctionPullingContainerProxy(
251252
clientProxyFactory(
252253
context,
253254
job.invocationNamespace,
254-
job.action.fullyQualifiedName(true),
255+
job.fqn, // include binding field
255256
job.action.rev,
256257
job.schedulerHost,
257258
job.rpcPort,
@@ -292,7 +293,7 @@ class FunctionPullingContainerProxy(
292293
clientProxyFactory(
293294
context,
294295
job.invocationNamespace,
295-
job.action.fullyQualifiedName(true),
296+
job.fqn, // include binding field
296297
job.action.rev,
297298
job.schedulerHost,
298299
job.rpcPort,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
172172

173173
WhiskAction.get(entityStore, docId).onComplete {
174174
case Success(action) =>
175-
val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
175+
val initialize =
176+
Initialize(namespace, action.fullyQualifiedName(true), action.toExecutableWhiskAction.get, "", 0, transid)
176177
startHealthAction(initialize, manager)
177178
case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
178179
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ class QueueManager(
179179
}
180180

181181
case RecoverQueue(msg, action, actionMetaData) =>
182-
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
182+
QueuePool.keys.find(k =>
183+
k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
183184
// queue is already recovered or a newer queue is created, send msg to new queue
184185
case Some(key) if key.docInfo.rev >= msg.revision =>
185186
QueuePool.get(key) match {
@@ -358,10 +359,12 @@ class QueueManager(
358359
}
359360

360361
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
361-
logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
362+
logging.warn(
363+
this,
364+
s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
362365
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
363366

364-
logging.info(this, s"Recover a queue for ${msg.action},")
367+
logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
365368
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
366369
.map { actionMetaData: WhiskActionMetaData =>
367370
actionMetaData.toExecutableWhiskAction match {
@@ -397,7 +400,7 @@ class QueueManager(
397400
logging.warn(
398401
this,
399402
s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
400-
completeErrorActivation(msg, "The activation has not been processed")
403+
completeErrorActivation(msg, "The activation has not been processed: too old activation is arrived.")
401404
} else {
402405
QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
403406
case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
@@ -466,13 +469,15 @@ class QueueManager(
466469
.recoverWith {
467470
case t =>
468471
logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})")
469-
completeErrorActivation(msg, "The activation has not been processed")
472+
completeErrorActivation(
473+
msg,
474+
"The activation has not been processed: failed to parse the scheduler endpoint.")
470475
}
471476

472477
} recoverWith {
473478
case t =>
474479
logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})")
475-
completeErrorActivation(msg, "The activation has not been processed")
480+
completeErrorActivation(msg, "The activation has not been processed: failed to get the queue endpoint.")
476481
}
477482
}
478483

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,12 @@ class FunctionPullingContainerPoolTests
227227

228228
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
229229
containers(0).expectMsgPF() {
230-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
230+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
231231
}
232232

233233
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
234234
containers(1).expectMsgPF() {
235-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
235+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
236236
}
237237
}
238238

@@ -254,7 +254,7 @@ class FunctionPullingContainerPoolTests
254254
// Start first action
255255
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
256256
containers(0).expectMsgPF() {
257-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
257+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
258258
}
259259

260260
// Send second action to the pool
@@ -267,7 +267,7 @@ class FunctionPullingContainerPoolTests
267267
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
268268
// Second container should run now
269269
containers(1).expectMsgPF() {
270-
case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
270+
case Initialize(invocationNamespace, fqn, bigExecuteAction, schedulerHost, rpcPort, _) => true
271271
}
272272
}
273273

@@ -307,7 +307,7 @@ class FunctionPullingContainerPoolTests
307307
pool ! Enable
308308
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
309309
containers(0).expectMsgPF() {
310-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
310+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
311311
}
312312
}
313313

@@ -328,7 +328,7 @@ class FunctionPullingContainerPoolTests
328328
(0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken)
329329
(0 to 10).foreach(i => {
330330
containers(i).expectMsgPF() {
331-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
331+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
332332
}
333333
// create 5 container in busy pool, and 6 in warmed pool
334334
if (i < 5)
@@ -526,7 +526,7 @@ class FunctionPullingContainerPoolTests
526526
// the prewarm container with matched memory should be chose
527527
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
528528
containers(0).expectMsgPF() {
529-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
529+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
530530
}
531531

532532
// prewarm a new container
@@ -537,7 +537,7 @@ class FunctionPullingContainerPoolTests
537537
// the prewarm container with bigger memory should not be chose
538538
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
539539
containers(3).expectMsgPF() {
540-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
540+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
541541
}
542542
}
543543

@@ -566,7 +566,7 @@ class FunctionPullingContainerPoolTests
566566
// the prewarm container with smallest memory should be chose
567567
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
568568
containers(0).expectMsgPF() {
569-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
569+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
570570
}
571571

572572
// prewarm a new container
@@ -577,7 +577,7 @@ class FunctionPullingContainerPoolTests
577577
// the prewarm container with bigger memory should be chose
578578
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
579579
containers(1).expectMsgPF() {
580-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
580+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
581581
}
582582

583583
// prewarm a new container
@@ -589,7 +589,7 @@ class FunctionPullingContainerPoolTests
589589
// a new container should be created
590590
pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
591591
containers(4).expectMsgPF() {
592-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
592+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
593593
}
594594

595595
// no new prewarmed container should be created
@@ -616,7 +616,7 @@ class FunctionPullingContainerPoolTests
616616

617617
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
618618
containers(1).expectMsgPF() {
619-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
619+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
620620
}
621621
}
622622

@@ -640,7 +640,7 @@ class FunctionPullingContainerPoolTests
640640

641641
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
642642
containers(1).expectMsgPF() {
643-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
643+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
644644
}
645645
}
646646

@@ -674,24 +674,24 @@ class FunctionPullingContainerPoolTests
674674
// the revision doesn't match, create 1 container
675675
pool ! CreationContainer(creationMessage, whiskAction)
676676
containers(0).expectMsgPF() {
677-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
677+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
678678
}
679679

680680
// the invocation namespace doesn't match, create 1 container
681681
pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
682682
containers(1).expectMsgPF() {
683-
case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
683+
case Initialize("otherNamespace", fqn, executeAction, schedulerHost, rpcPort, _) => true
684684
}
685685

686686
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
687687
container.expectMsgPF() {
688-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
688+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
689689
}
690690

691691
// warmed container is occupied, create 1 more container
692692
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
693693
containers(2).expectMsgPF() {
694-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
694+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
695695
}
696696
}
697697

@@ -726,7 +726,7 @@ class FunctionPullingContainerPoolTests
726726
// choose the warmed container
727727
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
728728
container.expectMsgPF() {
729-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
729+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
730730
}
731731

732732
// warmed container is failed to resume
@@ -743,7 +743,7 @@ class FunctionPullingContainerPoolTests
743743

744744
// then a new container will be created
745745
containers(0).expectMsgPF() {
746-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
746+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
747747
}
748748
}
749749

@@ -806,7 +806,7 @@ class FunctionPullingContainerPoolTests
806806

807807
// a new container will be created
808808
containers(0).expectMsgPF() {
809-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
809+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
810810
}
811811
}
812812

@@ -850,7 +850,7 @@ class FunctionPullingContainerPoolTests
850850

851851
pool ! CreationContainer(actualCreationMessage, whiskAction)
852852
containers(0).expectMsgPF() {
853-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
853+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
854854
}
855855
containers(0).send(pool, Initialized(initializedData)) // container is initialized
856856

@@ -898,7 +898,7 @@ class FunctionPullingContainerPoolTests
898898
// choose the warmed container
899899
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
900900
container.expectMsgPF() {
901-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
901+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
902902
}
903903
pool.tell(
904904
Resumed(
@@ -956,7 +956,7 @@ class FunctionPullingContainerPoolTests
956956

957957
pool ! CreationContainer(actualCreationMessage, whiskAction)
958958
containers(0).expectMsgPF() {
959-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
959+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
960960
}
961961
containers(0).send(pool, ContainerRemoved(true)) // the container0 init failed or create container failed
962962

@@ -1127,11 +1127,11 @@ class FunctionPullingContainerPoolTests
11271127
// 2 cold start happened
11281128
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11291129
containers(2).expectMsgPF() {
1130-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1130+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11311131
}
11321132
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11331133
containers(3).expectMsgPF() {
1134-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1134+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11351135
}
11361136
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
11371137
Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
@@ -1165,23 +1165,23 @@ class FunctionPullingContainerPoolTests
11651165
// 5 code start happened(5 > maxCount)
11661166
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11671167
containers(6).expectMsgPF() {
1168-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1168+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11691169
}
11701170
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11711171
containers(7).expectMsgPF() {
1172-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1172+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11731173
}
11741174
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11751175
containers(8).expectMsgPF() {
1176-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1176+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11771177
}
11781178
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11791179
containers(9).expectMsgPF() {
1180-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1180+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11811181
}
11821182
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
11831183
containers(10).expectMsgPF() {
1184-
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
1184+
case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
11851185
}
11861186

11871187
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time

0 commit comments

Comments
 (0)