Skip to content

Commit 8374e39

Browse files
committed
Fix review points
1 parent 9ae9c52 commit 8374e39

File tree

2 files changed

+38
-38
lines changed

2 files changed

+38
-38
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ import scala.concurrent.duration._
4848
import scala.util.{Random, Try}
4949
import scala.collection.immutable.Queue
5050

51-
case class Creation(creationMessage: ContainerCreationMessage, action: WhiskAction)
52-
case class Deletion(deletionMessage: ContainerDeletionMessage)
51+
case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
52+
case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
5353
case object Remove
5454
case class Keep(timeout: FiniteDuration)
5555
case class PrewarmContainer(maxConcurrent: Int)
@@ -166,7 +166,7 @@ class FunctionPullingContainerPool(
166166
}
167167
}
168168

169-
case Creation(create: ContainerCreationMessage, action: WhiskAction) =>
169+
case CreationContainer(create: ContainerCreationMessage, action: WhiskAction) =>
170170
if (shuttingDown) {
171171
val message =
172172
s"creationId: ${create.creationId}, invoker is shutting down, reschedule ${action.fullyQualifiedName(false)}"
@@ -217,7 +217,7 @@ class FunctionPullingContainerPool(
217217
}
218218
}
219219

220-
case Deletion(deletionMessage: ContainerDeletionMessage) =>
220+
case DeletionContainer(deletionMessage: ContainerDeletionMessage) =>
221221
val oldRevision = deletionMessage.revision
222222
val invocationNamespace = deletionMessage.invocationNamespace
223223
val fqn = deletionMessage.action.copy(version = None)

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,12 @@ class FunctionPullingContainerPoolTests
226226
List.empty,
227227
sendAckToScheduler(producer))))
228228

229-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
229+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
230230
containers(0).expectMsgPF() {
231231
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
232232
}
233233

234-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
234+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
235235
containers(1).expectMsgPF() {
236236
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
237237
}
@@ -253,19 +253,19 @@ class FunctionPullingContainerPoolTests
253253
sendAckToScheduler(producer))))
254254

255255
// Start first action
256-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
256+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
257257
containers(0).expectMsgPF() {
258258
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
259259
}
260260

261261
// Send second action to the pool
262-
pool ! Creation(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
262+
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
263263
containers(1).expectNoMessage(100.milliseconds)
264264

265265
// First container is removed
266266
containers(0).send(pool, ContainerRemoved(true)) // pool is empty again.
267267

268-
pool ! Creation(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
268+
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
269269
// Second container should run now
270270
containers(1).expectMsgPF() {
271271
case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
@@ -288,7 +288,7 @@ class FunctionPullingContainerPoolTests
288288
sendAckToScheduler(consumer.getProducer()))))
289289

290290
pool ! GracefulShutdown
291-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
291+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
292292

293293
containers(0).expectNoMessage()
294294

@@ -306,7 +306,7 @@ class FunctionPullingContainerPoolTests
306306

307307
// pool should be back to work after enabled again
308308
pool ! Enable
309-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
309+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
310310
containers(0).expectMsgPF() {
311311
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
312312
}
@@ -412,7 +412,7 @@ class FunctionPullingContainerPoolTests
412412
containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggerMemory)))
413413

414414
// the prewarm container with matched memory should be chose
415-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
415+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
416416
containers(0).expectMsgPF() {
417417
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
418418
}
@@ -423,7 +423,7 @@ class FunctionPullingContainerPoolTests
423423
}
424424

425425
// the prewarm container with bigger memory should not be chose
426-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
426+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
427427
containers(3).expectMsgPF() {
428428
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
429429
}
@@ -452,7 +452,7 @@ class FunctionPullingContainerPoolTests
452452
containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggestMemory)))
453453

454454
// the prewarm container with smallest memory should be chose
455-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
455+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
456456
containers(0).expectMsgPF() {
457457
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
458458
}
@@ -463,7 +463,7 @@ class FunctionPullingContainerPoolTests
463463
}
464464

465465
// the prewarm container with bigger memory should be chose
466-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
466+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
467467
containers(1).expectMsgPF() {
468468
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
469469
}
@@ -475,7 +475,7 @@ class FunctionPullingContainerPoolTests
475475

476476
// now free memory is (6 - 3 - 1) * stdMemory, and required 2 * stdMemory, so both two prewarmed containers are not suitable
477477
// a new container should be created
478-
pool ! Creation(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
478+
pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
479479
containers(4).expectMsgPF() {
480480
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
481481
}
@@ -502,7 +502,7 @@ class FunctionPullingContainerPoolTests
502502
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
503503
containers(0).send(pool, ReadyToWork(prewarmedData.copy(kind = alternativeExec.kind))) // container0 was started
504504

505-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
505+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
506506
containers(1).expectMsgPF() {
507507
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
508508
}
@@ -526,7 +526,7 @@ class FunctionPullingContainerPoolTests
526526
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
527527
containers(0).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = alternativeLimit))) // container0 was started
528528

529-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
529+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
530530
containers(1).expectMsgPF() {
531531
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
532532
}
@@ -560,24 +560,24 @@ class FunctionPullingContainerPoolTests
560560
container.ref)
561561

562562
// the revision doesn't match, create 1 container
563-
pool ! Creation(creationMessage, whiskAction)
563+
pool ! CreationContainer(creationMessage, whiskAction)
564564
containers(0).expectMsgPF() {
565565
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
566566
}
567567

568568
// the invocation namespace doesn't match, create 1 container
569-
pool ! Creation(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
569+
pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
570570
containers(1).expectMsgPF() {
571571
case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
572572
}
573573

574-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
574+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
575575
container.expectMsgPF() {
576576
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
577577
}
578578

579579
// warmed container is occupied, create 1 more container
580-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
580+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
581581
containers(2).expectMsgPF() {
582582
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
583583
}
@@ -612,7 +612,7 @@ class FunctionPullingContainerPoolTests
612612
container.ref)
613613

614614
// choose the warmed container
615-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
615+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
616616
container.expectMsgPF() {
617617
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
618618
}
@@ -687,7 +687,7 @@ class FunctionPullingContainerPoolTests
687687
container3.ref)
688688

689689
// now the pool has no free memory, and new job needs 2*stdMemory, so it needs to remove two warmed containers
690-
pool ! Creation(creationMessage, bigWhiskAction)
690+
pool ! CreationContainer(creationMessage, bigWhiskAction)
691691
container1.expectMsg(Remove)
692692
container2.expectMsg(Remove)
693693
container3.expectNoMessage()
@@ -736,7 +736,7 @@ class FunctionPullingContainerPoolTests
736736
val actualCreationMessage = creationMessage.copy(revision = doc.rev)
737737
val ackMessage = createAckMsg(actualCreationMessage, None, None)
738738

739-
pool ! Creation(actualCreationMessage, whiskAction)
739+
pool ! CreationContainer(actualCreationMessage, whiskAction)
740740
containers(0).expectMsgPF() {
741741
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
742742
}
@@ -784,7 +784,7 @@ class FunctionPullingContainerPoolTests
784784
container.ref)
785785

786786
// choose the warmed container
787-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
787+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
788788
container.expectMsgPF() {
789789
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
790790
}
@@ -830,7 +830,7 @@ class FunctionPullingContainerPoolTests
830830
val ackMessage =
831831
createAckMsg(actualCreationMessageLarge, Some(ResourceNotEnoughError), Some(error))
832832

833-
pool ! Creation(actualCreationMessageLarge, bigWhiskAction)
833+
pool ! CreationContainer(actualCreationMessageLarge, bigWhiskAction)
834834

835835
utilRetry({
836836
val buffer = consumer.peek(50.millisecond)
@@ -842,7 +842,7 @@ class FunctionPullingContainerPoolTests
842842
val actualCreationMessage = creationMessage.copy(revision = doc2.rev)
843843
val rescheduleAckMsg = createAckMsg(actualCreationMessage, Some(UnknownError), Some("ContainerProxy init failed."))
844844

845-
pool ! Creation(actualCreationMessage, whiskAction)
845+
pool ! CreationContainer(actualCreationMessage, whiskAction)
846846
containers(0).expectMsgPF() {
847847
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
848848
}
@@ -883,9 +883,9 @@ class FunctionPullingContainerPoolTests
883883
sendAckToScheduler(producer))))
884884
containers(0).expectMsg(Start(exec, memoryLimit))
885885

886-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
887-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
888-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
886+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
887+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
888+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
889889

890890
awaitAssert {
891891
count shouldBe 3
@@ -1011,11 +1011,11 @@ class FunctionPullingContainerPoolTests
10111011
stream.toString should not include (s"removed ${initialCount} expired prewarmed container")
10121012

10131013
// 2 cold start happened
1014-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1014+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10151015
containers(2).expectMsgPF() {
10161016
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10171017
}
1018-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1018+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10191019
containers(3).expectMsgPF() {
10201020
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10211021
}
@@ -1049,23 +1049,23 @@ class FunctionPullingContainerPoolTests
10491049
stream.reset()
10501050

10511051
// 5 code start happened(5 > maxCount)
1052-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1052+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10531053
containers(6).expectMsgPF() {
10541054
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10551055
}
1056-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1056+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10571057
containers(7).expectMsgPF() {
10581058
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10591059
}
1060-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1060+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10611061
containers(8).expectMsgPF() {
10621062
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10631063
}
1064-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1064+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10651065
containers(9).expectMsgPF() {
10661066
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10671067
}
1068-
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
1068+
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
10691069
containers(10).expectMsgPF() {
10701070
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
10711071
}

0 commit comments

Comments
 (0)