[Scheduler Enhancement] Increase the retention timeout for the blackbox action. #5266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged 12 commits on Jul 12, 2022.
1 change: 1 addition & 0 deletions ansible/group_vars/all
@@ -513,6 +513,7 @@ scheduler:
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
blackboxMultiple: "{{ scheduler_blackboxMultiple | default(15) }}"
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
scheduling:
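Note: blackboxMultiple (default 15) stretches the scheduler's in-progress job retention for blackbox actions, which may first need to pull a user-provided image before a container can start. A minimal sketch of the resulting timeout, mirroring the CreationJobManager change further down in this diff (the 20-second base is the inProgressJobRetention default above; the snippet is illustrative and not part of the PR):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

val baseTimeout: FiniteDuration = 20.seconds // scheduler_inProgressJobRetention default
val blackboxMultiple: Int = 15               // scheduler_blackboxMultiple default

def creationJobTimeout(isBlackbox: Boolean): FiniteDuration =
  if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * blackboxMultiple, TimeUnit.SECONDS)
  else baseTimeout

// creationJobTimeout(isBlackbox = true)  -> 300 seconds
// creationJobTimeout(isBlackbox = false) -> 20 seconds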
2 changes: 2 additions & 0 deletions ansible/roles/schedulers/tasks/deploy.yml
@@ -113,6 +113,7 @@
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ scheduler.dataManagementService.retryInterval }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{ scheduler.inProgressJobRetention }}"
"CONFIG_whisk_scheduler_blackboxMultiple": "{{ scheduler.blackboxMultiple }}"
"CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ scheduler.scheduling.staleThreshold }}"
"CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ scheduler.scheduling.checkInterval }}"
"CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ scheduler.scheduling.dropInterval }}"
@@ -124,6 +125,7 @@
"CONFIG_whisk_scheduler_queue_gracefulShutdownTimeout": "{{ scheduler.queue.gracefulShutdownTimeout }}"
"CONFIG_whisk_scheduler_queue_maxRetentionSize": "{{ scheduler.queue.maxRetentionSize }}"
"CONFIG_whisk_scheduler_queue_maxRetentionMs": "{{ scheduler.queue.maxRetentionMs }}"
"CONFIG_whisk_scheduler_queue_maxBlackboxRetentionMs": "{{ scheduler.queue.maxBlackboxRetentionMs }}"
"CONFIG_whisk_scheduler_queue_throttlingFraction": "{{ scheduler.queue.throttlingFraction }}"
"CONFIG_whisk_scheduler_queue_durationBufferSize": "{{ scheduler.queue.durationBufferSize }}"
"CONFIG_whisk_durationChecker_timeWindow": "{{ durationChecker.timeWindow }}"
@@ -594,7 +594,8 @@ object LoggingMarkers {
val SCHEDULER_KAFKA_WAIT_TIME =
LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.time.milliseconds)
def SCHEDULER_WAIT_TIME(action: String) =
LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.time.milliseconds)
LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(
MeasurementUnit.time.milliseconds)

def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
@@ -305,6 +305,7 @@ object ConfigKeys {
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
val schedulerBlackboxMultiple = "whisk.scheduler.blackbox-multiple"
val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"

val whiskClusterName = "whisk.cluster.name"
@@ -88,14 +88,14 @@ class FunctionPullingContainerPool(

implicit val ec = context.system.dispatcher

private var busyPool = immutable.Map.empty[ActorRef, Data]
private var inProgressPool = immutable.Map.empty[ActorRef, Data]
private var warmedPool = immutable.Map.empty[ActorRef, WarmData]
private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data]
protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData]
protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
protected[containerpool] var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]

// for shutting down
private var disablingPool = immutable.Set.empty[ActorRef]
protected[containerpool] var disablingPool = immutable.Set.empty[ActorRef]

private var shuttingDown = false

2 changes: 2 additions & 0 deletions core/scheduler/src/main/resources/application.conf
@@ -76,6 +76,7 @@ whisk {
graceful-shutdown-timeout = "5 seconds"
max-retention-size = "10000"
max-retention-ms = "60000"
max-blackbox-retention-ms = "300000"
throttling-fraction = "0.9"
duration-buffer-size = "10"
}
@@ -85,6 +86,7 @@
}
max-peek = "128"
in-progress-job-retention = "20 seconds"
blackbox-multiple = "15"
data-management-service {
retry-interval = "1 second"
}
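Note: these keys are the queue-side counterpart of the change. Managed actions keep the 60-second max-retention-ms budget, while blackbox activations may now wait up to max-blackbox-retention-ms (300 seconds) before being dropped. A minimal sketch of the selection, mirroring MemoryQueue.getRetentionTimeout later in this diff (the literals are the defaults introduced here; the snippet itself is illustrative only):

val maxRetentionMs = 60000L          // managed actions: 1 minute
val maxBlackboxRetentionMs = 300000L // blackbox actions: 5 minutes

def retentionMs(isBlackbox: Boolean): Long =
  if (isBlackbox) maxBlackboxRetentionMs else maxRetentionMs

// retentionMs(isBlackbox = true)  -> 300000
// retentionMs(isBlackbox = false) -> 60000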
@@ -47,10 +47,11 @@ case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)

class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
dataManagementService: ActorRef,
baseTimeout: FiniteDuration,
blackboxMultiple: Int)(implicit actorSystem: ActorSystem, logging: Logging)
extends Actor {
private implicit val ec: ExecutionContext = actorSystem.dispatcher
private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
private val retryLimit = 5

/**
@@ -152,10 +153,10 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
// If there is a JobEntry, delete it.
creationJobPool
.remove(creationId)
.foreach(entry => {
sendState(state)
entry.timer.cancel()
})
.map(entry => entry.timer.cancel())

// even if there is no entry because of timeout, we still need to send the state to the queue if the queue exists
sendState(state)

dataManagementService ! UnregisterData(key)
Future.successful({})
@@ -176,7 +177,8 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
revision: DocRevision,
creationId: CreationId,
isBlackbox: Boolean): Cancellable = {
val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
val timeout =
if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * blackboxMultiple, TimeUnit.SECONDS) else baseTimeout
actorSystem.scheduler.scheduleOnce(timeout) {
logging.warn(
this,
@@ -222,8 +224,12 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
}

object CreationJobManager {
private val baseTimeout = loadConfigOrThrow[Int](ConfigKeys.schedulerInProgressJobRetention).seconds
private val blackboxMultiple = loadConfigOrThrow[Int](ConfigKeys.schedulerBlackboxMultiple)

def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
Props(
new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService, baseTimeout, blackboxMultiple))
}
@@ -85,7 +85,6 @@ case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey:
case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
case object QueueRemovedCompleted
case object FlushPulse

// Events received by the actor
case object Start
@@ -125,7 +124,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry],
Long,
String,
FullyQualifiedEntityName,
WhiskActionMetaData,
MemoryQueueState,
ActorRef) => Unit,
queueConfig: QueueConfig)(implicit logging: Logging)
@@ -151,6 +150,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private val memory = actionMetaData.limits.memory.megabytes.MB
private val queueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), Some(leaderKey))
private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None)
private val actionRetentionTimeout = MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)

private[queue] var containers = Set.empty[String]
private[queue] var creationIds = Set.empty[String]
@@ -197,7 +197,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,

when(Uninitialized) {
case Event(Start, _) =>
logging.info(this, s"[$invocationNamespace:$action:$stateName] a new queue is created.")
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] a new queue is created, retentionTimeout: $actionRetentionTimeout, kind: ${actionMetaData.exec.kind}.")
val (schedulerActor, droppingActor) = startMonitoring()
initializeThrottling()

@@ -256,7 +258,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// when there is no container, it moves to the Flushing state as no activations can be invoked
if (containers.size <= 0) {
val isWhiskError = ContainerCreationError.whiskErrors.contains(error)
completeAllActivations(message, isWhiskError)
if (!isWhiskError) {
completeAllActivations(message, isWhiskError)
}
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Failed to create an initial container due to ${if (isWhiskError) "whiskError"
@@ -271,7 +275,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// there is no timeout for this state as when there is no further message, it would move to the Running state again.
when(NamespaceThrottled) {
case Event(msg: ActivationMessage, _: ThrottledData) =>
handleActivationMessage(msg)
if (containers.size + creationIds.size == 0) {
completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
} else {
handleActivationMessage(msg)
}
stay

case Event(DisableNamespaceThrottling, data: ThrottledData) =>
@@ -328,33 +336,51 @@ class MemoryQueue(private val etcdClient: EtcdClient,
goto(Running) using RunningData(schedulerActor, droppingActor)

// log the failed information
case Event(FailedCreationJob(creationId, _, _, _, _, message), data: FlushingData) =>
case Event(FailedCreationJob(creationId, _, _, _, error, message), data: FlushingData) =>
creationIds -= creationId.asString
logging.info(
this,
s"[$invocationNamespace:$action:$stateName][$creationId] Failed to create a container due to $message")

// keep updating the reason
stay using data.copy(reason = message)
stay using data.copy(error = error, reason = message)

// since there is no container, activations cannot be handled.
case Event(msg: ActivationMessage, data: FlushingData) =>
completeErrorActivation(msg, data.reason, ContainerCreationError.whiskErrors.contains(data.error))
logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new activation message ${msg.activationId}")(
msg.transid)
val whiskError = isWhiskError(data.error)
if (whiskError)
queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
else
completeErrorActivation(msg, data.reason, whiskError)
stay() using data.copy(activeDuringFlush = true)

// Since SchedulingDecisionMaker keeps sending a message to create a container, this state is not automatically timed out.
// Instead, a StateTimeout message will be sent by a timer.
case Event(StateTimeout, data: FlushingData) =>
completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
if (data.activeDuringFlush)
case Event(StateTimeout | DropOld, data: FlushingData) =>
logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
queue =
MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
if (data.activeDuringFlush || queue.nonEmpty)
stay using data.copy(activeDuringFlush = false)
else
cleanUpActorsAndGotoRemoved(data)

case Event(GracefulShutdown, data: FlushingData) =>
completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
completeAllActivations(data.reason, isWhiskError(data.error))
Inline review comment from @ningyougang (Contributor), Jul 5, 2022:

    Our downstream didn't have this statement:

        completeAllActivations(data.reason, isWhiskError(data.error))

    So is adding completeAllActivations here just to be safe?

Reply from the PR author (Member):

    Nice catch. I added the case back.

logging.info(this, s"[$invocationNamespace:$action:$stateName] Received GracefulShutdown, stop the queue.")
cleanUpActorsAndGotoRemoved(data)

case Event(StopSchedulingAsOutdated, data: FlushingData) =>
logging.info(this, s"[$invocationNamespace:$action:$stateName] stop further scheduling.")
completeAllActivations(data.reason, isWhiskError(data.error))
// let QueueManager know this queue is no longer in charge.
context.parent ! staleQueueRemovedMsg
cleanUpActors(data)
cleanUpData()

goto(Removed) using NoData()
}

// in case there is any activation in the queue, it waits until all of them are handled.
@@ -399,6 +425,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,

// actors and data are already wiped
case Event(QueueRemovedCompleted, _: NoData) =>
logging.info(this, "stop fsm")
stop()

// This is not supposed to happen. This will ensure the queue does not run forever.
@@ -523,15 +550,19 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, Instant.now)
.compareTo(Duration.ofMillis(queueConfig.maxRetentionMs)) < 0) {
.compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] the head stale message: ${queue.head.msg.activationId}")
}
queue = MemoryQueue.dropOld(queue, Duration.ofMillis(queueConfig.maxRetentionMs), completeErrorActivation)
queue = MemoryQueue.dropOld(
queue,
Duration.ofMillis(actionRetentionTimeout),
s"Activation processing is not initiated for $actionRetentionTimeout ms",
completeErrorActivation)

stay

@@ -861,7 +892,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
Future.successful(())
}

@@ -1055,11 +1086,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
causedBy ++ limits ++ binding
})
}

private def isWhiskError(error: ContainerCreationError): Boolean = ContainerCreationError.whiskErrors.contains(error)
}

object MemoryQueue {
private[queue] val queueConfig = loadConfigOrThrow[QueueConfig](ConfigKeys.schedulerQueue)
private[queue] val MaxRetentionTime = queueConfig.maxRetentionMs

def props(etcdClient: EtcdClient,
durationChecker: DurationChecker,
@@ -1105,21 +1137,27 @@ object MemoryQueue {
def dropOld(
queue: Queue[TimeSeriesActivationEntry],
retention: Duration,
reason: String,
completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = {
if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
queue
else {
completeErrorActivation(queue.head.msg, s"activation processing is not initiated for $MaxRetentionTime ms", true)
dropOld(queue.tail, retention, completeErrorActivation)
completeErrorActivation(queue.head.msg, reason, true)
dropOld(queue.tail, retention, reason, completeErrorActivation)
}
}

def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
invocationNamespace: String,
action: FullyQualifiedEntityName,
actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
queueRef: ActorRef)(implicit logging: Logging) = {
val action = actionMetaData.fullyQualifiedName(true)
logging.debug(
this,
s"[$invocationNamespace:$action:$stateName] use the given retention timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")

if (queue.nonEmpty && Duration
.between(queue.head.timestamp, Instant.now)
.compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
@@ -1130,6 +1168,14 @@
queueRef ! DropOld
}
}

private def getRetentionTimeout(actionMetaData: WhiskActionMetaData, queueConfig: QueueConfig): Long = {
if (actionMetaData.exec.kind == ExecMetaDataBase.BLACKBOX) {
queueConfig.maxBlackboxRetentionMs
} else {
queueConfig.maxRetentionMs
}
}
}

case class QueueSnapshot(initialized: Boolean,
@@ -1151,6 +1197,7 @@ case class QueueConfig(idleGrace: FiniteDuration,
gracefulShutdownTimeout: FiniteDuration,
maxRetentionSize: Int,
maxRetentionMs: Long,
maxBlackboxRetentionMs: Long,
throttlingFraction: Double,
durationBufferSize: Int)

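Note: dropOld now threads an explicit reason string into every error activation it completes, and the retention window is the per-action timeout rather than the global maxRetentionMs. A self-contained sketch of the same recursion with stand-in types, to show what a caller observes (Entry, the sample queue, and the reason text are illustrative only):

import java.time.{Duration, Instant}
import scala.collection.immutable.Queue
import scala.concurrent.Future

case class Entry(timestamp: Instant, id: String) // stand-in for TimeSeriesActivationEntry

def dropOld(queue: Queue[Entry],
            retention: Duration,
            reason: String,
            completeError: (Entry, String, Boolean) => Future[Any]): Queue[Entry] =
  if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0) queue
  else {
    completeError(queue.head, reason, true) // third argument: treated as a whisk error, as in the PR
    dropOld(queue.tail, retention, reason, completeError)
  }

val now = Instant.now
val sample = Queue(Entry(now.minusSeconds(400), "stale"), Entry(now, "fresh"))
val kept = dropOld(
  sample,
  Duration.ofMillis(300000L), // blackbox retention default
  "Activation processing is not initiated for 300000 ms",
  (e, r, _) => Future.successful(println(s"dropped ${e.id}: $r")))
// kept retains only the "fresh" entry; the stale one is completed as an error with the reason above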
1 change: 1 addition & 0 deletions tests/src/test/resources/application.conf.j2
@@ -152,6 +152,7 @@ whisk {
graceful-shutdown-timeout = "{{ scheduler.queue.gracefulShutdownTimeout | default('5 seconds') }}"
max-retention-size = "{{ scheduler.queue.maxRetentionSize | default(10000) }}"
max-retention-ms = "{{ scheduler.queue.maxRetentionMs | default(60000) }}"
max-blackbox-retention-ms = "{{ scheduler.queue.maxBlackboxRetentionMs }}"
throttling-fraction = "{{ scheduler.queue.throttlingFraction | default(0.9) }}"
duration-buffer-size = "{{ scheduler.queue.durationBufferSize | default(10) }}"
}