@@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
60
60
lockedArgs : Map [String , String ] = Map .empty,
61
61
cause : Option [ActivationId ] = None ,
62
62
traceContext : Option [Map [String , String ]] = None )
63
- extends Message {
63
+ extends Message {
64
64
65
65
override def serialize = ActivationMessage .serdes.write(this ).compactPrint
66
66
@@ -78,6 +78,7 @@ case class ActivationMessage(override val transid: TransactionId,
78
78
*/
79
79
abstract class AcknowledegmentMessage (private val tid : TransactionId ) extends Message {
80
80
override val transid : TransactionId = tid
81
+
81
82
override def serialize : String = AcknowledegmentMessage .serdes.write(this ).compactPrint
82
83
83
84
/** Pithy descriptor for logging. */
@@ -115,17 +116,23 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
115
116
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
116
117
* Right when this message is created.
117
118
*/
118
- case class CombinedCompletionAndResultMessage private (override val transid : TransactionId ,
119
- response : Either [ActivationId , WhiskActivation ],
120
- override val isSystemError : Option [Boolean ],
121
- instance : InstanceId )
122
- extends AcknowledegmentMessage (transid) {
119
+ case class CombinedCompletionAndResultMessage private (override val transid : TransactionId ,
120
+ response : Either [ActivationId , WhiskActivation ],
121
+ override val isSystemError : Option [Boolean ],
122
+ instance : InstanceId )
123
+ extends AcknowledegmentMessage (transid) {
123
124
override def messageType = " combined"
125
+
124
126
override def result = Some (response)
127
+
125
128
override def isSlotFree = Some (instance)
129
+
126
130
override def activationId = response.fold(identity, _.activationId)
131
+
127
132
override def toJson = CombinedCompletionAndResultMessage .serdes.write(this )
133
+
128
134
override def shrink = copy(response = response.flatMap(a => Left (a.activationId)))
135
+
129
136
override def toString = activationId.asString
130
137
}
131
138
@@ -135,16 +142,21 @@ case class CombinedCompletionAndResultMessage private (override val transid: Tra
135
142
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
136
143
* `CompletionMessage`.
137
144
*/
138
- case class CompletionMessage private (override val transid : TransactionId ,
139
- override val activationId : ActivationId ,
140
- override val isSystemError : Option [Boolean ],
141
- instance : InstanceId )
142
- extends AcknowledegmentMessage (transid) {
145
+ case class CompletionMessage private (override val transid : TransactionId ,
146
+ override val activationId : ActivationId ,
147
+ override val isSystemError : Option [Boolean ],
148
+ instance : InstanceId )
149
+ extends AcknowledegmentMessage (transid) {
143
150
override def messageType = " completion"
151
+
144
152
override def result = None
153
+
145
154
override def isSlotFree = Some (instance)
155
+
146
156
override def toJson = CompletionMessage .serdes.write(this )
157
+
147
158
override def shrink = this
159
+
148
160
override def toString = activationId.asString
149
161
}
150
162
@@ -156,15 +168,22 @@ case class CompletionMessage private (override val transid: TransactionId,
156
168
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
157
169
* Right when this message is created.
158
170
*/
159
- case class ResultMessage private (override val transid : TransactionId , response : Either [ActivationId , WhiskActivation ])
160
- extends AcknowledegmentMessage (transid) {
171
+ case class ResultMessage private (override val transid : TransactionId , response : Either [ActivationId , WhiskActivation ])
172
+ extends AcknowledegmentMessage (transid) {
161
173
override def messageType = " result"
174
+
162
175
override def result = Some (response)
176
+
163
177
override def isSlotFree = None
178
+
164
179
override def isSystemError = response.fold(_ => None , a => Some (a.response.isWhiskError))
180
+
165
181
override def activationId = response.fold(identity, _.activationId)
182
+
166
183
override def toJson = ResultMessage .serdes.write(this )
184
+
167
185
override def shrink = copy(response = response.flatMap(a => Left (a.activationId)))
186
+
168
187
override def toString = activationId.asString
169
188
}
170
189
@@ -234,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
234
253
Left (value.convertTo[ActivationId ])
235
254
236
255
case _ : JsObject => Right (value.convertTo[WhiskActivation ])
237
- case _ => deserializationError(" could not read ResultMessage" )
256
+ case _ => deserializationError(" could not read ResultMessage" )
238
257
}
239
258
}
240
259
@@ -265,6 +284,7 @@ case class PingMessage(instance: InvokerInstanceId) extends Message {
265
284
266
285
object PingMessage extends DefaultJsonProtocol {
267
286
def parse (msg : String ) = Try (serdes.read(msg.parseJson))
287
+
268
288
implicit val serdes = jsonFormat(PingMessage .apply _, " name" )
269
289
}
270
290
@@ -276,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {
276
296
277
297
implicit val format = new JsonFormat [EventMessageBody ] {
278
298
def write (eventMessageBody : EventMessageBody ) = eventMessageBody match {
279
- case m : Metric => m.toJson
299
+ case m : Metric => m.toJson
280
300
case a : Activation => a.toJson
281
301
}
282
302
@@ -301,9 +321,11 @@ case class Activation(name: String,
301
321
causedBy : Option [String ],
302
322
size : Option [Int ] = None ,
303
323
userDefinedStatusCode : Option [Int ] = None )
304
- extends EventMessageBody {
324
+ extends EventMessageBody {
305
325
val typeName = Activation .typeName
326
+
306
327
override def serialize = toJson.compactPrint
328
+
307
329
def entityPath : FullyQualifiedEntityName = EntityPath (name).toFullyQualifiedEntityName
308
330
309
331
def toJson = Activation .activationFormat.write(this )
@@ -327,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
327
349
private implicit val durationFormat = new RootJsonFormat [Duration ] {
328
350
override def write (obj : Duration ): JsValue = obj match {
329
351
case o if o.isFinite => JsNumber (o.toMillis)
330
- case _ => JsNumber .zero
352
+ case _ => JsNumber .zero
331
353
}
332
354
333
355
override def read (json : JsValue ): Duration = json match {
334
356
case JsNumber (n) if n <= 0 => Duration .Zero
335
- case JsNumber (n) => toDuration(n.longValue)
357
+ case JsNumber (n) => toDuration(n.longValue)
336
358
}
337
359
}
338
360
@@ -352,7 +374,7 @@ object Activation extends DefaultJsonProtocol {
352
374
" size" ,
353
375
" userDefinedStatusCode" )
354
376
355
- /** Get "StatusCode" from result response set by action developer **/
377
+ /** Get "StatusCode" from result response set by action developer * */
356
378
def userDefinedStatusCode (result : Option [JsValue ]): Option [Int ] = {
357
379
val statusCode = JsHelpers
358
380
.getFieldPath(result.get.asJsObject, ERROR_FIELD , " statusCode" )
@@ -394,13 +416,17 @@ object Activation extends DefaultJsonProtocol {
394
416
395
417
case class Metric (metricName : String , metricValue : Long ) extends EventMessageBody {
396
418
val typeName = " Metric"
419
+
397
420
override def serialize = toJson.compactPrint
421
+
398
422
def toJson = Metric .metricFormat.write(this ).asJsObject
399
423
}
400
424
401
425
object Metric extends DefaultJsonProtocol {
402
426
val typeName = " Metric"
427
+
403
428
def parse (msg : String ) = Try (metricFormat.read(msg.parseJson))
429
+
404
430
implicit val metricFormat = jsonFormat(Metric .apply _, " metricName" , " metricValue" )
405
431
}
406
432
@@ -411,7 +437,7 @@ case class EventMessage(source: String,
411
437
userId : UUID ,
412
438
eventType : String ,
413
439
timestamp : Long = System .currentTimeMillis())
414
- extends Message {
440
+ extends Message {
415
441
override def serialize = EventMessage .format.write(this ).compactPrint
416
442
}
417
443
@@ -434,7 +460,7 @@ case class InvokerResourceMessage(status: String,
434
460
inProgressMemory : Long ,
435
461
tags : Seq [String ],
436
462
dedicatedNamespaces : Seq [String ])
437
- extends Message {
463
+ extends Message {
438
464
439
465
/**
440
466
* Serializes message to string. Must be idempotent.
@@ -444,6 +470,7 @@ case class InvokerResourceMessage(status: String,
444
470
445
471
object InvokerResourceMessage extends DefaultJsonProtocol {
446
472
def parse (msg : String ): Try [InvokerResourceMessage ] = Try (serdes.read(msg.parseJson))
473
+
447
474
implicit val serdes =
448
475
jsonFormat(
449
476
InvokerResourceMessage .apply _,
@@ -462,23 +489,25 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
462
489
*
463
490
* [
464
491
* ...
465
- * {
466
- * "data": "RunningData",
467
- * "fqn": "whisk.system/elasticsearch/[email protected] ",
468
- * "invocationNamespace": "style95",
469
- * "status": "Running",
470
- * "waitingActivation": 1
471
- * },
492
+ * {
493
+ * "data": "RunningData",
494
+ * "fqn": "whisk.system/elasticsearch/[email protected] ",
495
+ * "invocationNamespace": "style95",
496
+ * "status": "Running",
497
+ * "waitingActivation": 1
498
+ * },
472
499
* ...
473
500
* ]
474
501
*/
475
502
object StatusQuery
503
+
476
504
case class StatusData (invocationNamespace : String , fqn : String , waitingActivation : Int , status : String , data : String )
477
- extends Message {
505
+ extends Message {
478
506
479
507
override def serialize : String = StatusData .serdes.write(this ).compactPrint
480
508
481
509
}
510
+
482
511
object StatusData extends DefaultJsonProtocol {
483
512
484
513
implicit val serdes =
@@ -495,9 +524,10 @@ case class ContainerCreationMessage(override val transid: TransactionId,
495
524
rpcPort : Int ,
496
525
retryCount : Int = 0 ,
497
526
creationId : CreationId = CreationId .generate())
498
- extends ContainerMessage (transid) {
527
+ extends ContainerMessage (transid) {
499
528
500
529
override def toJson : JsValue = ContainerCreationMessage .serdes.write(this )
530
+
501
531
override def serialize : String = toJson.compactPrint
502
532
}
503
533
@@ -526,8 +556,9 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
526
556
action : FullyQualifiedEntityName ,
527
557
revision : DocRevision ,
528
558
whiskActionMetaData : WhiskActionMetaData )
529
- extends ContainerMessage (transid) {
559
+ extends ContainerMessage (transid) {
530
560
override def toJson : JsValue = ContainerDeletionMessage .serdes.write(this )
561
+
531
562
override def serialize : String = toJson.compactPrint
532
563
}
533
564
@@ -544,6 +575,7 @@ object ContainerDeletionMessage extends DefaultJsonProtocol {
544
575
545
576
abstract class ContainerMessage (private val tid : TransactionId ) extends Message {
546
577
override val transid : TransactionId = tid
578
+
547
579
override def serialize : String = ContainerMessage .serdes.write(this ).compactPrint
548
580
549
581
/** Serializes the message to JSON. */
@@ -569,18 +601,31 @@ object ContainerMessage extends DefaultJsonProtocol {
569
601
}
570
602
571
603
sealed trait ContainerCreationError
604
+
572
605
object ContainerCreationError extends Enumeration {
606
+
573
607
case object NoAvailableInvokersError extends ContainerCreationError
608
+
574
609
case object NoAvailableResourceInvokersError extends ContainerCreationError
610
+
575
611
case object ResourceNotEnoughError extends ContainerCreationError
612
+
576
613
case object WhiskError extends ContainerCreationError
614
+
577
615
case object UnknownError extends ContainerCreationError
616
+
578
617
case object TimeoutError extends ContainerCreationError
618
+
579
619
case object ShuttingDownError extends ContainerCreationError
620
+
580
621
case object NonExecutableActionError extends ContainerCreationError
622
+
581
623
case object DBFetchError extends ContainerCreationError
624
+
582
625
case object BlackBoxError extends ContainerCreationError
626
+
583
627
case object ZeroNamespaceLimit extends ContainerCreationError
628
+
584
629
case object TooManyConcurrentRequests extends ContainerCreationError
585
630
586
631
val whiskErrors : Set [ContainerCreationError ] =
@@ -594,26 +639,27 @@ object ContainerCreationError extends Enumeration {
594
639
TimeoutError ,
595
640
ZeroNamespaceLimit )
596
641
597
- def fromName (name : String ) = name.toUpperCase match {
598
- case " NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
642
+ private def parse (name : String ) = name.toUpperCase match {
643
+ case " NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
599
644
case " NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
600
- case " RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
601
- case " NONEXECUTBLEACTIONERROR" => NonExecutableActionError
602
- case " DBFETCHERROR" => DBFetchError
603
- case " WHISKERROR" => WhiskError
604
- case " BLACKBOXERROR" => BlackBoxError
605
- case " TIMEOUTERROR" => TimeoutError
606
- case " ZERONAMESPACELIMIT" => ZeroNamespaceLimit
607
- case " TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
608
- case " UNKNOWNERROR" => UnknownError
645
+ case " RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
646
+ case " NONEXECUTBLEACTIONERROR" => NonExecutableActionError
647
+ case " DBFETCHERROR" => DBFetchError
648
+ case " WHISKERROR" => WhiskError
649
+ case " BLACKBOXERROR" => BlackBoxError
650
+ case " TIMEOUTERROR" => TimeoutError
651
+ case " ZERONAMESPACELIMIT" => ZeroNamespaceLimit
652
+ case " TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
653
+ case " UNKNOWNERROR" => UnknownError
609
654
}
610
655
611
656
implicit val serds = new RootJsonFormat [ContainerCreationError ] {
612
657
override def write (error : ContainerCreationError ): JsValue = JsString (error.toString)
658
+
613
659
override def read (json : JsValue ): ContainerCreationError =
614
660
Try {
615
661
val JsString (str) = json
616
- ContainerCreationError .fromName (str.trim.toUpperCase)
662
+ ContainerCreationError .parse (str.trim.toUpperCase)
617
663
} getOrElse {
618
664
throw deserializationError(" ContainerCreationError must be a valid string" )
619
665
}
@@ -632,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
632
678
retryCount : Int = 0 ,
633
679
error : Option [ContainerCreationError ] = None ,
634
680
reason : Option [String ] = None )
635
- extends Message {
681
+ extends Message {
636
682
637
683
/**
638
684
* Serializes message to string. Must be idempotent.
@@ -642,6 +688,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
642
688
643
689
object ContainerCreationAckMessage extends DefaultJsonProtocol {
644
690
def parse (msg : String ): Try [ContainerCreationAckMessage ] = Try (serdes.read(msg.parseJson))
691
+
645
692
private implicit val fqnSerdes = FullyQualifiedEntityName .serdes
646
693
private implicit val byteSizeSerdes = size.serdes
647
694
implicit val serdes = jsonFormat12(ContainerCreationAckMessage .apply)
0 commit comments