Skip to content

Commit 4ee105b

Browse files
committed
Fix test case error
1 parent c2ce94b commit 4ee105b

File tree

2 files changed

+139
-16
lines changed

2 files changed

+139
-16
lines changed

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,17 @@ import java.nio.charset.StandardCharsets
2222
import akka.actor.{ActorRef, ActorSystem, Props}
2323
import org.apache.kafka.clients.producer.RecordMetadata
2424
import org.apache.openwhisk.common.{GracefulShutdown, Logging, TransactionId}
25+
import org.apache.openwhisk.core.WarmUp.isWarmUpAction
2526
import org.apache.openwhisk.core.WhiskConfig
2627
import org.apache.openwhisk.core.connector.ContainerCreationError.DBFetchError
2728
import org.apache.openwhisk.core.connector._
2829
import org.apache.openwhisk.core.containerpool.v2.{CreationContainer, DeletionContainer}
29-
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentTypeMismatchException, DocumentUnreadable, NoDocumentException}
30+
import org.apache.openwhisk.core.database.{
31+
ArtifactStore,
32+
DocumentTypeMismatchException,
33+
DocumentUnreadable,
34+
NoDocumentException
35+
}
3036
import org.apache.openwhisk.core.entity._
3137
import org.apache.openwhisk.http.Messages
3238

@@ -35,17 +41,17 @@ import scala.concurrent.{ExecutionContext, Future}
3541
import scala.util.{Failure, Success}
3642

3743
class ContainerMessageConsumer(
38-
invokerInstanceId: InvokerInstanceId,
39-
containerPool: ActorRef,
40-
entityStore: ArtifactStore[WhiskEntity],
41-
config: WhiskConfig,
42-
msgProvider: MessagingProvider,
43-
longPollDuration: FiniteDuration,
44-
maxPeek: Int,
45-
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
46-
implicit actorSystem: ActorSystem,
47-
executionContext: ExecutionContext,
48-
logging: Logging) {
44+
invokerInstanceId: InvokerInstanceId,
45+
containerPool: ActorRef,
46+
entityStore: ArtifactStore[WhiskEntity],
47+
config: WhiskConfig,
48+
msgProvider: MessagingProvider,
49+
longPollDuration: FiniteDuration,
50+
maxPeek: Int,
51+
sendAckToScheduler: (SchedulerInstanceId, ContainerCreationAckMessage) => Future[RecordMetadata])(
52+
implicit actorSystem: ActorSystem,
53+
executionContext: ExecutionContext,
54+
logging: Logging) {
4955

5056
private val topic = s"${Invoker.topicPrefix}invoker${invokerInstanceId.toInt}"
5157
private val consumer =
@@ -54,11 +60,16 @@ class ContainerMessageConsumer(
5460
private def handler(bytes: Array[Byte]): Future[Unit] = Future {
5561
val raw = new String(bytes, StandardCharsets.UTF_8)
5662
ContainerMessage.parse(raw) match {
57-
case Success(creation: ContainerCreationMessage) =>
58-
implicit val transid: TransactionId = creation.transid
63+
case Success(creation: ContainerCreationMessage) if isWarmUpAction(creation.action) =>
5964
logging.info(
6065
this,
6166
s"container creation message for ${creation.invocationNamespace}/${creation.action} is received (creationId: ${creation.creationId})")
67+
feed ! MessageFeed.Processed
68+
69+
case Success(creation: ContainerCreationMessage) =>
70+
implicit val transid: TransactionId = creation.transid
71+
logging
72+
.info(this, s"container creation message for ${creation.invocationNamespace}/${creation.action} is received")
6273
WhiskAction
6374
.get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
6475
.map { action =>

tests/src/test/scala/org/apache/openwhisk/core/invoker/test/ContainerMessageConsumerTests.scala

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import akka.stream.ActorMaterializer
2424
import akka.testkit.{TestKit, TestProbe}
2525
import common.StreamLogging
2626
import org.apache.kafka.clients.producer.RecordMetadata
27-
import org.apache.openwhisk.common.TransactionId
27+
import org.apache.openwhisk.common.{Logging, TransactionId}
28+
import org.apache.openwhisk.core.{WarmUp, WhiskConfig}
2829
import org.apache.openwhisk.core.connector.ContainerCreationError._
2930
import org.apache.openwhisk.core.connector._
3031
import org.apache.openwhisk.core.connector.test.TestConnector
@@ -34,6 +35,7 @@ import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest
3435
import org.apache.openwhisk.core.entity._
3536
import org.apache.openwhisk.core.entity.size._
3637
import org.apache.openwhisk.core.entity.test.ExecHelpers
38+
import org.apache.openwhisk.core.invoker.ContainerMessageConsumer
3739
import org.apache.openwhisk.http.Messages
3840
import org.apache.openwhisk.utils.{retry => utilRetry}
3941
import org.junit.runner.RunWith
@@ -43,10 +45,11 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Match
4345

4446
import scala.concurrent.Future
4547
import scala.concurrent.duration._
48+
import scala.util.Try
4649

4750
@RunWith(classOf[JUnitRunner])
4851
class ContainerMessageConsumerTests
49-
extends TestKit(ActorSystem("ContainerMessageConsumer"))
52+
extends TestKit(ActorSystem("ContainerMessageConsumer"))
5053
with FlatSpecLike
5154
with Matchers
5255
with BeforeAndAfterEach
@@ -67,7 +70,16 @@ class ContainerMessageConsumerTests
6770
super.afterAll()
6871
}
6972

73+
private val whiskConfig = new WhiskConfig(
74+
Map(
75+
WhiskConfig.actionInvokePerMinuteLimit -> null,
76+
WhiskConfig.triggerFirePerMinuteLimit -> null,
77+
WhiskConfig.actionInvokeConcurrentLimit -> null,
78+
WhiskConfig.runtimesManifest -> null,
79+
WhiskConfig.actionSequenceMaxLimit -> null))
80+
7081
private val entityStore = WhiskEntityStore.datastore()
82+
private val producer = stub[MessageProducer]
7183

7284
private val defaultUserMemory: ByteSize = 1024.MB
7385
private val invokerInstance = InvokerInstanceId(0, userMemory = defaultUserMemory)
@@ -83,6 +95,27 @@ class ContainerMessageConsumerTests
8395
cleanup()
8496
}
8597

98+
private def fakeMessageProvider(consumer: TestConnector): MessagingProvider = {
99+
new MessagingProvider {
100+
override def getConsumer(
101+
whiskConfig: WhiskConfig,
102+
groupId: String,
103+
topic: String,
104+
maxPeek: Int,
105+
maxPollInterval: FiniteDuration)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer =
106+
consumer
107+
108+
override def getProducer(config: WhiskConfig, maxRequestSize: Option[ByteSize])(
109+
implicit logging: Logging,
110+
actorSystem: ActorSystem): MessageProducer = consumer.getProducer()
111+
112+
override def ensureTopic(config: WhiskConfig,
113+
topic: String,
114+
topicConfig: String,
115+
maxMessageBytes: Option[ByteSize])(implicit logging: Logging): Try[Unit] = Try {}
116+
}
117+
}
118+
86119
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
87120
ackMessage: ContainerCreationAckMessage): Future[RecordMetadata] = {
88121
val topic = s"creationAck${schedulerInstanceId.asString}"
@@ -110,6 +143,18 @@ class ContainerMessageConsumerTests
110143
it should "forward ContainerCreationMessage to containerPool" in {
111144
val pool = TestProbe()
112145
val mockConsumer = new TestConnector("fakeTopic", 4, true)
146+
val msgProvider = fakeMessageProvider(mockConsumer)
147+
148+
val consumer =
149+
new ContainerMessageConsumer(
150+
invokerInstance,
151+
pool.ref,
152+
entityStore,
153+
whiskConfig,
154+
msgProvider,
155+
200.milliseconds,
156+
500,
157+
sendAckToScheduler(producer))
113158

114159
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
115160
val action =
@@ -150,10 +195,22 @@ class ContainerMessageConsumerTests
150195
it should "send ack(failed) to scheduler when failed to get action from DB " in {
151196
val pool = TestProbe()
152197
val creationConsumer = new TestConnector("creation", 4, true)
198+
val msgProvider = fakeMessageProvider(creationConsumer)
153199

154200
val ackTopic = "ack"
155201
val ackConsumer = new TestConnector(ackTopic, 4, true)
156202

203+
val consumer =
204+
new ContainerMessageConsumer(
205+
invokerInstance,
206+
pool.ref,
207+
entityStore,
208+
whiskConfig,
209+
msgProvider,
210+
200.milliseconds,
211+
500,
212+
sendAckToScheduler(ackConsumer.getProducer()))
213+
157214
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
158215
val whiskAction =
159216
WhiskAction(EntityPath("testns"), EntityName("testAction2"), exec, limits = ActionLimits(TimeLimit(1.minute)))
@@ -213,4 +270,59 @@ class ContainerMessageConsumerTests
213270
pool.expectNoMessage(2.seconds)
214271
}
215272
}
273+
274+
it should "drop messages of warm-up action" in {
275+
val pool = TestProbe()
276+
val mockConsumer = new TestConnector("fakeTopic", 4, true)
277+
val msgProvider = fakeMessageProvider(mockConsumer)
278+
279+
val consumer =
280+
new ContainerMessageConsumer(
281+
invokerInstance,
282+
pool.ref,
283+
entityStore,
284+
whiskConfig,
285+
msgProvider,
286+
200.milliseconds,
287+
500,
288+
sendAckToScheduler(producer))
289+
290+
val exec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None)
291+
val action =
292+
WhiskAction(
293+
WarmUp.warmUpAction.namespace.toPath,
294+
WarmUp.warmUpAction.name,
295+
exec,
296+
limits = ActionLimits(TimeLimit(1.minute)))
297+
val doc = put(entityStore, action)
298+
val execMetadata =
299+
CodeExecMetaDataAsString(exec.manifest, entryPoint = exec.entryPoint)
300+
301+
val actionMetadata =
302+
WhiskActionMetaData(
303+
action.namespace,
304+
action.name,
305+
execMetadata,
306+
action.parameters,
307+
action.limits,
308+
action.version,
309+
action.publish,
310+
action.annotations)
311+
312+
val msg =
313+
ContainerCreationMessage(
314+
transId,
315+
invocationNamespace.asString,
316+
action.fullyQualifiedName(false),
317+
DocRevision.empty,
318+
actionMetadata,
319+
schedulerInstanceId,
320+
schedulerHost,
321+
rpcPort,
322+
creationId = creationId)
323+
324+
mockConsumer.send(msg)
325+
326+
pool.expectNoMessage(1.seconds)
327+
}
216328
}

0 commit comments

Comments
 (0)