@@ -136,6 +136,63 @@ class MemoryQueueTests
136
136
137
137
behavior of " MemoryQueue"
138
138
139
+ it should " send StateTimeout message when state timeout" in {
140
+ implicit val clock = SystemClock
141
+ val mockEtcdClient = mock[EtcdClient ]
142
+ val prove = TestProbe ()
143
+ val watcher = TestProbe ()
144
+ val parent = TestProbe ()
145
+
146
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
147
+
148
+ val queueConfigWithShortTimeout = queueConfig.copy(
149
+ idleGrace = 10 .milliseconds,
150
+ stopGrace = 10 .milliseconds,
151
+ gracefulShutdownTimeout = 10 .milliseconds)
152
+
153
+ val fsm =
154
+ TestFSMRef (
155
+ new MemoryQueue (
156
+ mockEtcdClient,
157
+ durationChecker,
158
+ fqn,
159
+ mockMessaging(),
160
+ schedulingConfig,
161
+ testInvocationNamespace,
162
+ revision,
163
+ endpoints,
164
+ actionMetadata,
165
+ prove.ref,
166
+ watcher.ref,
167
+ TestProbe ().ref,
168
+ TestProbe ().ref,
169
+ schedulerId,
170
+ ack,
171
+ store,
172
+ getUserLimit,
173
+ checkToDropStaleActivation,
174
+ queueConfigWithShortTimeout),
175
+ parent.ref,
176
+ " MemoryQueue" )
177
+
178
+ registerCallback(fsm)
179
+ fsm ! Start
180
+ expectMsg(Transition (fsm, Uninitialized , Running ))
181
+
182
+ // Wait for the actual timeout
183
+ // Test when(Running, stateTimeout = queueConfig.idleGrace)
184
+ Thread .sleep(queueConfigWithShortTimeout.idleGrace.toMillis)
185
+ expectMsg(Transition (fsm, Running , Idle ))
186
+
187
+ // Test for when(Idle, stateTimeout = queueConfig.stopGrace)
188
+ Thread .sleep(queueConfigWithShortTimeout.stopGrace.toMillis)
189
+ expectMsg(Transition (fsm, Idle , Removed ))
190
+
191
+ // Test for when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout)
192
+ Thread .sleep(queueConfigWithShortTimeout.gracefulShutdownTimeout.toMillis)
193
+ parent.expectMsg(queueRemovedMsg)
194
+ }
195
+
139
196
it should " register the endpoint when initializing" in {
140
197
implicit val clock = SystemClock
141
198
val mockEtcdClient = mock[EtcdClient ]
0 commit comments