@@ -179,20 +179,78 @@ class MemoryQueueTests
179
179
fsm ! Start
180
180
expectMsg(Transition (fsm, Uninitialized , Running ))
181
181
182
- // Wait for the actual timeout
183
- // Test when(Running, stateTimeout = queueConfig.idleGrace)
182
+ // Test stateTimeout for when(Running, stateTimeout = queueConfig.idleGrace)
183
+ fsm.isStateTimerActive shouldBe true
184
184
Thread .sleep(queueConfigWithShortTimeout.idleGrace.toMillis)
185
+
185
186
expectMsg(Transition (fsm, Running , Idle ))
186
187
187
- // Test for when(Idle, stateTimeout = queueConfig.stopGrace)
188
+ // Test stateTimeout for when(Idle, stateTimeout = queueConfig.stopGrace)
189
+ fsm.isStateTimerActive shouldBe true
188
190
Thread .sleep(queueConfigWithShortTimeout.stopGrace.toMillis)
189
191
expectMsg(Transition (fsm, Idle , Removed ))
190
192
191
- // Test for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout)
193
+ // Test stateTimeout for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout)
194
+ fsm.isStateTimerActive shouldBe true
192
195
Thread .sleep(queueConfigWithShortTimeout.gracefulShutdownTimeout.toMillis)
193
196
parent.expectMsg(queueRemovedMsg)
194
197
}
195
198
199
+ it should " start startTimerWithFixedDelay(name=StopQueue) on Transition _ => Flushing" in {
200
+ implicit val clock = SystemClock
201
+ val mockEtcdClient = mock[EtcdClient ]
202
+ val prove = TestProbe ()
203
+ val watcher = TestProbe ()
204
+ val parent = TestProbe ()
205
+
206
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
207
+
208
+ val queueConfigWithShortTimeout =
209
+ queueConfig.copy(idleGrace = 10 .seconds, stopGrace = 10 .milliseconds, gracefulShutdownTimeout = 10 .milliseconds)
210
+
211
+ val fsm =
212
+ TestFSMRef (
213
+ new MemoryQueue (
214
+ mockEtcdClient,
215
+ durationChecker,
216
+ fqn,
217
+ mockMessaging(),
218
+ schedulingConfig,
219
+ testInvocationNamespace,
220
+ revision,
221
+ endpoints,
222
+ actionMetadata,
223
+ prove.ref,
224
+ watcher.ref,
225
+ TestProbe ().ref,
226
+ TestProbe ().ref,
227
+ schedulerId,
228
+ ack,
229
+ store,
230
+ getUserLimit,
231
+ checkToDropStaleActivation,
232
+ queueConfigWithShortTimeout),
233
+ parent.ref,
234
+ " MemoryQueue" )
235
+
236
+ registerCallback(fsm)
237
+ fsm ! Start
238
+ expectMsg(Transition (fsm, Uninitialized , Running ))
239
+
240
+ fsm ! FailedCreationJob (
241
+ testCreationId,
242
+ message.user.namespace.name.asString,
243
+ message.action,
244
+ message.revision,
245
+ ContainerCreationError .NoAvailableInvokersError ,
246
+ " no available invokers" )
247
+
248
+ // Test case _ -> Flushing => startTimerWithFixedDelay("StopQueue", StateTimeout, queueConfig.flushGrace)
249
+ // state Running -> Flushing
250
+ expectMsg(Transition (fsm, Running , Flushing ))
251
+ fsm.isTimerActive(" StopQueue" ) shouldBe true
252
+ }
253
+
196
254
it should " register the endpoint when initializing" in {
197
255
implicit val clock = SystemClock
198
256
val mockEtcdClient = mock[EtcdClient ]
0 commit comments