Skip to content

Commit 12e8fff

Browse files
Add CreationJobManager
1 parent 1a0bf9f commit 12e8fff

File tree

4 files changed

+720
-0
lines changed

4 files changed

+720
-0
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ object ConfigKeys {
294294
val azBlob = "whisk.azure-blob"
295295

296296
val schedulerMaxPeek = "whisk.scheduler.max-peek"
297+
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
297298

298299
val whiskClusterName = "whisk.cluster.name"
299300

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
whisk {
19+
# tracing configuration
20+
tracing {
21+
component = "Scheduler"
22+
}
23+
24+
scheduler {
25+
in-progress-job-retention = "20"
26+
}
27+
}
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.container
19+
20+
import java.nio.charset.StandardCharsets
21+
import java.util.concurrent.TimeUnit
22+
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
23+
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
24+
import org.apache.openwhisk.core.connector._
25+
import org.apache.openwhisk.core.entity._
26+
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
27+
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
28+
import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
29+
import org.apache.openwhisk.core.ConfigKeys
30+
import pureconfig.loadConfigOrThrow
31+
32+
import scala.collection.concurrent.TrieMap
33+
import scala.concurrent.duration._
34+
import scala.concurrent.{ExecutionContext, Future}
35+
import scala.language.postfixOps
36+
37+
sealed trait CreationJob
38+
case class RegisterCreationJob(msg: ContainerCreationMessage) extends CreationJob
39+
case class FinishCreationJob(ack: ContainerCreationAckMessage) extends CreationJob
40+
case class ReschedulingCreationJob(tid: TransactionId,
41+
creationId: CreationId,
42+
invocationNamespace: String,
43+
action: FullyQualifiedEntityName,
44+
revision: DocRevision,
45+
actionMetaData: WhiskActionMetaData,
46+
schedulerHost: String,
47+
rpcPort: Int,
48+
retry: Int)
49+
extends CreationJob {
50+
51+
def toCreationMessage(sid: SchedulerInstanceId, retryCount: Int): ContainerCreationMessage =
52+
ContainerCreationMessage(
53+
tid,
54+
invocationNamespace,
55+
action,
56+
revision,
57+
actionMetaData,
58+
sid,
59+
schedulerHost,
60+
rpcPort,
61+
retryCount,
62+
creationId)
63+
}
64+
65+
abstract class CreationJobState(val creationId: CreationId,
66+
val invocationNamespace: String,
67+
val action: FullyQualifiedEntityName,
68+
val revision: DocRevision)
69+
case class FailedCreationJob(override val creationId: CreationId,
70+
override val invocationNamespace: String,
71+
override val action: FullyQualifiedEntityName,
72+
override val revision: DocRevision,
73+
error: ContainerCreationError,
74+
message: String)
75+
extends CreationJobState(creationId, invocationNamespace, action, revision)
76+
case class SuccessfulCreationJob(override val creationId: CreationId,
77+
override val invocationNamespace: String,
78+
override val action: FullyQualifiedEntityName,
79+
override val revision: DocRevision)
80+
extends CreationJobState(creationId, invocationNamespace, action, revision)
81+
82+
case object GetPoolStatus
83+
84+
case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
85+
86+
class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
87+
schedulerInstanceId: SchedulerInstanceId,
88+
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
89+
extends Actor {
90+
private implicit val ec: ExecutionContext = actorSystem.dispatcher
91+
private val baseTimeout = loadConfigOrThrow[Int](ConfigKeys.schedulerInProgressJobRetentionSecond).seconds
92+
private val retryLimit = 5
93+
private val retryDelayTime = 100.milliseconds
94+
95+
/**
96+
* Store a JobEntry in local to get an alarm for key timeout
97+
* It does not matter whether the information stored in Local is redundant or null.
98+
* When a new JobEntry is created, it is overwritten if it is duplicated.
99+
* If there is no corresponding JobEntry at the time of deletion, nothing is done.
100+
*/
101+
protected val creationJobPool = TrieMap[CreationId, JobEntry]()
102+
103+
override def receive: Receive = {
104+
case RegisterCreationJob(
105+
ContainerCreationMessage(_, invocationNamespace, action, revision, actionMetaData, _, _, _, _, creationId)) =>
106+
val isBlackboxInvocation = actionMetaData.toExecutableWhiskAction.exists(a => a.exec.pull);
107+
registerJob(invocationNamespace, action, revision, creationId, isBlackboxInvocation)
108+
109+
case FinishCreationJob(
110+
ContainerCreationAckMessage(
111+
tid,
112+
creationId,
113+
invocationNamespace,
114+
action,
115+
revision,
116+
actionMetaData,
117+
_,
118+
schedulerHost,
119+
rpcPort,
120+
retryCount,
121+
error,
122+
reason)) =>
123+
if (error.isEmpty) {
124+
logging.info(this, s"[$creationId] create container successfully")
125+
deleteJob(
126+
invocationNamespace,
127+
action,
128+
revision,
129+
creationId,
130+
SuccessfulCreationJob(creationId, invocationNamespace, action, revision))
131+
132+
} else {
133+
val cause = reason.getOrElse("unknown reason")
134+
// if exceed the retry limit or meet errors which we don't need to reschedule, make it a failure
135+
if (retryCount >= retryLimit || !error.exists(ContainerCreationError.whiskErrors.contains)) {
136+
logging.error(
137+
this,
138+
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
139+
// Delete from pool after all retries are failed
140+
deleteJob(
141+
invocationNamespace,
142+
action,
143+
revision,
144+
creationId,
145+
FailedCreationJob(creationId, invocationNamespace, action, revision, error.get, cause))
146+
} else {
147+
// Reschedule
148+
logging.error(
149+
this,
150+
s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
151+
// Add some time interval during retry create container, because etcd put operation needs some time if data inconsistant happens
152+
actorSystem.scheduler.scheduleOnce(retryDelayTime) {
153+
context.parent ! ReschedulingCreationJob(
154+
tid,
155+
creationId,
156+
invocationNamespace,
157+
action,
158+
revision,
159+
actionMetaData,
160+
schedulerHost,
161+
rpcPort,
162+
retryCount)
163+
}
164+
}
165+
}
166+
167+
case GracefulShutdown =>
168+
ackFeed ! GracefulShutdown
169+
}
170+
171+
private def registerJob(invocationNamespace: String,
172+
action: FullyQualifiedEntityName,
173+
revision: DocRevision,
174+
creationId: CreationId,
175+
isBlackboxInvocation: Boolean) = {
176+
creationJobPool getOrElseUpdate (creationId, {
177+
val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
178+
dataManagementService ! RegisterData(key, "", failoverEnabled = false)
179+
JobEntry(action, createTimer(invocationNamespace, action, revision, creationId, isBlackboxInvocation))
180+
})
181+
}
182+
183+
private def deleteJob(invocationNamespace: String,
184+
action: FullyQualifiedEntityName,
185+
revision: DocRevision,
186+
creationId: CreationId,
187+
state: CreationJobState) = {
188+
val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
189+
190+
// If there is a JobEntry, delete it.
191+
creationJobPool
192+
.remove(creationId)
193+
.foreach(entry => {
194+
sendState(state)
195+
entry.timer.cancel()
196+
})
197+
198+
dataManagementService ! UnregisterData(key)
199+
Future.successful({})
200+
}
201+
202+
private def sendState(state: CreationJobState): Unit = {
203+
context.parent ! state // send state to ContainerManager
204+
QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
205+
case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
206+
memoryQueueValue.queue ! state
207+
case _ =>
208+
logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
209+
}
210+
}
211+
212+
protected def createTimer(invocationNamespace: String,
213+
action: FullyQualifiedEntityName,
214+
revision: DocRevision,
215+
creationId: CreationId,
216+
isBlackbox: Boolean): Cancellable = {
217+
val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
218+
actorSystem.scheduler.scheduleOnce(timeout) {
219+
logging.warn(
220+
this,
221+
s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout")
222+
creationJobPool
223+
.remove(creationId)
224+
.foreach(
225+
_ =>
226+
sendState(
227+
FailedCreationJob(
228+
creationId,
229+
invocationNamespace,
230+
action,
231+
revision,
232+
ContainerCreationError.TimeoutError,
233+
s"timeout waiting for the ack of $creationId after $timeout")))
234+
dataManagementService ! UnregisterData(
235+
inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
236+
}
237+
}
238+
239+
private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
240+
private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
241+
private val maxActiveAcksPerPoll = 128
242+
private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)
243+
244+
def feedProducer(actorRefFactory: ActorRefFactory,
245+
desc: String,
246+
logging: Logging,
247+
consumer: MessageConsumer,
248+
maxActiveAcksPerPoll: Int,
249+
longPollDuration: Duration,
250+
handler: (Array[Byte]) => Future[Unit]): ActorRef = {
251+
actorRefFactory.actorOf(Props {
252+
new MessageFeed(desc, logging, consumer, maxActiveAcksPerPoll, 1 second, handler)
253+
})
254+
}
255+
256+
def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
257+
Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
258+
.flatMap(Future.fromTry)
259+
.flatMap { msg =>
260+
// forward msg to job manager
261+
self ! FinishCreationJob(msg)
262+
ackFeed ! MessageFeed.Processed
263+
Future.successful(())
264+
}
265+
.recoverWith {
266+
case t =>
267+
// Iff everything above failed, we have a terminal error at hand. Either the message failed
268+
// to deserialize, or something threw an error where it is not expected to throw.
269+
ackFeed ! MessageFeed.Processed
270+
logging.error(this, s"terminal failure while processing container creation ack message: $t")
271+
Future.successful(())
272+
}
273+
}
274+
}
275+
276+
object CreationJobManager {
277+
def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
278+
schedulerInstanceId: SchedulerInstanceId,
279+
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
280+
Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
281+
}

0 commit comments

Comments
 (0)