[Scheduler Enhancement] Consider binding action when creating or recovering queue. #5267

Merged · 1 commit · Jul 12, 2022
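Context for the diffs below: an OpenWhisk action is addressed by a fully qualified entity name that can carry an optional package-binding reference. Before this change, several scheduler paths stripped that binding (for example via copy(binding = None)), so queues created or recovered for an action invoked through a binding could be keyed under a different name than the one later looked up. A minimal illustrative sketch of the shape involved; the simplified types below are stand-ins, not the actual OpenWhisk definitions:

    // Simplified stand-in for org.apache.openwhisk.core.entity.FullyQualifiedEntityName.
    final case class EntityPath(asString: String)
    final case class FullyQualifiedEntityName(path: EntityPath,
                                              name: String,
                                              binding: Option[EntityPath] = None)

    // An action invoked through a package binding: the path resolves the provider's
    // code, while the binding remembers how the caller addressed it.
    val viaBinding = FullyQualifiedEntityName(
      path = EntityPath("whisk.system/utils"),      // resolved provider package
      name = "split",
      binding = Some(EntityPath("userNs/myUtils"))) // caller's package binding

    viaBinding.copy(binding = None) // what the old code propagated: the binding is lost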
@@ -128,6 +128,7 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys
deleteWatchers.update(watcherKey, sender())

case request: UnwatchEndpoint =>
+ logging.info(this, s"unwatch endpoint: $request")
val watcherKey = WatcherKey(request.watchKey, request.watchName)
if (request.isPrefix) {
prefixPutWatchers.remove(watcherKey)
@@ -174,7 +174,7 @@ class FPCPoolBalancer(config: WhiskConfig,
if (retryCount >= 0)
scheduler
.getRemoteRef(QueueManager.actorName)
- .ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
+ .ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName, revision, actionMetaData))
.mapTo[CreateQueueResponse]
.onComplete {
case Success(_) =>
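Using the sketch type from the introduction, the effect of the one-line change above (a hypothetical registry, illustrative of the kind of mismatch this PR addresses, not the balancer's real wiring):

    val requested = viaBinding                      // the name the activation path carries
    val created   = viaBinding.copy(binding = None) // the name the old code registered

    // A queue registry keyed by the full name: with the binding stripped at creation,
    // a later lookup under the bound name misses the queue.
    val queues = Map(created -> "queue-1")
    queues.get(requested) // None before this fix; Some("queue-1") once the binding is kept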
@@ -680,7 +680,13 @@ class FunctionPullingContainerPool(
case Some(((proxy, data), containerState)) =>
// record creationMessage so when container created failed, we can send failed message to scheduler
creationMessages.getOrElseUpdate(proxy, create)
- proxy ! Initialize(create.invocationNamespace, executable, create.schedulerHost, create.rpcPort, create.transid)
+ proxy ! Initialize(
+   create.invocationNamespace,
+   create.action,
+   executable,
+   create.schedulerHost,
+   create.rpcPort,
+   create.transid)
inProgressPool = inProgressPool + (proxy -> data)
logContainerStart(create, executable.toWhiskAction, containerState)

@@ -68,6 +68,7 @@ case class InitCodeCompleted(data: WarmData)

// Events received by the actor
case class Initialize(invocationNamespace: String,
+ fqn: FullyQualifiedEntityName,
action: ExecutableWhiskAction,
schedulerHost: String,
rpcPort: Int,
@@ -251,7 +252,7 @@ class FunctionPullingContainerProxy(
clientProxyFactory(
context,
job.invocationNamespace,
- job.action.fullyQualifiedName(true),
+ job.fqn, // include binding field
job.action.rev,
job.schedulerHost,
job.rpcPort,
@@ -292,7 +293,7 @@
clientProxyFactory(
context,
job.invocationNamespace,
- job.action.fullyQualifiedName(true),
+ job.fqn, // include binding field
job.action.rev,
job.schedulerHost,
job.rpcPort,
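Why job.fqn rather than job.action.fullyQualifiedName(true): the executable action was loaded from the resolved action document, so a name rebuilt from it no longer knows about the caller's binding, while the fqn now carried in the Initialize message does. A hedged sketch of the distinction, reusing the simplified types from the introduction (the rebuild function is an assumption for illustration):

    // Rebuilding the name from the resolved document: no binding information survives.
    def nameFromDocument(resolvedPath: EntityPath, name: String): FullyQualifiedEntityName =
      FullyQualifiedEntityName(resolvedPath, name, binding = None)

    val fromDocument = nameFromDocument(EntityPath("whisk.system/utils"), "split")
    val fromMessage  = viaBinding // delivered end-to-end via Initialize, binding intact

    // A client proxy keyed by fromMessage can fetch work queued under the bound name;
    // one keyed by fromDocument could not.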
@@ -172,7 +172,8 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,

WhiskAction.get(entityStore, docId).onComplete {
case Success(action) =>
- val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
+ val initialize =
+   Initialize(namespace, action.fullyQualifiedName(true), action.toExecutableWhiskAction.get, "", 0, transid)
startHealthAction(initialize, manager)
case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
}
@@ -179,7 +179,8 @@ class QueueManager(
}

case RecoverQueue(msg, action, actionMetaData) =>
- QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
+ QueuePool.keys.find(k =>
+   k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
// queue is already recovered or a newer queue is created, send msg to new queue
case Some(key) if key.docInfo.rev >= msg.revision =>
QueuePool.get(key) match {
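The recovery lookup above now matches on the pair (invocation namespace, document id) instead of the document id alone: with bindings in play, one action document can back queues for several invocation namespaces, so a docId-only match could hand an activation to another namespace's queue. A small sketch of the tightened predicate (simplified key type with assumed field names):

    final case class MemoryQueueKey(invocationNamespace: String, docId: String)

    val keys = Set(
      MemoryQueueKey("alice", "whisk.system/utils/split"),
      MemoryQueueKey("bob", "whisk.system/utils/split")) // same document, different namespace

    // docId-only match: may pick bob's queue for alice's activation.
    keys.find(_.docId == "whisk.system/utils/split")

    // namespace-aware match, as in the change above:
    keys.find(k => k.invocationNamespace == "alice" && k.docId == "whisk.system/utils/split")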
@@ -358,10 +359,12 @@
}

private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
logging.warn(
this,
s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

logging.info(this, s"Recover a queue for ${msg.action},")
logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
.map { actionMetaData: WhiskActionMetaData =>
actionMetaData.toExecutableWhiskAction match {
@@ -397,7 +400,7 @@
logging.warn(
this,
s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
completeErrorActivation(msg, "The activation has not been processed")
completeErrorActivation(msg, "The activation has not been processed: too old activation is arrived.")
Contributor:

Suggested change
completeErrorActivation(msg, "The activation has not been processed: too old activation is arrived.")
completeErrorActivation(msg, "The activation has not been processed due to timeout waiting for processing in the scheduler.")

Member Author:

Not sure this effectively describes the case.
This is the case where activations arrive after the maximum scheduling wait time.
For example, this can happen when there is a failure in Kafka: activations cannot be delivered and are just stored in it. When Kafka becomes available again, it starts delivering activations.
But if restoring Kafka took a long time, say one hour, it would deliver activations that are too old (an hour old).
Also, if many activations were stored in Kafka before the failure, delivering them all at the same time would cause a thundering herd. So we complete them with an error.
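A self-contained sketch of the behaviour described in this thread (simplified types and names, not the actual QueueManager code): an activation that waited longer than the configured maximum scheduling time is completed with an error instead of being scheduled, which also defuses the thundering herd after a broker outage.

    import java.time.Instant
    import scala.concurrent.duration._

    final case class Activation(id: String, createdAt: Instant)

    // Reject activations that are older than the scheduling limit.
    def schedule(a: Activation,
                 maxSchedulingTime: FiniteDuration,
                 now: Instant = Instant.now): Either[String, Activation] =
      if (java.time.Duration.between(a.createdAt, now).toMillis > maxSchedulingTime.toMillis)
        Left(s"[${a.id}] the activation has not been scheduled for ${maxSchedulingTime.toSeconds} sec")
      else
        Right(a)

    // An activation replayed an hour after a Kafka outage, against a 20-second limit:
    schedule(Activation("act1", Instant.now.minusSeconds(3600)), 20.seconds) // Left(...)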

} else {
QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
@@ -466,13 +469,15 @@
.recoverWith {
case t =>
logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})")
completeErrorActivation(msg, "The activation has not been processed")
completeErrorActivation(
msg,
"The activation has not been processed: failed to parse the scheduler endpoint.")
}

} recoverWith {
case t =>
logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})")
completeErrorActivation(msg, "The activation has not been processed")
completeErrorActivation(msg, "The activation has not been processed: failed to get the queue endpoint.")
}
}

@@ -227,12 +227,12 @@ class FunctionPullingContainerPoolTests

pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}
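The remaining updates in this test file are mechanical: every expectMsgPF pattern gains the new fqn field in the second position of Initialize, exactly as in the pair above. Shown once for reference:

    // old: case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
    // new: case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true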

@@ -254,7 +254,7 @@ class FunctionPullingContainerPoolTests
// Start first action
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// Send second action to the pool
@@ -267,7 +267,7 @@ class FunctionPullingContainerPoolTests
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
// Second container should run now
containers(1).expectMsgPF() {
- case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, bigExecuteAction, schedulerHost, rpcPort, _) => true
}
}

@@ -307,7 +307,7 @@ class FunctionPullingContainerPoolTests
pool ! Enable
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -328,7 +328,7 @@ class FunctionPullingContainerPoolTests
(0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken)
(0 to 10).foreach(i => {
containers(i).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
// create 5 container in busy pool, and 6 in warmed pool
if (i < 5)
@@ -526,7 +526,7 @@ class FunctionPullingContainerPoolTests
// the prewarm container with matched memory should be chose
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// prewarm a new container
@@ -537,7 +537,7 @@ class FunctionPullingContainerPoolTests
// the prewarm container with bigger memory should not be chose
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(3).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -566,7 +566,7 @@ class FunctionPullingContainerPoolTests
// the prewarm container with smallest memory should be chose
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// prewarm a new container
Expand All @@ -577,7 +577,7 @@ class FunctionPullingContainerPoolTests
// the prewarm container with bigger memory should be chose
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// prewarm a new container
Expand All @@ -589,7 +589,7 @@ class FunctionPullingContainerPoolTests
// a new container should be created
pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
containers(4).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// no new prewarmed container should be created
@@ -616,7 +616,7 @@ class FunctionPullingContainerPoolTests

pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -640,7 +640,7 @@ class FunctionPullingContainerPoolTests

pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -674,24 +674,24 @@ class FunctionPullingContainerPoolTests
// the revision doesn't match, create 1 container
pool ! CreationContainer(creationMessage, whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// the invocation namespace doesn't match, create 1 container
pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
containers(1).expectMsgPF() {
case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
case Initialize("otherNamespace", fqn, executeAction, schedulerHost, rpcPort, _) => true
}

pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// warmed container is occupied, create 1 more container
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(2).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -726,7 +726,7 @@ class FunctionPullingContainerPoolTests
// choose the warmed container
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// warmed container is failed to resume
Expand All @@ -743,7 +743,7 @@ class FunctionPullingContainerPoolTests

// then a new container will be created
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -806,7 +806,7 @@ class FunctionPullingContainerPoolTests

// a new container will be created
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
}

@@ -850,7 +850,7 @@ class FunctionPullingContainerPoolTests

pool ! CreationContainer(actualCreationMessage, whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
containers(0).send(pool, Initialized(initializedData)) // container is initialized

@@ -898,7 +898,7 @@ class FunctionPullingContainerPoolTests
// choose the warmed container
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool.tell(
Resumed(
@@ -956,7 +956,7 @@ class FunctionPullingContainerPoolTests

pool ! CreationContainer(actualCreationMessage, whiskAction)
containers(0).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
containers(0).send(pool, ContainerRemoved(true)) // the container0 init failed or create container failed

@@ -1127,11 +1127,11 @@ class FunctionPullingContainerPoolTests
// 2 cold start happened
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(2).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(3).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time
Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
@@ -1165,23 +1165,23 @@ class FunctionPullingContainerPoolTests
// 5 code start happened(5 > maxCount)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(6).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(7).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(8).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(9).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(10).expectMsgPF() {
- case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ case Initialize(invocationNamespace, fqn, executeAction, schedulerHost, rpcPort, _) => true
}

// Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckIntervel time