Skip to content

Commit b4a7717

Browse files
committed
Change string to FiniteDuration
1 parent cc9c54f commit b4a7717

File tree

7 files changed

+52
-52
lines changed

7 files changed

+52
-52
lines changed

ansible/group_vars/all

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ scheduler:
467467
protocol: "{{ scheduler_protocol | default('http') }}"
468468
maxPeek: "{{ scheduler_max_peek | default(128) }}"
469469
queueManager:
470-
maxSchedulingTimeMs: "{{ scheduler_maxSchedulingTimeMs | default(20000) }}"
470+
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
471471
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
472472
dataManagementService:
473473
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
6060
lockedArgs: Map[String, String] = Map.empty,
6161
cause: Option[ActivationId] = None,
6262
traceContext: Option[Map[String, String]] = None)
63-
extends Message {
63+
extends Message {
6464

6565
override def serialize = ActivationMessage.serdes.write(this).compactPrint
6666

@@ -116,11 +116,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
116116
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
117117
* Right when this message is created.
118118
*/
119-
case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
120-
response: Either[ActivationId, WhiskActivation],
121-
override val isSystemError: Option[Boolean],
122-
instance: InstanceId)
123-
extends AcknowledegmentMessage(transid) {
119+
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
120+
response: Either[ActivationId, WhiskActivation],
121+
override val isSystemError: Option[Boolean],
122+
instance: InstanceId)
123+
extends AcknowledegmentMessage(transid) {
124124
override def messageType = "combined"
125125

126126
override def result = Some(response)
@@ -142,11 +142,11 @@ case class CombinedCompletionAndResultMessage private(override val transid: Tran
142142
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
143143
* `CompletionMessage`.
144144
*/
145-
case class CompletionMessage private(override val transid: TransactionId,
146-
override val activationId: ActivationId,
147-
override val isSystemError: Option[Boolean],
148-
instance: InstanceId)
149-
extends AcknowledegmentMessage(transid) {
145+
case class CompletionMessage private (override val transid: TransactionId,
146+
override val activationId: ActivationId,
147+
override val isSystemError: Option[Boolean],
148+
instance: InstanceId)
149+
extends AcknowledegmentMessage(transid) {
150150
override def messageType = "completion"
151151

152152
override def result = None
@@ -168,8 +168,8 @@ case class CompletionMessage private(override val transid: TransactionId,
168168
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
169169
* Right when this message is created.
170170
*/
171-
case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
172-
extends AcknowledegmentMessage(transid) {
171+
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
172+
extends AcknowledegmentMessage(transid) {
173173
override def messageType = "result"
174174

175175
override def result = Some(response)
@@ -253,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
253253
Left(value.convertTo[ActivationId])
254254

255255
case _: JsObject => Right(value.convertTo[WhiskActivation])
256-
case _ => deserializationError("could not read ResultMessage")
256+
case _ => deserializationError("could not read ResultMessage")
257257
}
258258
}
259259

@@ -296,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {
296296

297297
implicit val format = new JsonFormat[EventMessageBody] {
298298
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
299-
case m: Metric => m.toJson
299+
case m: Metric => m.toJson
300300
case a: Activation => a.toJson
301301
}
302302

@@ -321,7 +321,7 @@ case class Activation(name: String,
321321
causedBy: Option[String],
322322
size: Option[Int] = None,
323323
userDefinedStatusCode: Option[Int] = None)
324-
extends EventMessageBody {
324+
extends EventMessageBody {
325325
val typeName = Activation.typeName
326326

327327
override def serialize = toJson.compactPrint
@@ -349,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
349349
private implicit val durationFormat = new RootJsonFormat[Duration] {
350350
override def write(obj: Duration): JsValue = obj match {
351351
case o if o.isFinite => JsNumber(o.toMillis)
352-
case _ => JsNumber.zero
352+
case _ => JsNumber.zero
353353
}
354354

355355
override def read(json: JsValue): Duration = json match {
356356
case JsNumber(n) if n <= 0 => Duration.Zero
357-
case JsNumber(n) => toDuration(n.longValue)
357+
case JsNumber(n) => toDuration(n.longValue)
358358
}
359359
}
360360

@@ -437,7 +437,7 @@ case class EventMessage(source: String,
437437
userId: UUID,
438438
eventType: String,
439439
timestamp: Long = System.currentTimeMillis())
440-
extends Message {
440+
extends Message {
441441
override def serialize = EventMessage.format.write(this).compactPrint
442442
}
443443

@@ -460,7 +460,7 @@ case class InvokerResourceMessage(status: String,
460460
inProgressMemory: Long,
461461
tags: Seq[String],
462462
dedicatedNamespaces: Seq[String])
463-
extends Message {
463+
extends Message {
464464

465465
/**
466466
* Serializes message to string. Must be idempotent.
@@ -502,7 +502,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
502502
object StatusQuery
503503

504504
case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
505-
extends Message {
505+
extends Message {
506506

507507
override def serialize: String = StatusData.serdes.write(this).compactPrint
508508

@@ -524,7 +524,7 @@ case class ContainerCreationMessage(override val transid: TransactionId,
524524
rpcPort: Int,
525525
retryCount: Int = 0,
526526
creationId: CreationId = CreationId.generate())
527-
extends ContainerMessage(transid) {
527+
extends ContainerMessage(transid) {
528528

529529
override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
530530

@@ -556,7 +556,7 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
556556
action: FullyQualifiedEntityName,
557557
revision: DocRevision,
558558
whiskActionMetaData: WhiskActionMetaData)
559-
extends ContainerMessage(transid) {
559+
extends ContainerMessage(transid) {
560560
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
561561

562562
override def serialize: String = toJson.compactPrint
@@ -640,17 +640,17 @@ object ContainerCreationError extends Enumeration {
640640
ZeroNamespaceLimit)
641641

642642
private def parse(name: String) = name.toUpperCase match {
643-
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
643+
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
644644
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
645-
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
646-
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
647-
case "DBFETCHERROR" => DBFetchError
648-
case "WHISKERROR" => WhiskError
649-
case "BLACKBOXERROR" => BlackBoxError
650-
case "TIMEOUTERROR" => TimeoutError
651-
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
652-
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
653-
case "UNKNOWNERROR" => UnknownError
645+
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
646+
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
647+
case "DBFETCHERROR" => DBFetchError
648+
case "WHISKERROR" => WhiskError
649+
case "BLACKBOXERROR" => BlackBoxError
650+
case "TIMEOUTERROR" => TimeoutError
651+
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
652+
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
653+
case "UNKNOWNERROR" => UnknownError
654654
}
655655

656656
implicit val serds = new RootJsonFormat[ContainerCreationError] {
@@ -678,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
678678
retryCount: Int = 0,
679679
error: Option[ContainerCreationError] = None,
680680
reason: Option[String] = None)
681-
extends Message {
681+
extends Message {
682682

683683
/**
684684
* Serializes message to string. Must be idempotent.

common/scala/src/main/scala/org/apache/openwhisk/core/entity/Size.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,15 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize]
123123

124124
override def equals(that: Any): Boolean = that match {
125125
case t: ByteSize => compareTo(t) == 0
126-
case _ => false
126+
case _ => false
127127
}
128128

129129
override def toString = {
130130
unit match {
131131
case SizeUnits.BYTE => s"$size B"
132-
case SizeUnits.KB => s"$size KB"
133-
case SizeUnits.MB => s"$size MB"
134-
case SizeUnits.GB => s"$size GB"
132+
case SizeUnits.KB => s"$size KB"
133+
case SizeUnits.MB => s"$size MB"
134+
case SizeUnits.GB => s"$size GB"
135135
}
136136
}
137137
}
@@ -190,7 +190,7 @@ object size {
190190

191191
def read(value: JsValue): ByteSize = value match {
192192
case JsString(s) => ByteSize.fromString(s)
193-
case _ => deserializationError(formatError)
193+
case _ => deserializationError(formatError)
194194
}
195195
}
196196
}

core/scheduler/src/main/resources/application.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ whisk {
2828

2929
scheduler {
3030
queue-manager {
31-
max-scheduling-time-ms = "20000"
31+
max-scheduling-time = "20 seconds"
3232
max-retries-to-get-queue = "13"
3333
}
3434
max-peek = "128"

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ case class CreateNewQueue(activationMessage: ActivationMessage,
5555
action: FullyQualifiedEntityName,
5656
actionMetadata: WhiskActionMetaData)
5757

58-
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTimeMs: Int)
58+
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)
5959

6060
class QueueManager(
6161
entityStore: ArtifactStore[WhiskEntity],
@@ -321,13 +321,13 @@ class QueueManager(
321321
implicit val transid = msg.transid
322322

323323
// Drop the message that has not been scheduled for a long time
324-
val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration.toMillis
325-
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime)
324+
val schedulingWaitTime = Interval(msg.transid.meta.start, Instant.now()).duration
325+
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_WAIT_TIME, schedulingWaitTime.toMillis)
326326

327-
if (schedulingWaitTime > queueManagerConfig.maxSchedulingTimeMs) {
327+
if (schedulingWaitTime > queueManagerConfig.maxSchedulingTime) {
328328
logging.warn(
329329
this,
330-
s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTimeMs}ms")
330+
s"[${msg.activationId}] the activation message has not been scheduled for ${queueManagerConfig.maxSchedulingTime.toSeconds} sec")
331331
completeErrorActivation(msg, "The activation has not been processed")
332332
} else {
333333
QueuePool.get(MemoryQueueKey(msg.user.namespace.name.asString, msg.action.toDocId.asDocInfo(msg.revision))) match {
@@ -488,7 +488,7 @@ class QueueManager(
488488
}
489489
}
490490

491-
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds) (()=>{
491+
private val logScheduler = context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(() => {
492492
MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
493493
})
494494

tests/src/test/resources/application.conf.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ whisk {
141141
scheduler {
142142
protocol = "{{ scheduler.protocol }}"
143143
queue-manager {
144-
max-scheduling-time-ms = "{{ scheduler.queueManager.maxSchedulingTimeMs }}"
144+
max-scheduling-time = "{{ scheduler.queueManager.maxSchedulingTime }}"
145145
max-retries-to-get-queue = "{{ scheduler.queueManager.maxRetriesToGetQueue }}"
146146
}
147147
max-peek = "{{ scheduler.maxPeek }}"

tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ class QueueManagerTests
520520
store,
521521
childFactory,
522522
mockConsumer,
523-
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTimeMs = 10000)))
523+
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
524524

525525
queueManager ! activationMessage
526526
Thread.sleep(100)
@@ -552,7 +552,7 @@ class QueueManagerTests
552552
store,
553553
childFactory,
554554
mockConsumer,
555-
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTimeMs = 10000)))
555+
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
556556

557557
queueManager ! activationMessage
558558
Thread.sleep(100)
@@ -593,7 +593,7 @@ class QueueManagerTests
593593
store,
594594
childFactory,
595595
mockConsumer,
596-
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTimeMs = 10000)))
596+
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
597597

598598
// send old activation message
599599
queueManager ! oldActivationMessage
@@ -638,7 +638,7 @@ class QueueManagerTests
638638
store,
639639
childFactory,
640640
mockConsumer,
641-
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTimeMs = 10000)))
641+
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
642642

643643
// send old activation message
644644
queueManager ! oldActivationMessage

0 commit comments

Comments
 (0)