@@ -102,6 +102,11 @@ class QueueManagerTests
102
102
content = None )
103
103
val statusData = StatusData (testInvocationNamespace, testFQN.asString, 0 , " Running" , " RunningData" )
104
104
105
+ // update start time for activation to ensure it's not stale
106
+ def newActivation (start : Instant = Instant .now()): ActivationMessage = {
107
+ activationMessage.copy(transid = TransactionId (messageTransId.meta.copy(start = start)))
108
+ }
109
+
105
110
val activationResponse = ActivationResponse (Right (activationMessage))
106
111
107
112
val ack = new ActiveAck {
@@ -126,7 +131,7 @@ class QueueManagerTests
126
131
system.actorOf(Props (new Actor () {
127
132
override def receive : Receive = {
128
133
case GetActivation (_, _, _, _, _, _) =>
129
- sender ! ActivationResponse (Right (activationMessage ))
134
+ sender ! ActivationResponse (Right (newActivation() ))
130
135
case StatusQuery =>
131
136
sender ! statusData
132
137
}
@@ -391,13 +396,13 @@ class QueueManagerTests
391
396
392
397
// got a message but no queue created on this scheduler
393
398
// it should try to got leader key from etcd and forward this msg to remote queue, here is `schedulerEndpoints`
394
- queueManager ! activationMessage
399
+ queueManager ! newActivation()
395
400
stream.toString should include(s " send activation to remote queue, key: $leaderKey" )
396
401
stream.toString should include(s " add a new actor selection to a map with key: $leaderKey" )
397
402
stream.reset()
398
403
399
404
// got msg again, and it should get remote queue from memory instead of etcd
400
- val msg2 = activationMessage .copy(activationId = ActivationId .generate())
405
+ val msg2 = newActivation() .copy(activationId = ActivationId .generate())
401
406
queueManager ! msg2
402
407
stream.toString shouldNot include(s " send activation to remote queue, key: $leaderKey" )
403
408
}
@@ -594,7 +599,7 @@ class QueueManagerTests
594
599
true )
595
600
596
601
// the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
597
- queueManager ! activationMessage
602
+ queueManager ! newActivation()
598
603
599
604
stream.toString should include(s " it will be replaced with the latest revision and invoked " )
600
605
}
@@ -625,8 +630,8 @@ class QueueManagerTests
625
630
mockConsumer,
626
631
QueueManagerConfig (maxRetriesToGetQueue = 2 , maxSchedulingTime = 10 seconds)))
627
632
628
- queueManager ! activationMessage
629
- Thread .sleep(1000 )
633
+ queueManager ! newActivation()
634
+ Thread .sleep(100 )
630
635
(mockEtcdClient.get _) verify (* ) repeated (3 )
631
636
}
632
637
@@ -657,7 +662,7 @@ class QueueManagerTests
657
662
mockConsumer,
658
663
QueueManagerConfig (maxRetriesToGetQueue = 2 , maxSchedulingTime = 10 seconds)))
659
664
660
- queueManager ! activationMessage.copy(transid = TransactionId (messageTransId.meta.copy(start = Instant .now())) )
665
+ queueManager ! newActivation( )
661
666
Thread .sleep(100 )
662
667
(mockEtcdClient.get _) verify (* ) repeated (3 )
663
668
}
@@ -753,7 +758,7 @@ class QueueManagerTests
753
758
}
754
759
755
760
val oldNow = Instant .now(Clock .systemUTC()).minusMillis(11000 )
756
- val oldActivationMessage = activationMessage.copy(transid = TransactionId (messageTransId.meta.copy(start = oldNow)) )
761
+ val oldActivationMessage = newActivation( oldNow)
757
762
758
763
val queueManager =
759
764
TestActorRef (
@@ -807,7 +812,7 @@ class QueueManagerTests
807
812
}
808
813
809
814
val oldNow = Instant .now(Clock .systemUTC()).minusMillis(9000 )
810
- val oldActivationMessage = activationMessage.copy(transid = TransactionId (messageTransId.meta.copy(start = oldNow)) )
815
+ val oldActivationMessage = newActivation( oldNow)
811
816
812
817
val queueManager =
813
818
TestActorRef (
@@ -877,7 +882,7 @@ class QueueManagerTests
877
882
true )
878
883
879
884
queueManager.tell(
880
- UpdateMemoryQueue (testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage ),
885
+ UpdateMemoryQueue (testFQN.toDocId.asDocInfo(testDocRevision), newFqn, newActivation() ),
881
886
consumer.ref)
882
887
883
888
probe.expectMsg(activationMessage.activationId)
@@ -1084,7 +1089,7 @@ class QueueManagerTests
1084
1089
system.actorOf(Props (new Actor () {
1085
1090
override def receive : Receive = {
1086
1091
case GetActivation (_, _, _, _, _, _) =>
1087
- sender ! ActivationResponse (Right (activationMessage ))
1092
+ sender ! ActivationResponse (Right (newActivation() ))
1088
1093
1089
1094
case GracefulShutdown =>
1090
1095
probe.ref ! GracefulShutdown
0 commit comments