Skip to content

Commit 80de54e

Browse files
Remove containers gradually when disable invoker (#5253)
1 parent 21b03a5 commit 80de54e

File tree

8 files changed

+162
-25
lines changed

8 files changed

+162
-25
lines changed

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@
287287
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
288288
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
289289
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
290+
"CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
290291
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
291292
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
292293

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,14 @@ case class ContainerPoolConfig(userMemory: ByteSize,
5454
prewarmMaxRetryLimit: Int,
5555
prewarmPromotion: Boolean,
5656
memorySyncInterval: FiniteDuration,
57+
batchDeletionSize: Int,
5758
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
5859
require(
5960
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
6061
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
6162

6263
require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
64+
require(batchDeletionSize > 0, "batch deletion size must be > 0")
6365

6466
/**
6567
* The shareFactor indicates the number of containers that would share a single core, on average.

core/invoker/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ whisk {
6767
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
6868
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
6969
memory-sync-interval: 1 second # period to sync memory info to etcd
70+
batch-deletion-size: 10 # batch size for removing containers when the invoker is disabled; too large a value may overload docker/k8s
7071
}
7172

7273
kubernetes {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
9494
private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
9595
private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
9696

97+
// for shutting down
98+
private var disablingPool = immutable.Set.empty[ActorRef]
99+
97100
private var shuttingDown = false
98101

99102
private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
@@ -353,18 +356,12 @@ class FunctionPullingContainerPool(
353356

354357
// Container got removed
355358
case ContainerRemoved(replacePrewarm) =>
356-
inProgressPool.get(sender()).foreach { _ =>
357-
inProgressPool = inProgressPool - sender()
358-
}
359-
360-
warmedPool.get(sender()).foreach { _ =>
361-
warmedPool = warmedPool - sender()
362-
}
359+
inProgressPool = inProgressPool - sender()
360+
warmedPool = warmedPool - sender()
361+
disablingPool -= sender()
363362

364363
// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
365-
busyPool.get(sender()).foreach { _ =>
366-
busyPool = busyPool - sender()
367-
}
364+
busyPool = busyPool - sender()
368365

369366
//in case this was a prewarm
370367
prewarmedPool.get(sender()).foreach { data =>
@@ -601,11 +598,26 @@ class FunctionPullingContainerPool(
601598
* Make all busyPool's memoryQueue actor shutdown gracefully
602599
*/
603600
private def waitForPoolToClear(): Unit = {
604-
busyPool.keys.foreach(_ ! GracefulShutdown)
605-
warmedPool.keys.foreach(_ ! GracefulShutdown)
606-
if (inProgressPool.nonEmpty) {
601+
val pool = self
602+
// how many busy containers can be removed in this round
603+
val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
604+
(busyPool.keySet &~ disablingPool)
605+
.take(slotsForBusyPool)
606+
.foreach(container => {
607+
disablingPool += container
608+
container ! GracefulShutdown
609+
})
610+
// how many warm containers can be removed in this round
611+
val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
612+
(warmedPool.keySet &~ disablingPool)
613+
.take(slotsForWarmPool)
614+
.foreach(container => {
615+
disablingPool += container
616+
container ! GracefulShutdown
617+
})
618+
if (inProgressPool.nonEmpty || busyPool.size + warmedPool.size > slotsForBusyPool + slotsForWarmPool) {
607619
context.system.scheduler.scheduleOnce(5.seconds) {
608-
waitForPoolToClear()
620+
pool ! GracefulShutdown
609621
}
610622
}
611623
}

tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class ContainerPoolTests
134134
}
135135

136136
def poolConfig(userMemory: ByteSize) =
137-
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
137+
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
138138

139139
behavior of "ContainerPool"
140140

@@ -818,7 +818,8 @@ class ContainerPoolTests
818818
100,
819819
3,
820820
false,
821-
1.second)
821+
1.second,
822+
10)
822823
val initialCount = 2
823824
val pool =
824825
system.actorOf(
@@ -864,7 +865,8 @@ class ContainerPoolTests
864865
100,
865866
3,
866867
false,
867-
1.second)
868+
1.second,
869+
10)
868870
val minCount = 0
869871
val initialCount = 2
870872
val maxCount = 4
@@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
12371239
}
12381240

12391241
it should "remove expired in order of expiration" in {
1240-
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
1242+
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10)
12411243
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
12421244
//use a second kind so that we know sorting is not isolated to the expired of each kind
12431245
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1261,7 +1263,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
12611263

12621264
it should "remove only the prewarmExpirationLimit of expired prewarms" in {
12631265
//limit prewarm removal to 2
1264-
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
1266+
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second, 10)
12651267
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
12661268
val memoryLimit = 256.MB
12671269
val prewarmConfig =
@@ -1287,7 +1289,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
12871289

12881290
it should "remove only the expired prewarms regardless of minCount" in {
12891291
//limit prewarm removal to 100
1290-
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
1292+
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second, 10)
12911293
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
12921294
val memoryLimit = 256.MB
12931295
//minCount is 2 - should leave at least 2 prewarms when removing expired

tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ class ContainerProxyTests
276276
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
277277
Future.successful(())
278278
}
279-
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
279+
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
280280
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
281281
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
282282

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
180180
memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
181181
prewarmMaxRetryLimit: Int = 3,
182182
prewarmPromotion: Boolean = false,
183+
batchDeletionSize: Int = 10,
183184
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
184185
ContainerPoolConfig(
185186
userMemory,
@@ -192,6 +193,7 @@ class FunctionPullingContainerPoolTests
192193
prewarmMaxRetryLimit,
193194
prewarmPromotion,
194195
memorySyncInterval,
196+
batchDeletionSize,
195197
prewarmContainerCreationConfig)
196198

197199
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
@@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
309311
}
310312
}
311313

314+
it should "stop containers gradually when shut down" in within(timeout * 20) {
315+
val (containers, factory) = testContainers(10)
316+
val doc = put(entityStore, bigWhiskAction)
317+
val topic = s"creationAck${schedulerInstanceId.asString}"
318+
val consumer = new TestConnector(topic, 4, true)
319+
val pool = system.actorOf(
320+
Props(new FunctionPullingContainerPool(
321+
factory,
322+
invokerHealthService.ref,
323+
poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
324+
invokerInstance,
325+
List.empty,
326+
sendAckToScheduler(consumer.getProducer()))))
327+
328+
(0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken
329+
(0 to 10).foreach(i => {
330+
containers(i).expectMsgPF() {
331+
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
332+
}
333+
// create 5 containers in the busy pool and 6 in the warmed pool
334+
if (i < 5)
335+
containers(i).send(pool, Initialized(initializedData)) // container is initialized
336+
else
337+
containers(i).send(
338+
pool,
339+
ContainerIsPaused(
340+
WarmData(
341+
stub[DockerContainer],
342+
invocationNamespace.asString,
343+
whiskAction.toExecutableWhiskAction.get,
344+
doc.rev,
345+
Instant.now,
346+
TestProbe().ref)))
347+
})
348+
349+
// disable
350+
pool ! GracefulShutdown
351+
// at first, 3 containers will be removed from the busy pool, and the remaining containers will not
352+
var disablingContainers = Set.empty[Int]
353+
(0 to 10).foreach(i => {
354+
try {
355+
containers(i).expectMsg(1.second, GracefulShutdown)
356+
disablingContainers += i
357+
} catch {
358+
case _: Throwable =>
359+
}
360+
})
361+
assert(disablingContainers.size == 3, "more than 3 containers is shutting down")
362+
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
363+
364+
Thread.sleep(3000)
365+
var completedContainer = -1
366+
(0 to 10)
367+
.filter(!disablingContainers.contains(_))
368+
.foreach(i => {
369+
try {
370+
containers(i).expectMsg(1.second, GracefulShutdown)
371+
disablingContainers += i
372+
// only make one container complete shutting down
373+
if (completedContainer == -1)
374+
completedContainer = i
375+
} catch {
376+
case _: Throwable =>
377+
}
378+
})
379+
assert(disablingContainers.size == 6, "more than 3 containers is shutting down")
380+
containers(completedContainer).send(pool, ContainerRemoved(false))
381+
382+
Thread.sleep(3000)
383+
(0 to 10)
384+
.filter(!disablingContainers.contains(_))
385+
.foreach(i => {
386+
try {
387+
containers(i).expectMsg(1.second, GracefulShutdown)
388+
disablingContainers += i
389+
} catch {
390+
case _: Throwable =>
391+
}
392+
})
393+
// there should be only one more container going to shut down
394+
assert(disablingContainers.size == 7, "more than 3 containers is shutting down")
395+
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
396+
397+
Thread.sleep(3000)
398+
(0 to 10)
399+
.filter(!disablingContainers.contains(_))
400+
.foreach(i => {
401+
try {
402+
containers(i).expectMsg(1.second, GracefulShutdown)
403+
disablingContainers += i
404+
} catch {
405+
case _: Throwable =>
406+
}
407+
})
408+
assert(disablingContainers.size == 10, "more than 3 containers is shutting down")
409+
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
410+
411+
Thread.sleep(3000)
412+
(0 to 10)
413+
.filter(!disablingContainers.contains(_))
414+
.foreach(i => {
415+
try {
416+
containers(i).expectMsg(1.second, GracefulShutdown)
417+
disablingContainers += i
418+
} catch {
419+
case _: Throwable =>
420+
}
421+
})
422+
assert(disablingContainers.size == 11, "unexpected containers is shutting down")
423+
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
424+
}
425+
312426
it should "create prewarmed containers on startup" in within(timeout) {
313427
stream.reset()
314428
val (containers, factory) = testContainers(1)
@@ -343,6 +457,7 @@ class FunctionPullingContainerPoolTests
343457
3,
344458
false,
345459
FiniteDuration(10, TimeUnit.SECONDS),
460+
10,
346461
prewarmContainerCreationConfig)
347462

348463
val pool = system.actorOf(
@@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
9061021
100,
9071022
3,
9081023
false,
909-
1.second)
1024+
1.second,
1025+
10)
9101026
val initialCount = 2
9111027
val pool = system.actorOf(
9121028
Props(
@@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
9581074
100,
9591075
3,
9601076
false,
961-
1.second)
1077+
1.second,
1078+
10)
9621079
val minCount = 0
9631080
val initialCount = 2
9641081
val maxCount = 4
@@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
11051222
100,
11061223
maxRetryLimit,
11071224
false,
1108-
1.second)
1225+
1.second,
1226+
10)
11091227
val initialCount = 1
11101228
val pool = system.actorOf(
11111229
Props(

tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
115115
100,
116116
3,
117117
false,
118-
1.second)
118+
1.second,
119+
10)
119120

120121
val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)
121122

0 commit comments

Comments
 (0)