Skip to content

Commit a26f4c0

Browse files
authored
change Thread.sleep to waitUntil function under test files (#1242)
Signed-off-by: Jacob Choi <[email protected]>
1 parent 2146199 commit a26f4c0

File tree

6 files changed

+81
-19
lines changed

6 files changed

+81
-19
lines changed

alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt

+20-6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
4747
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
4848
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
4949
import org.opensearch.search.builder.SearchSourceBuilder
50+
import org.opensearch.test.OpenSearchTestCase
5051
import java.net.URLEncoder
5152
import java.time.Instant
5253
import java.time.ZonedDateTime
@@ -55,6 +56,7 @@ import java.time.temporal.ChronoUnit
5556
import java.time.temporal.ChronoUnit.DAYS
5657
import java.time.temporal.ChronoUnit.MILLIS
5758
import java.time.temporal.ChronoUnit.MINUTES
59+
import java.util.concurrent.TimeUnit
5860

5961
class MonitorRunnerServiceIT : AlertingRestTestCase() {
6062

@@ -138,7 +140,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
138140
verifyAlert(firstRunAlert, monitor)
139141
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
140142
// see lastNotificationTime change.
141-
Thread.sleep(200)
143+
OpenSearchTestCase.waitUntil({
144+
return@waitUntil false
145+
}, 200, TimeUnit.MILLISECONDS)
142146
executeMonitor(monitor.id)
143147
val secondRunAlert = searchAlerts(monitor).single()
144148
verifyAlert(secondRunAlert, monitor)
@@ -265,7 +269,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
265269

266270
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
267271
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
268-
Thread.sleep(200)
272+
OpenSearchTestCase.waitUntil({
273+
return@waitUntil false
274+
}, 200, TimeUnit.MILLISECONDS)
269275
val response = executeMonitor(monitor.id)
270276

271277
val output = entityAsMap(response)
@@ -765,7 +771,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
765771
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
766772
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))
767773

768-
Thread.sleep(200)
774+
OpenSearchTestCase.waitUntil({
775+
return@waitUntil false
776+
}, 200, TimeUnit.MILLISECONDS)
769777
updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
770778
executeMonitor(monitor.id)
771779
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single()
@@ -1398,7 +1406,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
13981406

13991407
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
14001408
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
1401-
Thread.sleep(200)
1409+
OpenSearchTestCase.waitUntil({
1410+
return@waitUntil false
1411+
}, 200, TimeUnit.MILLISECONDS)
14021412
executeMonitor(monitor.id)
14031413

14041414
// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
@@ -1418,7 +1428,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
14181428
)
14191429

14201430
// Execute Monitor and check that both Alerts were updated
1421-
Thread.sleep(200)
1431+
OpenSearchTestCase.waitUntil({
1432+
return@waitUntil false
1433+
}, 200, TimeUnit.MILLISECONDS)
14221434
executeMonitor(monitor.id)
14231435
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
14241436
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
@@ -1940,7 +1952,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
19401952

19411953
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
19421954
// let Action executionTime change. W/o this sleep the test can result in a false negative.
1943-
Thread.sleep(200)
1955+
OpenSearchTestCase.waitUntil({
1956+
return@waitUntil false
1957+
}, 200, TimeUnit.MILLISECONDS)
19441958
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
19451959
verifyActionThrottleResultsForBucketLevelMonitor(
19461960
monitorRunResult = monitorRunResultThrottled,

alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
140140
executeMonitor(trueMonitor)
141141

142142
// Allow for a rollover index.
143-
Thread.sleep(2000)
143+
OpenSearchTestCase.waitUntil({
144+
return@waitUntil (getAlertIndices().size >= 3)
145+
}, 2, TimeUnit.SECONDS)
144146
assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3)
145147
}
146148

@@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
157159
executeMonitor(trueMonitor.id)
158160

159161
// Allow for a rollover index.
160-
Thread.sleep(2000)
162+
OpenSearchTestCase.waitUntil({
163+
return@waitUntil (getFindingIndices().size >= 2)
164+
}, 2, TimeUnit.SECONDS)
161165
assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2)
162166
}
163167

alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt

+17-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor
1616
import org.opensearch.core.rest.RestStatus
1717
import org.opensearch.index.query.QueryBuilders
1818
import org.opensearch.search.builder.SearchSourceBuilder
19+
import org.opensearch.test.OpenSearchTestCase
20+
import java.util.concurrent.TimeUnit
1921

2022
class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
2123

@@ -69,8 +71,21 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
6971
// the test execution by a lot (might have to wait for Job Scheduler plugin integration first)
7072
// Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running
7173
// on time
72-
Thread.sleep(60000)
73-
verifyMonitorStats("/_plugins/_alerting")
74+
var passed = false
75+
OpenSearchTestCase.waitUntil({
76+
try {
77+
// Run verifyMonitorStats until all assertion test passes
78+
verifyMonitorStats("/_plugins/_alerting")
79+
passed = true
80+
return@waitUntil true
81+
} catch (e: AssertionError) {
82+
return@waitUntil false
83+
}
84+
}, 1, TimeUnit.MINUTES)
85+
if (!passed) {
86+
// if it hit the max time (1 minute), run verifyMonitorStats again to make sure all the tests pass
87+
verifyMonitorStats("/_plugins/_alerting")
88+
}
7489
}
7590
}
7691
break

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt

+27-7
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
811811
assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus())
812812

813813
// Wait 5 seconds for event to be processed and alerts moved
814-
Thread.sleep(5000)
814+
OpenSearchTestCase.waitUntil({
815+
val alerts = searchAlerts(monitor)
816+
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
817+
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
818+
}, 5, TimeUnit.SECONDS)
815819

816820
val alerts = searchAlerts(monitor)
817821
assertEquals("Active alert was not deleted", 0, alerts.size)
@@ -842,7 +846,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
842846
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())
843847

844848
// Wait 5 seconds for event to be processed and alerts moved
845-
Thread.sleep(5000)
849+
OpenSearchTestCase.waitUntil({
850+
return@waitUntil false
851+
}, 5, TimeUnit.SECONDS)
846852

847853
val alerts = searchAlerts(monitor)
848854
assertEquals("Active alert was not deleted", 0, alerts.size)
@@ -870,7 +876,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
870876
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())
871877

872878
// Wait 5 seconds for event to be processed and alerts moved
873-
Thread.sleep(5000)
879+
OpenSearchTestCase.waitUntil({
880+
val alerts = searchAlerts(monitor)
881+
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
882+
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
883+
}, 5, TimeUnit.SECONDS)
874884

875885
val alerts = searchAlerts(monitor)
876886
assertEquals("Active alert was not deleted", 0, alerts.size)
@@ -956,10 +966,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {
956966

957967
fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
958968
// Enable Monitor jobs
969+
959970
enableScheduledJob()
960971
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id
961972

962-
if (isMultiNode) Thread.sleep(2000)
973+
if (isMultiNode) OpenSearchTestCase.waitUntil({
974+
return@waitUntil false
975+
}, 2, TimeUnit.SECONDS)
963976
var alertingStats = getAlertingStats()
964977
assertAlertingStatsSweeperEnabled(alertingStats, true)
965978
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
@@ -992,7 +1005,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
9921005
enableScheduledJob()
9931006

9941007
// Sleep briefly so sweep can reschedule the Monitor
995-
Thread.sleep(2000)
1008+
OpenSearchTestCase.waitUntil({
1009+
return@waitUntil false
1010+
}, 2, TimeUnit.SECONDS)
9961011

9971012
alertingStats = getAlertingStats()
9981013
assertAlertingStatsSweeperEnabled(alertingStats, true)
@@ -1015,10 +1030,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {
10151030

10161031
fun `test monitor stats jobs`() {
10171032
// Enable the Monitor plugin.
1033+
10181034
enableScheduledJob()
10191035
createRandomMonitor(refresh = true)
10201036

1021-
if (isMultiNode) Thread.sleep(2000)
1037+
if (isMultiNode) OpenSearchTestCase.waitUntil({
1038+
return@waitUntil false
1039+
}, 2, TimeUnit.SECONDS)
10221040
val responseMap = getAlertingStats()
10231041
assertAlertingStatsSweeperEnabled(responseMap, true)
10241042
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
@@ -1051,7 +1069,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
10511069
enableScheduledJob()
10521070
createRandomMonitor(refresh = true)
10531071

1054-
if (isMultiNode) Thread.sleep(2000)
1072+
if (isMultiNode) OpenSearchTestCase.waitUntil({
1073+
return@waitUntil false
1074+
}, 2, TimeUnit.SECONDS)
10551075
val responseMap = getAlertingStats("/jobs_info")
10561076
assertAlertingStatsSweeperEnabled(responseMap, true)
10571077
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ import org.opensearch.index.query.QueryBuilders
3535
import org.opensearch.script.Script
3636
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
3737
import org.opensearch.search.builder.SearchSourceBuilder
38+
import org.opensearch.test.OpenSearchTestCase
3839
import org.opensearch.test.junit.annotations.TestLogging
3940
import java.time.Instant
4041
import java.time.temporal.ChronoUnit
4142
import java.util.Collections
4243
import java.util.Locale
4344
import java.util.UUID
45+
import java.util.concurrent.TimeUnit
4446

4547
@TestLogging("level:DEBUG", reason = "Debug for tests.")
4648
@Suppress("UNCHECKED_CAST")
@@ -1180,7 +1182,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
11801182
}"""
11811183

11821184
indexDoc(index, "1", testDoc)
1183-
Thread.sleep(80000)
1185+
OpenSearchTestCase.waitUntil({
1186+
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
1187+
return@waitUntil (findings.size == 1)
1188+
}, 80, TimeUnit.SECONDS)
11841189

11851190
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
11861191
assertEquals("Findings saved for test monitor", 1, findings.size)

alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType
1818
import org.opensearch.client.ResponseException
1919
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
2020
import org.opensearch.core.rest.RestStatus
21+
import org.opensearch.test.OpenSearchTestCase
2122
import java.time.Instant
2223
import java.util.UUID
24+
import java.util.concurrent.TimeUnit
2325

2426
class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {
2527

@@ -80,7 +82,9 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {
8082

8183
// Create cluster change event and wait for migration service to complete migrating data over
8284
client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
83-
Thread.sleep(120000)
85+
OpenSearchTestCase.waitUntil({
86+
return@waitUntil false
87+
}, 2, TimeUnit.MINUTES)
8488

8589
for (id in ids) {
8690
val response = client().makeRequest(

0 commit comments

Comments
 (0)