Skip to content

Prevent cycle sending #5251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ object LoggingMarkers {
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
// Marker token for the scheduler "queue" metric: counter state, no measurement unit.
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
// Marker token for "queueCreate": start state, measured in milliseconds.
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
// Marker token for "queueRecover": start state, measured in milliseconds.
def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
// Marker token for "queueUpdate": counter state, no measurement unit, tagged with the given `reason`.
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
/**
 * Message carrying an activation together with the name and metadata of its action.
 * NOTE(review): presumably asks the QueueManager to create a new queue for `action`;
 * the handler is not fully visible in this view — confirm.
 */
case class CreateNewQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)
/**
 * Message asking the QueueManager to recover the queue of `action`.
 *
 * `handleCycle` sends it to `self` after the action metadata has been fetched. The
 * `receive` handler forwards `activationMessage` to an existing leader queue whose
 * revision is at least the message's revision, and recreates the queue otherwise.
 */
case class RecoverQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)

// Configuration values for the QueueManager.
// NOTE(review): judging by the names only, `maxRetriesToGetQueue` bounds the attempts to look up a queue and
// `maxSchedulingTime` bounds how long scheduling may take — confirm against the code that reads them.
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)

Expand All @@ -80,7 +83,7 @@ class QueueManager(

private val actorSelectionMap = TrieMap[String, ActorSelection]()

private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()

private implicit val askTimeout = Timeout(5.seconds)
private implicit val ec = context.dispatcher
Expand All @@ -90,6 +93,8 @@ class QueueManager(
// watch leaders and register them into actorSelectionMap
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))

// Set to true when GracefulShutdown is received. Queues started after that point are sent
// GracefulShutdown right after creation, and the flag is handed to leader election callbacks.
private var isShuttingDown = false

override def receive: Receive = {
case request: CreateQueue if isWarmUpAction(request.fqn) =>
logging.info(
Expand All @@ -114,12 +119,12 @@ class QueueManager(
msg.leadership match {
case Right(EtcdLeader(key, value, lease)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
callback(Right(EtcdLeader(key, value, lease)))
callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
}

case Left(EtcdFollower(key, value)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
callback(Left(EtcdFollower(key, value)))
callback(Left(EtcdFollower(key, value)), isShuttingDown)
}
}

Expand All @@ -129,7 +134,11 @@ class QueueManager(
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
msg.transid)

handleActivationMessage(msg)
if (sender() == self) {
handleCycle(msg)(msg.transid)
} else {
handleActivationMessage(msg)
}

case UpdateMemoryQueue(oldAction, newAction, msg) =>
logging.info(
Expand Down Expand Up @@ -164,6 +173,24 @@ class QueueManager(
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
queue ! msg
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
if (isShuttingDown) {
queue ! GracefulShutdown
}
}

case RecoverQueue(msg, action, actionMetaData) =>
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
// queue is already recovered or a newer queue is created, send msg to new queue
case Some(key) if key.docInfo.rev >= msg.revision =>
QueuePool.get(key) match {
case Some(queue) if queue.isLeader =>
queue.queue ! msg.copy(revision = key.docInfo.rev)
logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
case _ =>
recreateQueue(action, msg, actionMetaData)
}
case _ =>
recreateQueue(action, msg, actionMetaData)
}

// leaderKey is now optional, it becomes None when the stale queue is removed
Expand Down Expand Up @@ -208,6 +235,7 @@ class QueueManager(
}

case GracefulShutdown =>
isShuttingDown = true
logging.info(this, s"Gracefully shutdown the queue manager")

watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
Expand Down Expand Up @@ -317,6 +345,47 @@ class QueueManager(
}
}

/**
 * Creates and starts a fresh queue for `action`, then hands the activation to it.
 *
 * The queue is created under the namespace of the message's user and at the message's
 * revision. The recovery is marked with SCHEDULER_QUEUE_RECOVER on the message's transaction.
 *
 * @param action         fully qualified name of the action whose queue is recreated
 * @param msg            activation that triggered the recovery; it is forwarded to the new queue
 * @param actionMetaData metadata passed on to the queue factory
 */
private def recreateQueue(action: FullyQualifiedEntityName,
                          msg: ActivationMessage,
                          actionMetaData: WhiskActionMetaData): Unit = {
  logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)

  val invocationNamespace = msg.user.namespace.name.asString
  val recoveredQueue = createAndStartQueue(invocationNamespace, action, msg.revision, actionMetaData)

  // The activation goes first, so it reaches the queue before any shutdown request below.
  recoveredQueue ! msg
  msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

  // The manager is shutting down, so the freshly started queue is told to shut down as well.
  if (isShuttingDown) recoveredQueue ! GracefulShutdown
}

/**
 * Starts the recovery of a queue for an activation that this manager received from itself.
 *
 * The action metadata is fetched from the entity store. For an executable action a
 * RecoverQueue message is sent to `self`; otherwise the activation is completed with an
 * error. A failed fetch also completes the activation with an error.
 *
 * @param msg     the activation message that came back to this manager
 * @param transid transaction used for logging and for the SCHEDULER_QUEUE_RECOVER marker
 * @return a future that completes once the fetch result (or its failure) has been handled
 */
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
  logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
  val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

  logging.info(this, s"Recover a queue for ${msg.action},")
  val metaDataFetch = getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)

  val recovery = metaDataFetch.map { actionMetaData: WhiskActionMetaData =>
    if (actionMetaData.toExecutableWhiskAction.isDefined) {
      // Executable action: let the actor itself recover the queue, using the stored version.
      val versionedAction = msg.action.copy(version = Some(actionMetaData.version))
      self ! RecoverQueue(msg, versionedAction, actionMetaData)
      transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(actionMetaData.rev)}")
    } else {
      // Non-executable action: no queue can serve it, so fail the activation.
      val message = s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
      completeErrorActivation(msg, message)
      transid.failed(this, start, message)
    }
  }

  recovery.recover {
    case t =>
      val failure = s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${t.getMessage}"
      transid.failed(this, start, failure)
      completeErrorActivation(msg, t.getMessage)
  }
}

private def handleActivationMessage(msg: ActivationMessage): Any = {
implicit val transid = msg.transid

Expand Down Expand Up @@ -451,24 +520,24 @@ class QueueManager(
case None =>
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
leaderElectionCallbacks.put(
leaderKey, {
case Right(EtcdLeader(_, _, _)) =>
val queue = childFactory(
context,
request.invocationNamespace,
request.fqn,
request.revision,
request.whiskActionMetaData)
queue ! Start
QueuePool.put(
MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
MemoryQueueValue(queue, true))
updateInitRevisionMap(leaderKey, request.revision)
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))

// in case of follower, do nothing
case Left(EtcdFollower(_, _)) =>
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
leaderKey,
(electResult, isShuttingDown) => {
electResult match {
case Right(EtcdLeader(_, _, _)) =>
val queue = createAndStartQueue(
request.invocationNamespace,
request.fqn,
request.revision,
request.whiskActionMetaData)
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
if (isShuttingDown) {
queue ! GracefulShutdown
}

// in case of follower, do nothing
case Left(EtcdFollower(_, _)) =>
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
}
})

// there is already a leader election for leaderKey, so skip it
Expand All @@ -488,6 +557,20 @@ class QueueManager(
}
}

/**
 * Builds a queue actor through `childFactory`, starts it and registers it.
 *
 * The queue is sent Start and stored in QueuePool under its namespace and doc info.
 * The revision is then recorded for the action's leader key.
 *
 * @param invocationNamespace namespace under which the queue is registered
 * @param action              fully qualified name of the action served by the queue
 * @param revision            action revision the queue is created for
 * @param actionMetaData      metadata handed to the queue factory
 * @return the reference of the newly created queue
 */
private def createAndStartQueue(invocationNamespace: String,
                                action: FullyQualifiedEntityName,
                                revision: DocRevision,
                                actionMetaData: WhiskActionMetaData): ActorRef = {
  val newQueue = childFactory(context, invocationNamespace, action, revision, actionMetaData)
  newQueue ! Start

  val poolKey = MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision))
  QueuePool.put(poolKey, MemoryQueueValue(newQueue, true))

  val leaderKey = getLeaderKey(invocationNamespace, action)
  updateInitRevisionMap(leaderKey, revision)

  newQueue
}

// Scheduled at a fixed rate with an initial delay of 0 seconds and an interval of 1 second:
// emits the value of QueuePool.countLeader() as the SCHEDULER_QUEUE histogram metric.
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,65 @@ class QueueManagerTests
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
}

// Scenario: the in-memory queue has been removed while a QueueRemoved message still carries the leader key.
// Activations that the manager then receives from itself must lead to exactly one recreated queue.
it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
// Every `get` on the mocked etcd client returns a single key/value holding the serialized scheduler endpoint.
val mockEtcdClient = mock[EtcdClient]
(mockEtcdClient
.get(_: String))
.expects(*)
.returning(Future.successful {
RangeResponse
.newBuilder()
.addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
.build()
})
.anyNumberOfTimes()
val dataManagementService = getTestDataManagementService()
val watcher = TestProbe()

val probe = TestProbe()

// Every queue the manager creates is backed by the same probe, so the probe sees all queue traffic.
val childFactory =
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref

val queueManager =
TestActorRef(
QueueManager
.props(
entityStore,
get,
mockEtcdClient,
schedulerEndpoint,
schedulerId,
dataManagementService.ref,
watcher.ref,
ack,
store,
childFactory,
mockConsumer))

watcher.expectMsg(watchEndpoint)
// the current queue's revision is `1-test-revision`
(queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
testInvocationNamespace,
testFQN,
true)

// The first queue has been created and started.
probe.expectMsg(Start)

// simulate queue superseded, the queue will be removed but leader key won't be deleted
queueManager ! QueueRemoved(
testInvocationNamespace,
testFQN.toDocId.asDocInfo(testDocRevision),
Some(testLeaderKey))

// Send with the manager itself as the sender, which is the condition the manager treats as a cycle.
queueManager.!(activationMessage)(queueManager)
val msg2 = activationMessage.copy(activationId = ActivationId.generate())
queueManager.!(msg2)(queueManager) // even with two requests, only one queue should be recreated
// Exactly one more Start is expected, followed by both activations in order.
probe.expectMsg(Start)
probe.expectMsg(activationMessage)
probe.expectMsg(msg2)
}

it should "not skip outdated activation when the revision is older than the one in a datastore" in {
stream.reset()
val mockEtcdClient = mock[EtcdClient]
Expand Down Expand Up @@ -1082,6 +1141,9 @@ class QueueManagerTests
val probe = TestProbe()
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))

// probe will watch all actors which are created by these factories
val childFactory =
Expand Down Expand Up @@ -1129,5 +1191,14 @@ class QueueManagerTests
queueManager ! GracefulShutdown

probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)

// after shutdown, it can still create/update/recover a queue, and each new queue should be shut down immediately too
(queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
.mapTo[CreateQueueResponse]
.futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)

probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
}
}