add error handling to container manager when invoker query fails #5320

Merged
@@ -144,6 +144,11 @@ class ContainerManager(jobManagerFactory: ActorRefFactory => ActorRef,
logging.info(this, s"received ${msgs.size} creation message [${msgs.head.invocationNamespace}:${msgs.head.action}]")
ContainerManager
  .getAvailableInvokers(etcdClient, memory, invocationNamespace)
  .recover({
    case t: Throwable =>
      logging.error(this, s"Unable to get available invokers: ${t.getMessage}.")
      List.empty[InvokerHealth]
  })
@ningyougang (Contributor), Aug 31, 2022

  • container manager swallows the message
    Do you mean that if a Throwable is thrown, no ack message is sent?
  • the memory queue will never hear back the status of the container creation
    Why?

@bdoyle0182 (Contributor Author), Aug 31, 2022

.getAvailableInvokers sends a request to etcd for the list of healthy invokers. Before this change, nothing handled a failure of that request's future. The enclosing function is synchronous and returns Unit, so when the future fails without a handler it simply completes, and no acknowledgement (success or failure) is ever sent back to the memory queue so that it can decrement creationIds. To clarify further: when it fails silently at this point, the creation job has not been registered yet, so the akka timer that times out the creation job and calls back to MemoryQueue is never created. The memory queue then believes indefinitely that a container creation is in progress. If the action never needs more than one container, it can never execute, because the memory queue thinks one is already being created. The memory queue also cannot be stopped on timeout in this case, because creationIds is not 0:

else {
        logging.info(
          this,
          s"[$invocationNamespace:$action:$stateName] The queue is timed out but there are still ${queue.size} activation messages or (running: ${containers.size}, in-progress: ${creationIds.size}) containers")
        stay
      }
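
The failure mode described above can be reproduced with plain Scala futures. This is only a minimal sketch, not the ContainerManager code: `queryInvokers` and `schedule` are hypothetical stand-ins, and the string results stand in for the real ack messages. It shows that a `foreach` callback never runs on a failed future unless a `recover` first turns the failure into an empty list.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RecoverSketch {
  // Hypothetical stand-in for the etcd query behind getAvailableInvokers.
  def queryInvokers(fail: Boolean): Future[List[String]] =
    if (fail) Future.failed(new Exception("etcd request failed."))
    else Future.successful(List("invoker0"))

  // Returns a future that completes with the ack the caller would receive.
  // Without recover, a failed query never reaches the foreach body,
  // so the ack promise is never completed.
  def schedule(query: Future[List[String]], withRecover: Boolean): Future[String] = {
    val ack = Promise[String]()
    val handled =
      if (withRecover) query.recover { case _: Throwable => List.empty[String] }
      else query
    handled.foreach { invokers =>
      if (invokers.isEmpty) ack.success("FailedCreationJob")
      else ack.success("scheduled")
    }
    ack.future
  }
}
```

With `withRecover = false` and a failing query, the returned future stays incomplete forever, which is the "never hears back" situation the memory queue ends up in.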

So while I think this PR covers the only edge case I know of, we really need an additional safeguard in MemoryQueue that eventually clears its record of in-progress containers if things get out of sync. There is no way to guarantee that creationIds stays perfectly in sync when it depends on what is essentially a fire-and-forget message eventually making the callback. That design is prone to bugs even if this PR covers every case for now.
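
One possible shape for such a safeguard, purely as an illustration: record when each creation started and drop entries older than a retention period. `InProgressCreations` and its methods are hypothetical names and are not part of MemoryQueue. The retention value is an assumption that would presumably come from the in-progress job retention config.

```scala
import scala.collection.mutable

// Hypothetical tracker; the real MemoryQueue keeps creationIds in its own state.
final class InProgressCreations(retentionMillis: Long) {
  // creationId -> time (in millis) at which the creation was requested
  private val startedAt = mutable.Map.empty[String, Long]

  def add(creationId: String, nowMillis: Long): Unit =
    startedAt.update(creationId, nowMillis)

  // Normal path: an ack (success or failure) arrived for this creation.
  def complete(creationId: String): Unit = {
    startedAt.remove(creationId)
    ()
  }

  // Safeguard path: forget creations whose ack never arrived in time.
  // Returns the ids that were dropped so the caller can log them.
  def expire(nowMillis: Long): Set[String] = {
    val stale = startedAt.collect {
      case (id, started) if nowMillis - started > retentionMillis => id
    }.toSet
    startedAt --= stale
    stale
  }

  def size: Int = startedAt.size
}
```

Calling `expire` from a periodic tick would let the in-progress count return to 0 even when a callback is lost, at the cost of occasionally forgetting a creation that is merely slow.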

Contributor Author

One solution could be to make the request from MemoryQueue to ContainerManager an ask rather than a tell, with the ask timeout set to the value of CONFIG_whisk_scheduler_inProgressJobRetention plus one second of buffer. That would probably also significantly reduce the complexity of MemoryQueue, since there would be fewer message cases to account for.

(I think CreationJobManager is actually responsible for responding to MemoryQueue in most cases, but you should be able to forward the ask's ref as a param from ContainerManager to CreationJobManager.)
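
The property an ask would buy can be sketched without Akka: the requester always gets an outcome, either the reply or a timeout failure. The code below is a standard-library stand-in for the ask pattern, not Akka's API. `askWithTimeout` is a hypothetical helper, and the promise passed to `send` plays the role of the reply ref that could be forwarded on to whoever eventually answers.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

object AskSketch {
  // Daemon thread so the timer never keeps the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  })

  // `send` receives the reply promise and may hand it to any downstream handler.
  // If nobody completes it within `timeout`, the future fails with a TimeoutException.
  def askWithTimeout[A](timeout: FiniteDuration)(send: Promise[A] => Unit): Future[A] = {
    val reply = Promise[A]()
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        // tryFailure is a no-op when a reply already arrived.
        reply.tryFailure(new TimeoutException(s"no reply within $timeout"))
        ()
      }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    send(reply)
    reply.future
  }
}
```

Under this model the requester can decrement its in-progress count in a single completion handler, whether the reply was a success, a failure, or a timeout.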

Contributor

Got your point 👍

  .foreach { invokers =>
    if (invokers.isEmpty) {
      logging.error(this, "there is no available invoker to schedule.")
@@ -864,7 +864,7 @@ class ContainerManagerTests
ScheduledPair(msg4, None, Some(NoAvailableInvokersError)))
}

- it should "send FailedCreationJob to queue manager when no invokers are available" in {
+ it should "send FailedCreationJob to memory queue when no invokers are available" in {
val mockEtcd = mock[EtcdClient]
val probe = TestProbe()
(mockEtcd
@@ -910,6 +910,50 @@ class ContainerManagerTests
NoAvailableInvokersError))
}

it should "send FailedCreationJob to memory queue when available invoker query fails" in {
  val mockEtcd = mock[EtcdClient]
  val probe = TestProbe()
  (mockEtcd
    .getPrefix(_: String))
    .expects(InvokerKeys.prefix)
    .returning(Future.failed(new Exception("etcd request failed.")))
    .twice()

  val fqn = FullyQualifiedEntityName(EntityPath("ns1"), EntityName(testAction))

  QueuePool.put(
    MemoryQueueKey(testInvocationNamespace, fqn.toDocId.asDocInfo(testRevision)),
    MemoryQueueValue(probe.ref, true))

  val mockJobManager = TestProbe()
  val mockWatcher = TestProbe()

  val manager =
    system.actorOf(
      ContainerManager.props(factory(mockJobManager), mockMessaging(), testsid, mockEtcd, config, mockWatcher.ref))

  val msg =
    ContainerCreationMessage(
      TransactionId.testing,
      testInvocationNamespace,
      fqn,
      testRevision,
      actionMetadata,
      testsid,
      schedulerHost,
      rpcPort)

  manager ! ContainerCreation(List(msg), testMemory, testInvocationNamespace)
  probe.expectMsg(
    FailedCreationJob(
      msg.creationId,
      testInvocationNamespace,
      msg.action,
      testRevision,
      NoAvailableInvokersError,
      NoAvailableInvokersError))
}

it should "schedule to the blackbox invoker when isBlackboxInvocation is true" in {
stream.reset()
val mockEtcd = mock[EtcdClient]