Skip to content

Commit 46a9574

Browse files
committed
Implement FunctionPullingContainerProxy
1 parent 3802374 commit 46a9574

File tree

28 files changed

+4616
-120
lines changed

28 files changed

+4616
-120
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
191191
case Some(parent) => findRoot(parent)
192192
case _ => meta
193193
}
194+
195+
def serialize = TransactionId.serdes.write(this).compactPrint
194196
}
195197

196198
/**
@@ -238,6 +240,7 @@ object TransactionId {
238240
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
239241
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
240242
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
243+
val invokerColdstart = TransactionId(systemPrefix + "invokerColdstart") //Invoker cold start thread
241244
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
242245
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
243246
val loadbalancer = TransactionId(systemPrefix + "loadbalancer") // Loadbalancer thread
@@ -246,6 +249,7 @@ object TransactionId {
246249
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
247250
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
248251
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
252+
val warmUp = TransactionId(systemPrefix + "warmUp")
249253

250254
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
251255

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core
19+
20+
import org.apache.openwhisk.common.TransactionId
21+
import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
22+
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
23+
import org.apache.openwhisk.core.entity._
24+
25+
object WarmUp {
26+
val warmUpActionIdentity = {
27+
val whiskSystem = "whisk.system"
28+
val uuid = UUID()
29+
Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
30+
}
31+
32+
private val actionName = "warmUp"
33+
34+
// this action doesn't need to be in database
35+
val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))
36+
37+
def warmUpActivation(controller: ControllerInstanceId) = {
38+
ActivationMessage(
39+
transid = TransactionId.warmUp,
40+
action = warmUpAction,
41+
revision = DocRevision.empty,
42+
user = warmUpActionIdentity,
43+
activationId = new ActivationIdGenerator {}.make(),
44+
rootControllerIndex = controller,
45+
blocking = false,
46+
content = None,
47+
initArgs = Set.empty)
48+
}
49+
50+
def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
51+
ExecManifest.runtimesManifest
52+
.resolveDefaultRuntime("nodejs:default")
53+
.map { manifest =>
54+
val metadata = WhiskActionMetaData(
55+
warmUpAction.path,
56+
warmUpAction.name,
57+
CodeExecMetaDataAsString(manifest, false, entryPoint = None))
58+
ContainerCreationMessage(
59+
TransactionId.warmUp,
60+
warmUpActionIdentity.namespace.name.toString,
61+
warmUpAction,
62+
DocRevision.empty,
63+
metadata,
64+
scheduler,
65+
"",
66+
0)
67+
}
68+
69+
def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = {
70+
fqn == warmUpAction
71+
}
72+
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ trait Container {
135135
transid.failed(this, start, s"initializiation failed with $t")
136136
}
137137
.flatMap { result =>
138+
// if runtime container is shutting down, reschedule the activation message
139+
result.response.right.map { res =>
140+
if (res.shuttingDown) {
141+
throw ContainerHealthError(transid, containerId.asString)
142+
}
143+
}
144+
138145
if (result.ok) {
139146
Future.successful(result.interval)
140147
} else if (result.interval.duration >= timeout) {
@@ -180,6 +187,13 @@ trait Container {
180187
transid.failed(this, start, s"run failed with $t")
181188
}
182189
.map { result =>
190+
// if runtime container is shutting down, reschedule the activation message
191+
result.response.right.map { res =>
192+
if (res.shuttingDown) {
193+
throw ContainerHealthError(transid, containerId.asString)
194+
}
195+
}
196+
183197
val response = if (result.interval.duration >= timeout) {
184198
ActivationResponse.developerError(Messages.timedoutActivation(timeout, false))
185199
} else {

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,8 @@ trait ContainerFactory {
114114
userProvidedImage: Boolean,
115115
memory: ByteSize,
116116
cpuShares: Int,
117-
action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
118-
createContainer(tid, name, actionImage, userProvidedImage, memory, cpuShares)
119-
}
120-
121-
def createContainer(tid: TransactionId,
122-
name: String,
123-
actionImage: ExecManifest.ImageName,
124-
userProvidedImage: Boolean,
125-
memory: ByteSize,
126-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container]
117+
action: Option[ExecutableWhiskAction],
118+
resourceTags: Option[List[String]] = None)(implicit config: WhiskConfig, logging: Logging): Future[Container]
127119

128120
/** perform any initialization */
129121
def init(): Unit

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
package org.apache.openwhisk.core.entity
1919

2020
import scala.util.Try
21-
22-
import akka.http.scaladsl.model.StatusCodes.OK
23-
21+
import akka.http.scaladsl.model.StatusCodes.{OK, ServiceUnavailable}
2422
import spray.json._
2523
import spray.json.DefaultJsonProtocol
26-
27-
import org.apache.openwhisk.common.Logging
24+
import spray.json.DefaultJsonProtocol._
25+
import org.apache.openwhisk.common.{Logging, TransactionId}
26+
import org.apache.openwhisk.core.containerpool.Container
27+
import org.apache.openwhisk.core.entity.size._
2828
import org.apache.openwhisk.http.Messages._
2929

3030
protected[core] case class ActivationResponse private (statusCode: Int,
@@ -51,11 +51,48 @@ protected[core] case class ActivationResponse private (statusCode: Int,
5151
def withoutResult = ActivationResponse(statusCode, None)
5252

5353
override def toString = toJsonObject.compactPrint
54+
55+
def logs(logLimit: ByteSize): Option[ActivationLogs] = {
56+
Try {
57+
result.toJson.asJsObject.fields
58+
.get(ActivationResponse.LOGS_FIELD)
59+
.map { log =>
60+
var logSize = 0
61+
Try { log.convertTo[Vector[String]] }
62+
.getOrElse(Vector.empty)
63+
.filterNot(_.contains(Container.ACTIVATION_LOG_SENTINEL))
64+
.takeWhile { data =>
65+
logSize += data.size + 1
66+
logSize <= logLimit.toBytes
67+
}
68+
}
69+
.map(ActivationLogs(_))
70+
}.getOrElse(None)
71+
}
72+
73+
def withoutLogs() = {
74+
Try {
75+
val newRes = JsObject(result.toJson.asJsObject.fields - ActivationResponse.LOGS_FIELD).toJson
76+
copy(result = Some(newRes), size = size.map(_ => newRes.compactPrint.size))
77+
} getOrElse (this)
78+
}
79+
80+
// remove possible logs first and then truncate response
81+
def truncate(limit: ByteSize, truncateSize: ByteSize) = {
82+
val pureResponse = withoutLogs()
83+
val pureResult = pureResponse.result.toJson.compactPrint
84+
if (pureResult.size > limit.toBytes) {
85+
val error = JsString(truncatedResponse(pureResult.take(truncateSize.toBytes.toInt), pureResult.size.B, limit))
86+
ActivationResponse.applicationError(error, size.map(_ => pureResult.size))
87+
} else
88+
pureResponse
89+
}
5490
}
5591

5692
protected[core] object ActivationResponse extends DefaultJsonProtocol {
5793
/* The field name that is universally recognized as the marker of an error, from the application or otherwise. */
5894
val ERROR_FIELD: String = "error"
95+
val LOGS_FIELD: String = "__OW_LOGS"
5996

6097
// These constants need to be synchronized with messageForCode() method below
6198
val Success = 0 // action ran successfully and produced a result
@@ -139,6 +176,10 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
139176
/** true iff status code is OK (HTTP 200 status code), anything else is considered an error. **/
140177
val okStatus = statusCode == OK.intValue
141178
val ok = okStatus && truncated.isEmpty
179+
180+
/** true iff status code is ServiceUnavailable (HTTP 503 status code) */
181+
val shuttingDown = statusCode == ServiceUnavailable.intValue
182+
142183
override def toString = {
143184
val base = if (okStatus) "ok" else "not ok"
144185
val rest = truncated.map(e => s", truncated ${e.toString}").getOrElse("")
@@ -196,7 +237,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
196237
* @return appropriate ActivationResponse representing run result
197238
*/
198239
protected[core] def processRunResponseContent(response: Either[ContainerConnectionError, ContainerResponse],
199-
logger: Logging): ActivationResponse = {
240+
logger: Logging)(implicit id: TransactionId = TransactionId.unknown): ActivationResponse = {
200241
response match {
201242
case Right(res @ ContainerResponse(_, str, truncated)) =>
202243
truncated match {
@@ -240,6 +281,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
240281

241282
case Left(e) =>
242283
// This indicates a terminal failure in the container (it exited prematurely).
284+
logger.error(this, abnormalRun)
243285
developerError(abnormalRun)
244286
}
245287
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Annotations.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ object Annotations {
2424
val RawHttpAnnotationName = "raw-http"
2525
val RequireWhiskAuthAnnotation = "require-whisk-auth"
2626
val ProvideApiKeyAnnotationName = "provide-api-key"
27+
val InvokerResourcesAnnotationName = "invoker-resources"
2728
}

common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ import org.apache.openwhisk.common.TransactionId
4545
import org.apache.openwhisk.core.ConfigKeys
4646
import org.apache.openwhisk.core.WhiskConfig
4747
import org.apache.openwhisk.core.containerpool._
48-
import org.apache.openwhisk.core.entity.ByteSize
49-
import org.apache.openwhisk.core.entity.ExecManifest
50-
import org.apache.openwhisk.core.entity.InvokerInstanceId
51-
import org.apache.openwhisk.core.entity.UUID
48+
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhiskAction, InvokerInstanceId, UUID}
5249

5350
/**
5451
* Configuration for mesos timeouts
@@ -125,12 +122,15 @@ class MesosContainerFactory(config: WhiskConfig,
125122
}
126123
}
127124

128-
override def createContainer(tid: TransactionId,
129-
name: String,
130-
actionImage: ExecManifest.ImageName,
131-
userProvidedImage: Boolean,
132-
memory: ByteSize,
133-
cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
125+
override def createContainer(
126+
tid: TransactionId,
127+
name: String,
128+
actionImage: ExecManifest.ImageName,
129+
userProvidedImage: Boolean,
130+
memory: ByteSize,
131+
cpuShares: Int,
132+
action: Option[ExecutableWhiskAction] = None,
133+
resourceTags: Option[List[String]] = None)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
134134
implicit val transid = tid
135135
val image = actionImage.resolveImageName(Some(
136136
ContainerFactory.resolveRegistryConfig(userProvidedImage, runtimesRegistryConfig, userImagesRegistryConfig).url))

common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import akka.util.Timeout
2424
import org.apache.openwhisk.common.{Logging, TransactionId}
2525
import org.apache.openwhisk.core.containerpool._
2626
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
27-
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId}
27+
import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhiskAction, InvokerInstanceId}
2828
import org.apache.openwhisk.core.yarn.YARNComponentActor.CreateContainerAsync
2929
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3030
import pureconfig._
@@ -125,7 +125,9 @@ class YARNContainerFactory(actorSystem: ActorSystem,
125125
actionImage: ExecManifest.ImageName,
126126
unuseduserProvidedImage: Boolean,
127127
unusedmemory: ByteSize,
128-
unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
128+
unusedcpuShares: Int,
129+
action: Option[ExecutableWhiskAction] = None,
130+
resourceTags: Option[List[String]] = None)(implicit config: WhiskConfig, logging: Logging): Future[Container] = {
129131
implicit val timeout: Timeout = Timeout(containerStartTimeoutMS.milliseconds)
130132

131133
//First send the create command to YARN, then with a different actor, wait for the container to be ready

core/invoker/src/main/resources/application.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ whisk {
155155
#aka 'How long should a container sit idle until we kill it?'
156156
idle-container = 10 minutes
157157
pause-grace = 50 milliseconds
158+
lease-grace = 5 seconds
159+
keeping-duration = 60 minutes
158160
}
159161
action-health-check {
160162
enabled = false # if true, prewarm containers will be pinged periodically and warm containers will be pinged once after resumed
@@ -178,4 +180,6 @@ whisk {
178180
protocol: http
179181
}
180182
runtime.delete.timeout = "30 seconds"
183+
184+
runtime.resource.role = "openwhisk-runtime-resource-role"
181185
}

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ class ContainerProxy(factory: (TransactionId,
251251
Boolean,
252252
ByteSize,
253253
Int,
254-
Option[ExecutableWhiskAction]) => Future[Container],
254+
Option[ExecutableWhiskAction],
255+
Option[List[String]]) => Future[Container],
255256
sendActiveAck: ActiveAck,
256257
storeActivation: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
257258
collectLogs: LogsCollector,
@@ -290,6 +291,7 @@ class ContainerProxy(factory: (TransactionId,
290291
job.exec.pull,
291292
job.memoryLimit,
292293
poolConfig.cpuShare(job.memoryLimit),
294+
None,
293295
None)
294296
.map(container =>
295297
PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))))
@@ -309,7 +311,8 @@ class ContainerProxy(factory: (TransactionId,
309311
job.action.exec.pull,
310312
job.action.limits.memory.megabytes.MB,
311313
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
312-
Some(job.action))
314+
Some(job.action),
315+
job.action.annotations.get(Annotations.InvokerResourcesAnnotationName).map(_.convertTo[List[String]]))
313316

314317
// container factory will either yield a new container ready to execute the action, or
315318
// starting up the container failed; for the latter, it's either an internal error starting
@@ -838,12 +841,15 @@ class ContainerProxy(factory: (TransactionId,
838841
val initRunInterval = initInterval
839842
.map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
840843
.getOrElse(runInterval)
844+
val truncatedResponse = response.truncate(
845+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
846+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT)
841847
ContainerProxy.constructWhiskActivation(
842848
job,
843849
initInterval,
844850
initRunInterval,
845851
runInterval.duration >= actionTimeout,
846-
response)
852+
truncatedResponse)
847853
}
848854
}
849855
.recoverWith {
@@ -963,7 +969,9 @@ class ContainerProxy(factory: (TransactionId,
963969
}
964970
}
965971

966-
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, pauseGrace: FiniteDuration)
972+
final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
973+
pauseGrace: FiniteDuration,
974+
keepingDuration: FiniteDuration)
967975
final case class ContainerProxyHealthCheckConfig(enabled: Boolean, checkPeriod: FiniteDuration, maxFails: Int)
968976
final case class ContainerProxyActivationErrorLogConfig(applicationErrors: Boolean,
969977
developerErrors: Boolean,
@@ -976,7 +984,8 @@ object ContainerProxy {
976984
Boolean,
977985
ByteSize,
978986
Int,
979-
Option[ExecutableWhiskAction]) => Future[Container],
987+
Option[ExecutableWhiskAction],
988+
Option[List[String]]) => Future[Container],
980989
ack: ActiveAck,
981990
store: (TransactionId, WhiskActivation, Boolean, UserContext) => Future[Any],
982991
collectLogs: LogsCollector,

0 commit comments

Comments
 (0)