Skip to content

Commit 5a2ea98

Browse files
committed
Consider binding action when creating or recovering queue.
1 parent c5970a6 commit 5a2ea98

File tree

10 files changed

+115
-86
lines changed

10 files changed

+115
-86
lines changed

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ object LoggingMarkers {
593593
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
594594
val SCHEDULER_KAFKA_WAIT_TIME =
595595
LogMarkerToken(scheduler, "kafkaWaitTime", counter)(MeasurementUnit.none)
596-
def SCHEDULER_WAIT_TIME(action: String) = LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
596+
def SCHEDULER_WAIT_TIME(action: String) =
597+
LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
597598

598599
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
599600
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)

common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSys
128128
deleteWatchers.update(watcherKey, sender())
129129

130130
case request: UnwatchEndpoint =>
131+
logging.info(this, s"unwatch endpoint: $request")
131132
val watcherKey = WatcherKey(request.watchKey, request.watchName)
132133
if (request.isPrefix) {
133134
prefixPutWatchers.remove(watcherKey)

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ class FPCPoolBalancer(config: WhiskConfig,
174174
if (retryCount >= 0)
175175
scheduler
176176
.getRemoteRef(QueueManager.actorName)
177-
.ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
177+
.ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName, revision, actionMetaData))
178178
.mapTo[CreateQueueResponse]
179179
.onComplete {
180180
case Success(_) =>

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,13 @@ class FunctionPullingContainerPool(
680680
case Some(((proxy, data), containerState)) =>
681681
// record creationMessage so when container created failed, we can send failed message to scheduler
682682
creationMessages.getOrElseUpdate(proxy, create)
683-
proxy ! Initialize(create.invocationNamespace, executable, create.schedulerHost, create.rpcPort, create.transid)
683+
proxy ! Initialize(
684+
create.invocationNamespace,
685+
create.action,
686+
executable,
687+
create.schedulerHost,
688+
create.rpcPort,
689+
create.transid)
684690
inProgressPool = inProgressPool + (proxy -> data)
685691
logContainerStart(create, executable.toWhiskAction, containerState)
686692

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ case class InitCodeCompleted(data: WarmData)
6868

6969
// Events received by the actor
7070
case class Initialize(invocationNamespace: String,
71+
fqn: FullyQualifiedEntityName,
7172
action: ExecutableWhiskAction,
7273
schedulerHost: String,
7374
rpcPort: Int,
@@ -251,7 +252,7 @@ class FunctionPullingContainerProxy(
251252
clientProxyFactory(
252253
context,
253254
job.invocationNamespace,
254-
job.action.fullyQualifiedName(true),
255+
job.fqn, // include binding field
255256
job.action.rev,
256257
job.schedulerHost,
257258
job.rpcPort,
@@ -292,7 +293,7 @@ class FunctionPullingContainerProxy(
292293
clientProxyFactory(
293294
context,
294295
job.invocationNamespace,
295-
job.action.fullyQualifiedName(true),
296+
job.fqn, // include binding field
296297
job.action.rev,
297298
job.schedulerHost,
298299
job.rpcPort,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ class InvokerHealthManager(instanceId: InvokerInstanceId,
172172

173173
WhiskAction.get(entityStore, docId).onComplete {
174174
case Success(action) =>
175-
val initialize = Initialize(namespace, action.toExecutableWhiskAction.get, "", 0, transid)
175+
val initialize =
176+
Initialize(namespace, action.fullyQualifiedName(true), action.toExecutableWhiskAction.get, "", 0, transid)
176177
startHealthAction(initialize, manager)
177178
case Failure(t) => logging.error(this, s"get health action error: ${t.getMessage}")
178179
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ import org.apache.openwhisk.core.etcd.EtcdClient
3535
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
3636
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
3737
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
38-
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
38+
import org.apache.openwhisk.core.scheduler.message.{
39+
ContainerCreation,
40+
ContainerDeletion,
41+
FailedCreationJob,
42+
SuccessfulCreationJob
43+
}
3944
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
4045
import org.apache.openwhisk.core.service._
4146
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
@@ -47,7 +52,7 @@ import scala.annotation.tailrec
4752
import scala.collection.immutable.Queue
4853
import scala.collection.mutable
4954
import scala.concurrent.duration._
50-
import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
55+
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
5156
import scala.util.{Failure, Success}
5257

5358
// States
@@ -766,7 +771,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
766771
activation.transid)
767772

768773
val totalTimeInScheduler = Interval(activation.transid.meta.start, Instant.now()).duration
769-
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
774+
MetricEmitter.emitHistogramMetric(
775+
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
776+
totalTimeInScheduler.toMillis)
770777

771778
val activationResponse =
772779
if (isWhiskError)
@@ -938,7 +945,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
938945
takeUncompletedRequest()
939946
.map { res =>
940947
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
941-
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
948+
MetricEmitter.emitHistogramMetric(
949+
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
950+
totalTimeInScheduler.toMillis)
942951
res.trySuccess(Right(msg))
943952
in.decrementAndGet()
944953
stay
@@ -960,7 +969,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
960969
this,
961970
s"[$invocationNamespace:$action:$stateName] Get activation request ${request.containerId}, send one message: ${msg.activationId}")
962971
val totalTimeInScheduler = Interval(msg.transid.meta.start, Instant.now()).duration
963-
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString), totalTimeInScheduler.toMillis)
972+
MetricEmitter.emitHistogramMetric(
973+
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString),
974+
totalTimeInScheduler.toMillis)
964975

965976
sender ! GetActivationResponse(Right(msg))
966977
tryDisableActionThrottling()

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ class QueueManager(
179179
}
180180

181181
case RecoverQueue(msg, action, actionMetaData) =>
182-
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
182+
QueuePool.keys.find(k =>
183+
k.invocationNamespace == msg.user.namespace.name.asString && k.docInfo.id == action.toDocId) match {
183184
// queue is already recovered or a newer queue is created, send msg to new queue
184185
case Some(key) if key.docInfo.rev >= msg.revision =>
185186
QueuePool.get(key) match {
@@ -358,10 +359,12 @@ class QueueManager(
358359
}
359360

360361
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
361-
logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
362+
logging.warn(
363+
this,
364+
s"queue for ${msg.user.namespace.name.asString}/${msg.action} doesn't exist in memory but exist in etcd, recovering...")
362365
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
363366

364-
logging.info(this, s"Recover a queue for ${msg.action},")
367+
logging.info(this, s"Recover a queue for ${msg.user.namespace.name.asString}/${msg.action},")
365368
getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)
366369
.map { actionMetaData: WhiskActionMetaData =>
367370
actionMetaData.toExecutableWhiskAction match {
@@ -397,7 +400,7 @@ class QueueManager(
397400
logging.warn(
398401
this,
399402
s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
400-
completeErrorActivation(msg, "The activation has not been processed")
403+
completeErrorActivation(msg, "The activation has not been processed: too old activation is arrived.")
401404
} else {
402405
QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
403406
case Some(memoryQueueValue) if memoryQueueValue.isLeader =>
@@ -466,13 +469,15 @@ class QueueManager(
466469
.recoverWith {
467470
case t =>
468471
logging.warn(this, s"[${msg.activationId}] failed to parse endpoints (${t.getMessage})")
469-
completeErrorActivation(msg, "The activation has not been processed")
472+
completeErrorActivation(
473+
msg,
474+
"The activation has not been processed: failed to parse the scheduler endpoint.")
470475
}
471476

472477
} recoverWith {
473478
case t =>
474479
logging.warn(this, s"[${msg.activationId}] activation has been dropped (${t.getMessage})")
475-
completeErrorActivation(msg, "The activation has not been processed")
480+
completeErrorActivation(msg, "The activation has not been processed: failed to get the queue endpoint.")
476481
}
477482
}
478483

0 commit comments

Comments
 (0)