Skip to content

Commit f1dca88

Browse files
committed
Support graceful shutdown.
1 parent 4016aa8 commit f1dca88

File tree

4 files changed

+52
-4
lines changed

4 files changed

+52
-4
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ trait Container {
135135
transid.failed(this, start, s"initializiation failed with $t")
136136
}
137137
.flatMap { result =>
138+
// if runtime container is shutting down, reschedule the activation message
139+
result.response.right.map { res =>
140+
if (res.shuttingDown) {
141+
throw ContainerHealthError(transid, containerId.asString)
142+
}
143+
}
144+
138145
if (result.ok) {
139146
Future.successful(result.interval)
140147
} else if (result.interval.duration >= timeout) {
@@ -180,6 +187,13 @@ trait Container {
180187
transid.failed(this, start, s"run failed with $t")
181188
}
182189
.map { result =>
190+
// if runtime container is shutting down, reschedule the activation message
191+
result.response.right.map { res =>
192+
if (res.shuttingDown) {
193+
throw ContainerHealthError(transid, containerId.asString)
194+
}
195+
}
196+
183197
val response = if (result.interval.duration >= timeout) {
184198
ActivationResponse.developerError(Messages.timedoutActivation(timeout, false))
185199
} else {

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,9 @@
1818
package org.apache.openwhisk.core.entity
1919

2020
import scala.util.Try
21-
22-
import akka.http.scaladsl.model.StatusCodes.OK
23-
21+
import akka.http.scaladsl.model.StatusCodes.{OK, ServiceUnavailable}
2422
import spray.json._
2523
import spray.json.DefaultJsonProtocol
26-
2724
import org.apache.openwhisk.common.Logging
2825
import org.apache.openwhisk.http.Messages._
2926

@@ -139,6 +136,10 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
139136
/** true iff status code is OK (HTTP 200 status code), anything else is considered an error. **/
140137
val okStatus = statusCode == OK.intValue
141138
val ok = okStatus && truncated.isEmpty
139+
140+
/** true iff status code is ServiceUnavailable (HTTP 503 status code) */
141+
val shuttingDown = statusCode == ServiceUnavailable.intValue
142+
142143
override def toString = {
143144
val base = if (okStatus) "ok" else "not ok"
144145
val rest = truncated.map(e => s", truncated ${e.toString}").getOrElse("")

tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,23 @@ class DockerContainerTests
474474
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
475475
}
476476

477+
it should "throw ContainerHealthError if runtime container returns 503 response" in {
478+
implicit val docker = stub[DockerApiWithFileAccess]
479+
implicit val runc = stub[RuncApi]
480+
481+
val interval = intervalOf(1.millisecond)
482+
val result = JsObject.empty
483+
val container = dockerContainer() {
484+
Future.successful(RunResult(interval, Right(ContainerResponse(503, result.compactPrint, None))))
485+
}
486+
487+
val initResult = container.initialize(JsObject.empty, 1.second, 1)
488+
an[ContainerHealthError] should be thrownBy await(initResult)
489+
490+
val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
491+
an[ContainerHealthError] should be thrownBy await(runResult)
492+
}
493+
477494
it should "properly deal with a timeout during run" in {
478495
implicit val docker = stub[DockerApiWithFileAccess]
479496
implicit val runc = stub[RuncApi]

tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,22 @@ class KubernetesContainerTests
277277
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
278278
}
279279

280+
it should "throw ContainerHealthError if runtime container returns 503 response" in {
281+
implicit val kubernetes = stub[KubernetesApi]
282+
283+
val interval = intervalOf(1.millisecond)
284+
val result = JsObject.empty
285+
val container = kubernetesContainer() {
286+
Future.successful(RunResult(interval, Right(ContainerResponse(503, result.compactPrint, None))))
287+
}
288+
289+
val initResult = container.initialize(JsObject.empty, 1.second, 1)
290+
an[ContainerHealthError] should be thrownBy await(initResult)
291+
292+
val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
293+
an[ContainerHealthError] should be thrownBy await(runResult)
294+
}
295+
280296
it should "properly deal with a timeout during run" in {
281297
implicit val kubernetes = stub[KubernetesApi]
282298

0 commit comments

Comments
 (0)