Skip to content

Commit 71585f1

Browse files
[New Scheduler]Add CreationJobManager (#5116)
* Add CreationJobManager * Remove unused import * Use finite duration string for a config * Fix tests * Resolve rebase conflicts * Fix tests
1 parent 7791e4b commit 71585f1

File tree

6 files changed

+652
-29
lines changed

6 files changed

+652
-29
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ object ConfigKeys {
295295
val azBlob = "whisk.azure-blob"
296296

297297
val schedulerMaxPeek = "whisk.scheduler.max-peek"
298+
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
298299

299300
val whiskClusterName = "whisk.cluster.name"
300301

core/scheduler/src/main/resources/application.conf

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18-
whisk{
18+
whisk {
1919
# tracing configuration
2020
tracing {
2121
component = "Scheduler"
@@ -25,4 +25,8 @@ whisk{
2525
managed-fraction: 90%
2626
blackbox-fraction: 10%
2727
}
28+
29+
scheduler {
30+
in-progress-job-retention = "20 seconds"
31+
}
2832
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.openwhisk.core.scheduler.container
1919
import java.nio.charset.StandardCharsets
2020
import java.util.concurrent.ThreadLocalRandom
21-
2221
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
2322
import akka.event.Logging.InfoLevel
2423
import org.apache.kafka.clients.producer.RecordMetadata
@@ -33,7 +32,6 @@ import org.apache.openwhisk.core.entity.size._
3332
import org.apache.openwhisk.core.entity.{
3433
Annotations,
3534
ByteSize,
36-
DocInfo,
3735
DocRevision,
3836
FullyQualifiedEntityName,
3937
InvokerInstanceId,
@@ -55,6 +53,7 @@ import org.apache.openwhisk.core.scheduler.message.{
5553
ReschedulingCreationJob,
5654
SuccessfulCreationJob
5755
}
56+
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
5857
import org.apache.openwhisk.core.service.{
5958
DeleteEvent,
6059
PutEvent,
@@ -570,28 +569,3 @@ object ContainerManager {
570569
}
571570

572571
case class NoCapacityException(msg: String) extends Exception(msg)
573-
574-
/**
575-
* TODO This needs to be moved to the QueueManager component that will be added later.
576-
*/
577-
object QueuePool {
578-
private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]()
579-
580-
private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key)
581-
582-
private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value)
583-
584-
private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key)
585-
586-
private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader)
587-
588-
private[scheduler] def clear() = _queuePool.clear()
589-
590-
private[scheduler] def size = _queuePool.size
591-
592-
private[scheduler] def values = _queuePool.values
593-
594-
private[scheduler] def keys = _queuePool.keys
595-
}
596-
case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo)
597-
case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean)
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.container
19+
20+
import java.nio.charset.StandardCharsets
21+
import java.util.concurrent.TimeUnit
22+
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
23+
import org.apache.openwhisk.common.{GracefulShutdown, Logging}
24+
import org.apache.openwhisk.core.connector._
25+
import org.apache.openwhisk.core.entity._
26+
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
27+
import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
28+
import org.apache.openwhisk.core.ConfigKeys
29+
import org.apache.openwhisk.core.scheduler.message.{
30+
CreationJobState,
31+
FailedCreationJob,
32+
FinishCreationJob,
33+
RegisterCreationJob,
34+
ReschedulingCreationJob,
35+
SuccessfulCreationJob
36+
}
37+
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, QueuePool}
38+
import pureconfig.loadConfigOrThrow
39+
40+
import scala.collection.concurrent.TrieMap
41+
import scala.concurrent.duration._
42+
import scala.concurrent.{ExecutionContext, Future}
43+
44+
/**
 * Message object declared alongside [[CreationJobManager]].
 *
 * NOTE(review): no handler for this message is visible in `CreationJobManager.receive` in this file;
 * presumably it is a request for the current state of the creation job pool -- confirm against callers.
 */
case object GetPoolStatus
45+
46+
/**
 * Bookkeeping entry stored per in-flight container creation job, keyed by creation id in the
 * job pool of [[CreationJobManager]].
 *
 * @param action the action for which the container is being created
 * @param timer  the timeout timer scheduled when the job was registered; it is cancelled when the job is deleted
 */
case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
47+
48+
/**
 * Actor that tracks in-flight container creation jobs of one scheduler instance.
 *
 * For every registered job it
 *  - asks the data management service to register an "in progress" key for the job, and
 *  - starts a timeout timer (the configured retention time, tripled for blackbox actions).
 *
 * Creation acknowledgements are consumed from the `creationAck` topic of this scheduler through the feed
 * built by `feedFactory` and are turned into `FinishCreationJob` messages sent to this actor. A finished
 * job is reported to the parent actor and to the leader memory queue of the action as a
 * `SuccessfulCreationJob` or a `FailedCreationJob`; retryable failures below the retry limit are handed
 * back to the parent as a `ReschedulingCreationJob` after a short delay.
 *
 * @param feedFactory           creates the message feed; receives the actor factory, the topic name, the
 *                              maximum number of acks per poll and the handler invoked for each raw message
 * @param schedulerInstanceId   the id of this scheduler; used for the topic name and the in-progress keys
 * @param dataManagementService the actor receiving `RegisterData` / `UnregisterData` for in-progress keys
 */
class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
                         schedulerInstanceId: SchedulerInstanceId,
                         dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
    extends Actor {
  private implicit val ec: ExecutionContext = actorSystem.dispatcher
  private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetentionSecond)
  private val retryLimit = 5
  private val retryDelayTime = 100.milliseconds

  // The parent is resolved once, while the actor is being constructed. Scheduler callbacks run outside of
  // the actor's message processing, where the ActorContext must not be accessed, so they use this
  // reference instead of `context.parent`.
  private val parentRef: ActorRef = context.parent

  /**
   * Store a JobEntry locally to get an alarm when the in-progress key times out.
   * It does not matter whether the locally stored information is redundant or missing:
   * registering an already known creation id keeps the existing entry, and
   * if there is no corresponding JobEntry at the time of deletion, nothing is reported.
   *
   * A concurrent map is used because the timeout callbacks remove entries from scheduler threads.
   */
  protected val creationJobPool = TrieMap[CreationId, JobEntry]()

  override def receive: Receive = {
    case RegisterCreationJob(
        ContainerCreationMessage(_, invocationNamespace, action, revision, actionMetaData, _, _, _, _, creationId)) =>
      val isBlackboxInvocation = actionMetaData.toExecutableWhiskAction.exists(a => a.exec.pull)
      registerJob(invocationNamespace, action, revision, creationId, isBlackboxInvocation)

    case FinishCreationJob(
        ContainerCreationAckMessage(
          tid,
          creationId,
          invocationNamespace,
          action,
          revision,
          actionMetaData,
          _,
          schedulerHost,
          rpcPort,
          retryCount,
          error,
          reason)) =>
      error match {
        case None =>
          logging.info(this, s"[$creationId] create container successfully")
          deleteJob(
            invocationNamespace,
            action,
            revision,
            creationId,
            SuccessfulCreationJob(creationId, invocationNamespace, action, revision))

        case Some(creationError) =>
          val cause = reason.getOrElse("unknown reason")
          // if exceed the retry limit or meet errors which we don't need to reschedule, make it a failure
          if (retryCount >= retryLimit || !ContainerCreationError.whiskErrors.contains(creationError)) {
            logging.error(
              this,
              s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Finished creation")
            // Delete from pool after all retries are failed
            deleteJob(
              invocationNamespace,
              action,
              revision,
              creationId,
              FailedCreationJob(creationId, invocationNamespace, action, revision, creationError, cause))
          } else {
            // Reschedule
            logging.error(
              this,
              s"[$creationId] Failed to create container $retryCount/$retryLimit times for $cause. Started rescheduling")
            // The message is built up front so that the delayed callback only closes over immutable values.
            val rescheduling = ReschedulingCreationJob(
              tid,
              creationId,
              invocationNamespace,
              action,
              revision,
              actionMetaData,
              schedulerHost,
              rpcPort,
              retryCount)
            // Add some time interval before retrying the container creation, because the etcd put operation
            // needs some time if data inconsistency happens
            actorSystem.scheduler.scheduleOnce(retryDelayTime) {
              parentRef ! rescheduling
            }
          }
      }

    case GracefulShutdown =>
      ackFeed ! GracefulShutdown
  }

  /**
   * Registers a creation job unless one with the same creation id is already tracked.
   *
   * For a new job the in-progress key is registered with the data management service and a timeout
   * timer is started.
   *
   * @return the tracked entry (the existing one if the creation id was already registered)
   */
  private def registerJob(invocationNamespace: String,
                          action: FullyQualifiedEntityName,
                          revision: DocRevision,
                          creationId: CreationId,
                          isBlackboxInvocation: Boolean) = {
    creationJobPool getOrElseUpdate (creationId, {
      val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)
      dataManagementService ! RegisterData(key, "", failoverEnabled = false)
      JobEntry(action, createTimer(invocationNamespace, action, revision, creationId, isBlackboxInvocation))
    })
  }

  /**
   * Finishes a creation job.
   *
   * If the job is still tracked, `state` is reported and the timeout timer is cancelled. The in-progress
   * key is unregistered in any case.
   */
  private def deleteJob(invocationNamespace: String,
                        action: FullyQualifiedEntityName,
                        revision: DocRevision,
                        creationId: CreationId,
                        state: CreationJobState): Unit = {
    val key = inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId)

    // If there is a JobEntry, delete it.
    creationJobPool
      .remove(creationId)
      .foreach { entry =>
        sendState(state)
        entry.timer.cancel()
      }

    dataManagementService ! UnregisterData(key)
  }

  /**
   * Reports the final state of a job to the parent actor and to the memory queue of the action,
   * provided that this scheduler holds the leader queue for it.
   *
   * This is also invoked from timeout callbacks, hence it must not touch the ActorContext.
   */
  private def sendState(state: CreationJobState): Unit = {
    parentRef ! state // send state to ContainerManager
    QueuePool.get(MemoryQueueKey(state.invocationNamespace, state.action.toDocId.asDocInfo(state.revision))) match {
      case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
        memoryQueueValue.queue ! state
      case _ =>
        logging.error(this, s"get a $state for a nonexistent memory queue or a follower")
    }
  }

  /**
   * Schedules the timeout handling of a creation job.
   *
   * When the timer fires, a still tracked job is removed and reported as failed with a `TimeoutError`;
   * the in-progress key is unregistered in any case.
   *
   * @param isBlackbox whether the action is a blackbox action; such jobs get three times the base timeout
   * @return the handle to cancel the timer
   */
  protected def createTimer(invocationNamespace: String,
                            action: FullyQualifiedEntityName,
                            revision: DocRevision,
                            creationId: CreationId,
                            isBlackbox: Boolean): Cancellable = {
    val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
    actorSystem.scheduler.scheduleOnce(timeout) {
      logging.warn(
        this,
        s"Failed to create a container for $action(blackbox: $isBlackbox), error: $creationId timed out after $timeout")
      creationJobPool
        .remove(creationId)
        .foreach(
          _ =>
            sendState(
              FailedCreationJob(
                creationId,
                invocationNamespace,
                action,
                revision,
                ContainerCreationError.TimeoutError,
                s"timeout waiting for the ack of $creationId after $timeout")))
      dataManagementService ! UnregisterData(
        inProgressContainer(invocationNamespace, action, revision, schedulerInstanceId, creationId))
    }
  }

  private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
  private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
  private val maxActiveAcksPerPoll = 128
  private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)

  /**
   * Handler invoked by the feed for every raw creation acknowledgement.
   *
   * The bytes are decoded as UTF-8 and parsed into a `ContainerCreationAckMessage`, which is forwarded
   * to this actor as a `FinishCreationJob`. The feed is told that the message was processed both on
   * success and on failure, and the returned future always completes successfully.
   */
  def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
    Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
      .flatMap(Future.fromTry)
      .flatMap { msg =>
        // forward msg to job manager
        self ! FinishCreationJob(msg)
        ackFeed ! MessageFeed.Processed
        Future.successful(())
      }
      .recoverWith {
        case t =>
          // Iff everything above failed, we have a terminal error at hand. Either the message failed
          // to deserialize, or something threw an error where it is not expected to throw.
          ackFeed ! MessageFeed.Processed
          logging.error(this, s"terminal failure while processing container creation ack message: $t")
          Future.successful(())
      }
  }
}
225+
226+
/** Companion of the [[CreationJobManager]] actor; provides its `Props` factory. */
object CreationJobManager {

  /**
   * Builds the `Props` used to create a [[CreationJobManager]] actor.
   *
   * All arguments, including the implicit actor system and logger, are passed through unchanged to the
   * constructor of the actor.
   *
   * @param feedFactory           factory for the message feed handed to the actor
   * @param schedulerInstanceId   the id of the scheduler instance the actor belongs to
   * @param dataManagementService the actor handed to the manager as its data management service
   * @return the `Props` wrapping the construction of the actor
   */
  def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
            schedulerInstanceId: SchedulerInstanceId,
            dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging): Props = {
    Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
  }
}

tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import org.apache.openwhisk.core.scheduler.message.{
5050
ReschedulingCreationJob,
5151
SuccessfulCreationJob
5252
}
53+
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
5354
import org.apache.openwhisk.core.service.WatchEndpointInserted
5455
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
5556
import org.junit.runner.RunWith
@@ -59,8 +60,8 @@ import org.scalatest.junit.JUnitRunner
5960
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
6061
import pureconfig.loadConfigOrThrow
6162
import spray.json.{JsArray, JsBoolean, JsString}
62-
6363
import pureconfig.generic.auto._
64+
6465
import scala.collection.mutable
6566
import scala.concurrent.Future
6667
import scala.concurrent.duration.{FiniteDuration, _}

0 commit comments

Comments
 (0)