Skip to content

[New Scheduler] Manage memory queues in scheduler #5118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -465,5 +465,9 @@ etcd_connect_string: "{% set ret = [] %}\

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
maxPeek: "{{ scheduler_max_peek | default(128) }}"
queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,17 @@ object LoggingMarkers {

// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
val SCHEDULER_WAIT_TIME =
LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)

def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
/*
* General markers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"

val whiskClusterName = "whisk.cluster.name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
extends Message {
extends Message {

override def serialize = ActivationMessage.serdes.write(this).compactPrint

Expand Down Expand Up @@ -116,11 +116,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "combined"

override def result = Some(response)
Expand All @@ -142,11 +142,11 @@ case class CombinedCompletionAndResultMessage private(override val transid: Tran
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
* `CompletionMessage`.
*/
case class CompletionMessage private(override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CompletionMessage private (override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "completion"

override def result = None
Expand All @@ -168,8 +168,8 @@ case class CompletionMessage private(override val transid: TransactionId,
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
override def messageType = "result"

override def result = Some(response)
Expand Down Expand Up @@ -253,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
Left(value.convertTo[ActivationId])

case _: JsObject => Right(value.convertTo[WhiskActivation])
case _ => deserializationError("could not read ResultMessage")
case _ => deserializationError("could not read ResultMessage")
}
}

Expand Down Expand Up @@ -296,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {

implicit val format = new JsonFormat[EventMessageBody] {
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
case m: Metric => m.toJson
case m: Metric => m.toJson
case a: Activation => a.toJson
}

Expand All @@ -321,7 +321,7 @@ case class Activation(name: String,
causedBy: Option[String],
size: Option[Int] = None,
userDefinedStatusCode: Option[Int] = None)
extends EventMessageBody {
extends EventMessageBody {
val typeName = Activation.typeName

override def serialize = toJson.compactPrint
Expand Down Expand Up @@ -349,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
private implicit val durationFormat = new RootJsonFormat[Duration] {
override def write(obj: Duration): JsValue = obj match {
case o if o.isFinite => JsNumber(o.toMillis)
case _ => JsNumber.zero
case _ => JsNumber.zero
}

override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
case JsNumber(n) => toDuration(n.longValue)
case JsNumber(n) => toDuration(n.longValue)
}
}

Expand Down Expand Up @@ -437,7 +437,7 @@ case class EventMessage(source: String,
userId: UUID,
eventType: String,
timestamp: Long = System.currentTimeMillis())
extends Message {
extends Message {
override def serialize = EventMessage.format.write(this).compactPrint
}

Expand All @@ -460,7 +460,7 @@ case class InvokerResourceMessage(status: String,
inProgressMemory: Long,
tags: Seq[String],
dedicatedNamespaces: Seq[String])
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand Down Expand Up @@ -502,7 +502,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
object StatusQuery

case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
extends Message {
extends Message {

override def serialize: String = StatusData.serdes.write(this).compactPrint

Expand All @@ -524,7 +524,7 @@ case class ContainerCreationMessage(override val transid: TransactionId,
rpcPort: Int,
retryCount: Int = 0,
creationId: CreationId = CreationId.generate())
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {

override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)

Expand Down Expand Up @@ -556,7 +556,7 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)

override def serialize: String = toJson.compactPrint
Expand Down Expand Up @@ -640,17 +640,17 @@ object ContainerCreationError extends Enumeration {
ZeroNamespaceLimit)

private def parse(name: String) = name.toUpperCase match {
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
}

implicit val serds = new RootJsonFormat[ContainerCreationError] {
Expand Down Expand Up @@ -678,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
retryCount: Int = 0,
error: Option[ContainerCreationError] = None,
reason: Option[String] = None)
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize]

override def equals(that: Any): Boolean = that match {
case t: ByteSize => compareTo(t) == 0
case _ => false
case _ => false
}

override def toString = {
unit match {
case SizeUnits.BYTE => s"$size B"
case SizeUnits.KB => s"$size KB"
case SizeUnits.MB => s"$size MB"
case SizeUnits.GB => s"$size GB"
case SizeUnits.KB => s"$size KB"
case SizeUnits.MB => s"$size MB"
case SizeUnits.GB => s"$size GB"
}
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ object size {

def read(value: JsValue): ByteSize = value match {
case JsString(s) => ByteSize.fromString(s)
case _ => deserializationError(formatError)
case _ => deserializationError(formatError)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ whisk {
}

scheduler {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ningyougang I have added related configuration

queue-manager {
max-scheduling-time = "20 seconds"
max-retries-to-get-queue = "13"
}
max-peek = "128"
in-progress-job-retention = "20 seconds"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.scheduler.queue

import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName}

import scala.concurrent.Promise

// Events sent by the actor
case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
case object QueueRemovedCompleted
case object FlushPulse

// Events received by the actor
case object Start
case object VersionUpdated
case object StopSchedulingAsOutdated
Loading