Skip to content

Commit 0cdfdb3

Browse files
authored
[New Scheduler] Manage memory queues in scheduler (#5118)
* Add QueueManager * Add test configuration * Remove deprecated functions * Fix memory queue test * Change string to FiniteDuration
1 parent dc7c666 commit 0cdfdb3

File tree

10 files changed

+1607
-42
lines changed

10 files changed

+1607
-42
lines changed

ansible/group_vars/all

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,5 +465,9 @@ etcd_connect_string: "{% set ret = [] %}\
465465

466466
scheduler:
467467
protocol: "{{ scheduler_protocol | default('http') }}"
468+
maxPeek: "{{ scheduler_max_peek | default(128) }}"
469+
queueManager:
470+
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
471+
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
468472
dataManagementService:
469473
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,9 +562,17 @@ object LoggingMarkers {
562562

563563
// Time that is needed to produce message in kafka
564564
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
565+
// Scheduler "waitTime" marker, built with the `counter` state and no measurement unit.
val SCHEDULER_WAIT_TIME = LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)
565567

566568
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
567569
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
570+
// Scheduler "queue" marker, built with the `counter` state and no measurement unit.
// Declared as a `val`, like the other parameterless markers in this object, so the token is
// created once instead of on every access.
val SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
571+
// Scheduler "queueCreate" marker, built with the `start` state and measured in milliseconds.
// Declared as a `val`, like the other parameterless markers in this object, so the token is
// created once instead of on every access.
val SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
572+
// Scheduler "queueUpdate" marker (`counter` state, no measurement unit) tagged with the
// given reason under the "reason" key.
def SCHEDULER_QUEUE_UPDATE(reason: String) = {
  val tags = Map("reason" -> reason)
  LogMarkerToken(scheduler, "queueUpdate", counter, None, tags)(MeasurementUnit.none)
}
574+
// Scheduler "queueActivation" marker (`counter` state, no measurement unit). The action name is
// passed both as the optional fourth argument and as the "action" tag.
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) = {
  val tags = Map("action" -> action)
  LogMarkerToken(scheduler, "queueActivation", counter, Some(action), tags)(MeasurementUnit.none)
}
568576
/*
569577
* General markers
570578
*/

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ object ConfigKeys {
296296
val azBlob = "whisk.azure-blob"
297297

298298
val schedulerMaxPeek = "whisk.scheduler.max-peek"
299+
val schedulerQueueManager = "whisk.scheduler.queue-manager"
299300
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"
300301

301302
val whiskClusterName = "whisk.cluster.name"

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
6060
lockedArgs: Map[String, String] = Map.empty,
6161
cause: Option[ActivationId] = None,
6262
traceContext: Option[Map[String, String]] = None)
63-
extends Message {
63+
extends Message {
6464

6565
override def serialize = ActivationMessage.serdes.write(this).compactPrint
6666

@@ -116,11 +116,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
116116
* The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
117117
* Right when this message is created.
118118
*/
119-
case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
120-
response: Either[ActivationId, WhiskActivation],
121-
override val isSystemError: Option[Boolean],
122-
instance: InstanceId)
123-
extends AcknowledegmentMessage(transid) {
119+
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
120+
response: Either[ActivationId, WhiskActivation],
121+
override val isSystemError: Option[Boolean],
122+
instance: InstanceId)
123+
extends AcknowledegmentMessage(transid) {
124124
override def messageType = "combined"
125125

126126
override def result = Some(response)
@@ -142,11 +142,11 @@ case class CombinedCompletionAndResultMessage private(override val transid: Tran
142142
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
143143
* `CompletionMessage`.
144144
*/
145-
case class CompletionMessage private(override val transid: TransactionId,
146-
override val activationId: ActivationId,
147-
override val isSystemError: Option[Boolean],
148-
instance: InstanceId)
149-
extends AcknowledegmentMessage(transid) {
145+
case class CompletionMessage private (override val transid: TransactionId,
146+
override val activationId: ActivationId,
147+
override val isSystemError: Option[Boolean],
148+
instance: InstanceId)
149+
extends AcknowledegmentMessage(transid) {
150150
override def messageType = "completion"
151151

152152
override def result = None
@@ -168,8 +168,8 @@ case class CompletionMessage private(override val transid: TransactionId,
168168
* The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
169169
* Right when this message is created.
170170
*/
171-
case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
172-
extends AcknowledegmentMessage(transid) {
171+
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
172+
extends AcknowledegmentMessage(transid) {
173173
override def messageType = "result"
174174

175175
override def result = Some(response)
@@ -253,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
253253
Left(value.convertTo[ActivationId])
254254

255255
case _: JsObject => Right(value.convertTo[WhiskActivation])
256-
case _ => deserializationError("could not read ResultMessage")
256+
case _ => deserializationError("could not read ResultMessage")
257257
}
258258
}
259259

@@ -296,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {
296296

297297
implicit val format = new JsonFormat[EventMessageBody] {
298298
// Serializes the body by delegating to the concrete subtype's own `toJson`.
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
  case metric: Metric         => metric.toJson
  case activation: Activation => activation.toJson
}
302302

@@ -321,7 +321,7 @@ case class Activation(name: String,
321321
causedBy: Option[String],
322322
size: Option[Int] = None,
323323
userDefinedStatusCode: Option[Int] = None)
324-
extends EventMessageBody {
324+
extends EventMessageBody {
325325
val typeName = Activation.typeName
326326

327327
override def serialize = toJson.compactPrint
@@ -349,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
349349
// JSON format for durations. Values are written as a JSON number of milliseconds.
private implicit val durationFormat = new RootJsonFormat[Duration] {
  // A non-finite duration has no millisecond value, so it is written as zero.
  override def write(obj: Duration): JsValue = obj match {
    case o if o.isFinite => JsNumber(o.toMillis)
    case _               => JsNumber.zero
  }

  // Zero and negative numbers read back as Duration.Zero; other numbers go through `toDuration`.
  // Anything that is not a JSON number is reported as a deserialization error instead of
  // escaping as a scala.MatchError.
  override def read(json: JsValue): Duration = json match {
    case JsNumber(n) if n <= 0 => Duration.Zero
    case JsNumber(n)           => toDuration(n.longValue)
    case _                     => deserializationError("could not read Duration")
  }
}
360360

@@ -437,7 +437,7 @@ case class EventMessage(source: String,
437437
userId: UUID,
438438
eventType: String,
439439
timestamp: Long = System.currentTimeMillis())
440-
extends Message {
440+
extends Message {
441441
override def serialize = EventMessage.format.write(this).compactPrint
442442
}
443443

@@ -460,7 +460,7 @@ case class InvokerResourceMessage(status: String,
460460
inProgressMemory: Long,
461461
tags: Seq[String],
462462
dedicatedNamespaces: Seq[String])
463-
extends Message {
463+
extends Message {
464464

465465
/**
466466
* Serializes message to string. Must be idempotent.
@@ -502,7 +502,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
502502
object StatusQuery
503503

504504
case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
505-
extends Message {
505+
extends Message {
506506

507507
override def serialize: String = StatusData.serdes.write(this).compactPrint
508508

@@ -524,7 +524,7 @@ case class ContainerCreationMessage(override val transid: TransactionId,
524524
rpcPort: Int,
525525
retryCount: Int = 0,
526526
creationId: CreationId = CreationId.generate())
527-
extends ContainerMessage(transid) {
527+
extends ContainerMessage(transid) {
528528

529529
override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
530530

@@ -556,7 +556,7 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
556556
action: FullyQualifiedEntityName,
557557
revision: DocRevision,
558558
whiskActionMetaData: WhiskActionMetaData)
559-
extends ContainerMessage(transid) {
559+
extends ContainerMessage(transid) {
560560
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
561561

562562
override def serialize: String = toJson.compactPrint
@@ -640,17 +640,17 @@ object ContainerCreationError extends Enumeration {
640640
ZeroNamespaceLimit)
641641

642642
/**
 * Resolves an error name to its enumeration value, ignoring case.
 *
 * The name is upper-cased before matching, so every pattern must be the upper-cased spelling
 * of the value it maps to. A name that matches no pattern raises a scala.MatchError.
 *
 * @param name the error name to resolve
 * @return the matching enumeration value
 */
private def parse(name: String) = name.toUpperCase match {
  case "NOAVAILABLEINVOKERSERROR"         => NoAvailableInvokersError
  case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
  case "RESOURCENOTENOUGHERROR"           => ResourceNotEnoughError
  // The first spelling is the upper-cased value name. The second is the earlier misspelled
  // pattern, still accepted so that any input relying on it keeps parsing.
  case "NONEXECUTABLEACTIONERROR" | "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
  case "DBFETCHERROR"                     => DBFetchError
  case "WHISKERROR"                       => WhiskError
  case "BLACKBOXERROR"                    => BlackBoxError
  case "TIMEOUTERROR"                     => TimeoutError
  case "ZERONAMESPACELIMIT"               => ZeroNamespaceLimit
  case "TOOMANYCONCURRENTREQUESTS"        => TooManyConcurrentRequests
  case "UNKNOWNERROR"                     => UnknownError
}
655655

656656
implicit val serds = new RootJsonFormat[ContainerCreationError] {
@@ -678,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
678678
retryCount: Int = 0,
679679
error: Option[ContainerCreationError] = None,
680680
reason: Option[String] = None)
681-
extends Message {
681+
extends Message {
682682

683683
/**
684684
* Serializes message to string. Must be idempotent.

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,15 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize]
123123

124124
// Equality follows `compareTo`: another ByteSize is equal when the comparison yields 0.
// Any value that is not a ByteSize is never equal.
override def equals(that: Any): Boolean = that match {
  case other: ByteSize => compareTo(other) == 0
  case _               => false
}
128128

129129
// Renders the size followed by a space and the suffix for its unit (B, KB, MB or GB).
override def toString = unit match {
  case SizeUnits.BYTE => s"$size B"
  case SizeUnits.KB   => s"$size KB"
  case SizeUnits.MB   => s"$size MB"
  case SizeUnits.GB   => s"$size GB"
}
137137
}
@@ -190,7 +190,7 @@ object size {
190190

191191
// Accepts only a JSON string, which is handed to ByteSize.fromString; every other JSON value
// is rejected with a deserialization error carrying `formatError`.
def read(value: JsValue): ByteSize = value match {
  case JsString(text) => ByteSize.fromString(text)
  case _              => deserializationError(formatError)
}
195195
}
196196
}

core/scheduler/src/main/resources/application.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ whisk {
2727
}
2828

2929
scheduler {
30+
queue-manager {
31+
max-scheduling-time = "20 seconds"
32+
max-retries-to-get-queue = "13"
33+
}
34+
max-peek = "128"
3035
in-progress-job-retention = "20 seconds"
3136
}
3237
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.scheduler.queue
19+
20+
import org.apache.openwhisk.core.connector.ActivationMessage
21+
import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName}
22+
23+
import scala.concurrent.Promise
24+
25+
// Events sent by the actor
26+
// Carries the invocation namespace, the action's DocInfo and an optional leader key.
// NOTE(review): presumably emitted once the queue for `action` has been removed — confirm against the sender.
case class QueueRemoved(invocationNamespace: String,
                        action: DocInfo,
                        leaderKey: Option[String])
27+
// Carries the invocation namespace, the action's fully qualified name and its DocInfo.
// NOTE(review): presumably emitted when a queue for `action` becomes active again — confirm against the sender.
case class QueueReactivated(invocationNamespace: String,
                            action: FullyQualifiedEntityName,
                            docInfo: DocInfo)
28+
// Carries the promise of a poll, which completes with either a MemoryQueueError or an ActivationMessage.
// NOTE(review): presumably asks for that pending poll to be cancelled — confirm against the handler.
case class CancelPoll(
  promise: Promise[Either[MemoryQueueError, ActivationMessage]])
29+
// Parameterless signal sent by the actor.
// NOTE(review): presumably marks the end of a queue removal — confirm against the sender.
case object QueueRemovedCompleted
30+
// Parameterless signal sent by the actor.
// NOTE(review): presumably a periodic tick that drives flushing — confirm against the sender.
case object FlushPulse
31+
32+
// Events received by the actor
33+
// Parameterless signal received by the actor.
// NOTE(review): presumably tells the actor to begin its work — confirm against the handler.
case object Start
34+
// Parameterless signal received by the actor.
// NOTE(review): presumably reports that the action version changed — confirm against the handler.
case object VersionUpdated
35+
// Parameterless signal received by the actor.
// NOTE(review): presumably tells the actor to stop scheduling because it is outdated — confirm against the handler.
case object StopSchedulingAsOutdated

0 commit comments

Comments
 (0)