Skip to content

Commit 8759cad

Browse files
authored
Change AcknowledegmentMessage interface (#4898)
* Add instance type to serde in InstanceId * Add serde test case * Fix type mismatch
1 parent a44db4b commit 8759cad

File tree

5 files changed

+193
-31
lines changed

5 files changed

+193
-31
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
8383
def messageType: String
8484

8585
/** Does message indicate slot is free? */
86-
def isSlotFree: Option[InvokerInstanceId]
86+
def isSlotFree: Option[InstanceId]
8787

8888
/** Does message contain a result? */
8989
def result: Option[Either[ActivationId, WhiskActivation]]
@@ -117,11 +117,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
117117
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
118118
response: Either[ActivationId, WhiskActivation],
119119
override val isSystemError: Option[Boolean],
120-
invoker: InvokerInstanceId)
120+
instance: InstanceId)
121121
extends AcknowledegmentMessage(transid) {
122122
override def messageType = "combined"
123123
override def result = Some(response)
124-
override def isSlotFree = Some(invoker)
124+
override def isSlotFree = Some(instance)
125125
override def activationId = response.fold(identity, _.activationId)
126126
override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
127127
override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))
@@ -137,11 +137,11 @@ case class CombinedCompletionAndResultMessage private (override val transid: Tra
137137
case class CompletionMessage private (override val transid: TransactionId,
138138
override val activationId: ActivationId,
139139
override val isSystemError: Option[Boolean],
140-
invoker: InvokerInstanceId)
140+
instance: InstanceId)
141141
extends AcknowledegmentMessage(transid) {
142142
override def messageType = "completion"
143143
override def result = None
144-
override def isSlotFree = Some(invoker)
144+
override def isSlotFree = Some(instance)
145145
override def toJson = CompletionMessage.serdes.write(this)
146146
override def shrink = this
147147
override def toString = activationId.asString
@@ -179,34 +179,34 @@ object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
179179
private def apply(transid: TransactionId,
180180
activation: Either[ActivationId, WhiskActivation],
181181
isSystemError: Option[Boolean],
182-
invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
183-
new CombinedCompletionAndResultMessage(transid, activation, isSystemError, invoker)
182+
instance: InstanceId): CombinedCompletionAndResultMessage =
183+
new CombinedCompletionAndResultMessage(transid, activation, isSystemError, instance)
184184

185185
def apply(transid: TransactionId,
186186
activation: WhiskActivation,
187-
invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
188-
new CombinedCompletionAndResultMessage(transid, Right(activation), Some(activation.response.isWhiskError), invoker)
187+
instance: InstanceId): CombinedCompletionAndResultMessage =
188+
new CombinedCompletionAndResultMessage(transid, Right(activation), Some(activation.response.isWhiskError), instance)
189189

190190
implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
191191
implicit val serdes = jsonFormat4(
192192
CombinedCompletionAndResultMessage
193-
.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: Option[Boolean], _: InvokerInstanceId))
193+
.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: Option[Boolean], _: InstanceId))
194194
}
195195

196196
object CompletionMessage extends DefaultJsonProtocol {
197197
// this constructor is restricted to ensure the message is always created with certain invariants
198198
private def apply(transid: TransactionId,
199199
activation: WhiskActivation,
200200
isSystemError: Option[Boolean],
201-
invoker: InvokerInstanceId): CompletionMessage =
202-
new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), invoker)
201+
instance: InstanceId): CompletionMessage =
202+
new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), instance)
203203

204-
def apply(transid: TransactionId, activation: WhiskActivation, invoker: InvokerInstanceId): CompletionMessage = {
205-
new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), invoker)
204+
def apply(transid: TransactionId, activation: WhiskActivation, instance: InstanceId): CompletionMessage = {
205+
new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), instance)
206206
}
207207

208208
implicit val serdes = jsonFormat4(
209-
CompletionMessage.apply(_: TransactionId, _: ActivationId, _: Option[Boolean], _: InvokerInstanceId))
209+
CompletionMessage.apply(_: TransactionId, _: ActivationId, _: Option[Boolean], _: InstanceId))
210210
}
211211

212212
object ResultMessage extends DefaultJsonProtocol {
@@ -245,7 +245,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
245245
// and otherwise to a ResultMessage. If all conversions fail, an error will be thrown that needs to be handled.
246246
override def read(json: JsValue): AcknowledegmentMessage = {
247247
val JsObject(fields) = json
248-
val completion = fields.contains("invoker")
248+
val completion = fields.contains("instance")
249249
val result = fields.contains("response")
250250
if (completion && result) {
251251
json.convertTo[CombinedCompletionAndResultMessage]

common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717

1818
package org.apache.openwhisk.core.entity
1919

20-
import spray.json.DefaultJsonProtocol
20+
import spray.json.{deserializationError, DefaultJsonProtocol, JsNumber, JsObject, JsString, JsValue, RootJsonFormat}
21+
import spray.json._
22+
23+
import scala.collection.mutable.ListBuffer
24+
import scala.util.Try
2125

2226
/**
2327
* An instance id representing an invoker
@@ -38,6 +42,8 @@ case class InvokerInstanceId(val instance: Int,
3842
override val source = s"$instanceType$instance"
3943

4044
override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++ displayedName).mkString("/")
45+
46+
override val toJson: JsValue = InvokerInstanceId.serdes.write(this)
4147
}
4248

4349
case class ControllerInstanceId(asString: String) extends InstanceId {
@@ -47,15 +53,61 @@ case class ControllerInstanceId(asString: String) extends InstanceId {
4753
override val source = s"$instanceType$asString"
4854

4955
override val toString: String = source
56+
57+
override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
5058
}
5159

5260
object InvokerInstanceId extends DefaultJsonProtocol {
53-
import org.apache.openwhisk.core.entity.size.{serdes => xserds}
54-
implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance", "uniqueName", "displayedName", "userMemory")
61+
def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))
62+
63+
implicit val serdes = new RootJsonFormat[InvokerInstanceId] {
64+
override def write(i: InvokerInstanceId): JsValue = {
65+
val fields = new ListBuffer[(String, JsValue)]
66+
fields ++= List("instance" -> JsNumber(i.instance))
67+
fields ++= List("userMemory" -> JsString(i.userMemory.toString))
68+
fields ++= List("instanceType" -> JsString(i.instanceType))
69+
i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> JsString(uniqueName)))
70+
i.displayedName.foreach(displayedName => fields ++= List("displayedName" -> JsString(displayedName)))
71+
JsObject(fields.toSeq: _*)
72+
}
73+
74+
override def read(json: JsValue): InvokerInstanceId = {
75+
val instance = fromField[Int](json, "instance")
76+
val uniqueName = fromField[Option[String]](json, "uniqueName")
77+
val displayedName = fromField[Option[String]](json, "displayedName")
78+
val userMemory = fromField[String](json, "userMemory")
79+
val instanceType = fromField[String](json, "instanceType")
80+
81+
if (instanceType == "invoker") {
82+
new InvokerInstanceId(instance, uniqueName, displayedName, ByteSize.fromString(userMemory))
83+
} else {
84+
deserializationError("could not read InvokerInstanceId")
85+
}
86+
}
87+
}
88+
5589
}
5690

5791
object ControllerInstanceId extends DefaultJsonProtocol {
58-
implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString")
92+
def parse(c: String): Try[ControllerInstanceId] = Try(serdes.read(c.parseJson))
93+
94+
implicit val serdes = new RootJsonFormat[ControllerInstanceId] {
95+
override def write(c: ControllerInstanceId): JsValue =
96+
JsObject("asString" -> JsString(c.asString), "instanceType" -> JsString(c.instanceType))
97+
98+
override def read(json: JsValue): ControllerInstanceId = {
99+
json.asJsObject.getFields("asString", "instanceType") match {
100+
case Seq(JsString(asString), JsString(instanceType)) =>
101+
if (instanceType == "controller") {
102+
new ControllerInstanceId(asString)
103+
} else {
104+
deserializationError("could not read ControllerInstanceId")
105+
}
106+
case _ =>
107+
deserializationError("could not read ControllerInstanceId")
108+
}
109+
}
110+
}
59111
}
60112

61113
trait InstanceId {
@@ -67,6 +119,8 @@ trait InstanceId {
67119
// reserve some number of characters as the prefix to be added to topic names
68120
private val MAX_NAME_LENGTH = 249 - 121
69121

122+
def serialize: String = InstanceId.serdes.write(this).compactPrint
123+
70124
def validate(asString: String): Unit =
71125
require(
72126
asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
@@ -76,4 +130,28 @@ trait InstanceId {
76130

77131
val source: String
78132

133+
val toJson: JsValue
134+
}
135+
136+
object InstanceId extends DefaultJsonProtocol {
137+
def parse(i: String): Try[InstanceId] = Try(serdes.read(i.parseJson))
138+
139+
implicit val serdes = new RootJsonFormat[InstanceId] {
140+
override def write(i: InstanceId): JsValue = i.toJson
141+
142+
override def read(json: JsValue): InstanceId = {
143+
val JsObject(field) = json
144+
field
145+
.get("instanceType")
146+
.map(_.convertTo[String] match {
147+
case "invoker" =>
148+
json.convertTo[InvokerInstanceId]
149+
case "controller" =>
150+
json.convertTo[ControllerInstanceId]
151+
case _ =>
152+
deserializationError("could not read InstanceId")
153+
})
154+
.getOrElse(deserializationError("could not read InstanceId"))
155+
}
156+
}
79157
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
148148
activationSlots.getOrElseUpdate(
149149
msg.activationId, {
150150
val timeoutHandler = actorSystem.scheduler.scheduleOnce(completionAckTimeout) {
151-
processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
151+
processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, instance = instance)
152152
}
153153

154154
// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
@@ -206,13 +206,13 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
206206
val raw = new String(bytes, StandardCharsets.UTF_8)
207207
AcknowledegmentMessage.parse(raw) match {
208208
case Success(acknowledegment) =>
209-
acknowledegment.isSlotFree.foreach { invoker =>
209+
acknowledegment.isSlotFree.foreach { instance =>
210210
processCompletion(
211211
acknowledegment.activationId,
212212
acknowledegment.transid,
213213
forced = false,
214214
isSystemError = acknowledegment.isSystemError.getOrElse(false),
215-
invoker)
215+
instance)
216216
}
217217

218218
acknowledegment.result.foreach { response =>
@@ -261,7 +261,12 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
261261
tid: TransactionId,
262262
forced: Boolean,
263263
isSystemError: Boolean,
264-
invoker: InvokerInstanceId): Unit = {
264+
instance: InstanceId): Unit = {
265+
266+
val invoker = instance match {
267+
case i: InvokerInstanceId => Some(i)
268+
case _ => None
269+
}
265270

266271
val invocationResult = if (forced) {
267272
InvocationFinishedResult.Timeout
@@ -283,7 +288,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
283288
totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
284289
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
285290

286-
releaseInvoker(invoker, entry)
291+
invoker.foreach(releaseInvoker(_, entry))
287292

288293
if (!forced) {
289294
entry.timeoutHandler.cancel()
@@ -305,7 +310,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
305310
val completionAckTimeout = calculateCompletionAckTimeout(entry.timeLimit)
306311
logging.warn(
307312
this,
308-
s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $completionAckTimeout from $invoker")(
313+
s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $completionAckTimeout from $instance")(
309314
tid)
310315

311316
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
@@ -314,25 +319,25 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
314319
// Completion acks that are received here are strictly from user actions - health actions are not part of
315320
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
316321
// guard this
317-
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
322+
invoker.foreach(invokerPool ! InvocationFinishedMessage(_, invocationResult))
318323
case None if tid == TransactionId.invokerHealth =>
319324
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
320325
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
321326
// healthy again.
322-
logging.info(this, s"received completion ack for health action on $invoker")(tid)
327+
logging.info(this, s"received completion ack for health action on $instance")(tid)
323328

324329
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
325330

326331
// guard this
327-
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
332+
invoker.foreach(invokerPool ! InvocationFinishedMessage(_, invocationResult))
328333
case None if !forced =>
329334
// Received a completion ack that has already been taken out of the state because of a timeout (forced ack).
330335
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
331336
// Logging this condition as a warning because the invoker processed the activation and sent a completion
332337
// message - but not in time.
333338
logging.warn(
334339
this,
335-
s"received completion ack for '$aid' from $invoker which has no entry, system error=$isSystemError")(tid)
340+
s"received completion ack for '$aid' from $instance which has no entry, system error=$isSystemError")(tid)
336341

337342
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
338343
case None =>

tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ import org.junit.runner.RunWith
2121
import org.scalatest.FlatSpec
2222
import org.scalatest.Matchers
2323
import org.scalatest.junit.JUnitRunner
24-
import org.apache.openwhisk.core.entity.ControllerInstanceId
24+
import org.apache.openwhisk.core.entity.{ControllerInstanceId, InstanceId}
25+
import spray.json.{JsObject, JsString}
26+
27+
import scala.util.Success
2528

2629
@RunWith(classOf[JUnitRunner])
2730
class ControllerInstanceIdTests extends FlatSpec with Matchers {
@@ -43,4 +46,11 @@ class ControllerInstanceIdTests extends FlatSpec with Matchers {
4346
}
4447
}
4548

49+
it should "serialize and deserialize ControllerInstanceId" in {
50+
val i = ControllerInstanceId("controller0")
51+
i.serialize shouldBe JsObject("asString" -> JsString(i.asString), "instanceType" -> JsString(i.instanceType)).compactPrint
52+
i.serialize shouldBe i.toJson.compactPrint
53+
InstanceId.parse(i.serialize) shouldBe Success(i)
54+
}
55+
4656
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.entity.test
19+
20+
import org.apache.openwhisk.core.entity.size.SizeInt
21+
import org.apache.openwhisk.core.entity.{ByteSize, InstanceId, InvokerInstanceId}
22+
import org.junit.runner.RunWith
23+
import org.scalatest.junit.JUnitRunner
24+
import org.scalatest.{FlatSpec, Matchers}
25+
import spray.json.{JsNumber, JsObject, JsString}
26+
27+
import scala.util.Success
28+
29+
@RunWith(classOf[JUnitRunner])
30+
class InvokerInstanceIdTests extends FlatSpec with Matchers {
31+
32+
behavior of "InvokerInstanceIdTests"
33+
34+
val defaultUserMemory: ByteSize = 1024.MB
35+
it should "serialize and deserialize InvokerInstanceId" in {
36+
val i = InvokerInstanceId(0, userMemory = defaultUserMemory)
37+
i.serialize shouldBe JsObject(
38+
"instance" -> JsNumber(i.instance),
39+
"userMemory" -> JsString(i.userMemory.toString),
40+
"instanceType" -> JsString(i.instanceType)).compactPrint
41+
i.serialize shouldBe i.toJson.compactPrint
42+
InstanceId.parse(i.serialize) shouldBe Success(i)
43+
}
44+
45+
it should "serialize and deserialize InvokerInstanceId with optional field" in {
46+
val i1 = InvokerInstanceId(0, uniqueName = Some("uniqueInvoker"), userMemory = defaultUserMemory)
47+
i1.serialize shouldBe JsObject(
48+
"instance" -> JsNumber(i1.instance),
49+
"userMemory" -> JsString(i1.userMemory.toString),
50+
"instanceType" -> JsString(i1.instanceType),
51+
"uniqueName" -> JsString(i1.uniqueName.getOrElse(""))).compactPrint
52+
i1.serialize shouldBe i1.toJson.compactPrint
53+
InstanceId.parse(i1.serialize) shouldBe Success(i1)
54+
55+
val i2 = InvokerInstanceId(
56+
0,
57+
uniqueName = Some("uniqueInvoker"),
58+
displayedName = Some("displayedInvoker"),
59+
userMemory = defaultUserMemory)
60+
i2.serialize shouldBe JsObject(
61+
"instance" -> JsNumber(i2.instance),
62+
"userMemory" -> JsString(i2.userMemory.toString),
63+
"instanceType" -> JsString(i2.instanceType),
64+
"uniqueName" -> JsString(i2.uniqueName.getOrElse("")),
65+
"displayedName" -> JsString(i2.displayedName.getOrElse(""))).compactPrint
66+
i2.serialize shouldBe i2.toJson.compactPrint
67+
InstanceId.parse(i2.serialize) shouldBe Success(i2)
68+
}
69+
}

0 commit comments

Comments
 (0)