Skip to content

Commit 3178347

Browse files
committed
adds tests and logs failure or success of cleanup
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 8928a8c commit 3178347

File tree

3 files changed

+68
-44
lines changed

3 files changed

+68
-44
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@
55

66
package org.opensearch.alerting.transport
77

8-
import kotlinx.coroutines.CoroutineName
8+
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
10-
import kotlinx.coroutines.GlobalScope
1110
import kotlinx.coroutines.launch
1211
import org.apache.logging.log4j.LogManager
1312
import org.opensearch.OpenSearchStatusException
@@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
2524
import org.opensearch.action.support.ActionFilters
2625
import org.opensearch.action.support.HandledTransportAction
2726
import org.opensearch.action.support.IndicesOptions
27+
import org.opensearch.action.support.WriteRequest
2828
import org.opensearch.action.support.master.AcknowledgedResponse
2929
import org.opensearch.alerting.MonitorMetadataService
3030
import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -57,6 +57,7 @@ import kotlin.coroutines.resume
5757
import kotlin.coroutines.resumeWithException
5858
import kotlin.coroutines.suspendCoroutine
5959

60+
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
6061
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)
6162

6263
class TransportDeleteMonitorAction @Inject constructor(
@@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
8788
if (!validateUserBackendRoles(user, actionListener)) {
8889
return
8990
}
90-
91-
GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
91+
scope.launch {
9292
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
9393
}
9494
}
@@ -147,6 +147,7 @@ class TransportDeleteMonitorAction @Inject constructor(
147147

148148
private suspend fun deleteMetadata(monitor: Monitor) {
149149
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
150+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
150151
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
151152
}
152153

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import org.opensearch.common.xcontent.XContentHelper
5858
import org.opensearch.common.xcontent.XContentType
5959
import org.opensearch.commons.alerting.action.AlertingActions
6060
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
61+
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
6162
import org.opensearch.commons.alerting.action.IndexMonitorRequest
6263
import org.opensearch.commons.alerting.action.IndexMonitorResponse
6364
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
@@ -178,7 +179,7 @@ class TransportIndexMonitorAction @Inject constructor(
178179
client: Client,
179180
actionListener: ActionListener<IndexMonitorResponse>,
180181
request: IndexMonitorRequest,
181-
user: User?
182+
user: User?,
182183
) {
183184
val indices = mutableListOf<String>()
184185
// todo: for doc level alerting: check if index is present before monitor is created.
@@ -231,7 +232,7 @@ class TransportIndexMonitorAction @Inject constructor(
231232
client: Client,
232233
actionListener: ActionListener<IndexMonitorResponse>,
233234
request: IndexMonitorRequest,
234-
user: User?
235+
user: User?,
235236
) {
236237
client.threadPool().threadContext.stashContext().use {
237238
IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
@@ -242,7 +243,7 @@ class TransportIndexMonitorAction @Inject constructor(
242243
private val client: Client,
243244
private val actionListener: ActionListener<IndexMonitorResponse>,
244245
private val request: IndexMonitorRequest,
245-
private val user: User?
246+
private val user: User?,
246247
) {
247248

248249
fun resolveUserAndStart() {
@@ -503,10 +504,7 @@ class TransportIndexMonitorAction @Inject constructor(
503504
metadata = monitorMetadata
504505
} catch (t: Exception) {
505506
log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
506-
client.execute(
507-
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
508-
DeleteMonitorRequest(indexResponse.id, RefreshPolicy.IMMEDIATE)
509-
)
507+
cleanupMonitorAfterPartialFailure(indexResponse)
510508
throw t
511509
}
512510
try {
@@ -517,10 +515,7 @@ class TransportIndexMonitorAction @Inject constructor(
517515
MonitorMetadataService.upsertMetadata(metadata, updating = true)
518516
} catch (t: Exception) {
519517
log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
520-
client.execute(
521-
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
522-
DeleteMonitorRequest(indexResponse.id, RefreshPolicy.IMMEDIATE)
523-
)
518+
cleanupMonitorAfterPartialFailure(indexResponse)
524519
throw t
525520
}
526521

@@ -535,6 +530,22 @@ class TransportIndexMonitorAction @Inject constructor(
535530
}
536531
}
537532

533+
private suspend fun cleanupMonitorAfterPartialFailure(indexMonitorResponse: IndexResponse) {
534+
// we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
535+
try {
536+
val dmr: DeleteMonitorResponse = client.suspendUntil {
537+
client.execute(
538+
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
539+
DeleteMonitorRequest(indexMonitorResponse.id, RefreshPolicy.IMMEDIATE),
540+
it
541+
)
542+
}
543+
log.debug("Cleaned up monitor after monitor creation request partial failure. Monitor id : ${dmr.id}")
544+
} catch (e: Exception) {
545+
log.error("Failed to clean up monitor after monitor creation request partial failure", e)
546+
}
547+
}
548+
538549
@Suppress("UNCHECKED_CAST")
539550
private suspend fun indexDocLevelMonitorQueries(
540551
monitor: Monitor,

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.junit.Assert
99
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
10+
import org.opensearch.action.admin.indices.alias.Alias
1011
import org.opensearch.action.admin.indices.close.CloseIndexRequest
1112
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1213
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
@@ -23,7 +24,6 @@ import org.opensearch.alerting.action.SearchMonitorAction
2324
import org.opensearch.alerting.action.SearchMonitorRequest
2425
import org.opensearch.alerting.alerts.AlertIndices
2526
import org.opensearch.alerting.core.ScheduledJobIndices
26-
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2727
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
2828
import org.opensearch.alerting.util.DocLevelMonitorQueries
2929
import org.opensearch.alerting.util.DocLevelMonitorQueries.Companion.INDEX_PATTERN_SUFFIX
@@ -755,12 +755,43 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
755755
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
756756
}
757757

758-
fun `test execute monitor without create when no monitors exists`() {
759-
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
758+
fun `test cleanup monitor on partial create monitor failure`() {
759+
val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
760760
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
761761
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
762762
val customQueryIndex = "custom_alerts_index"
763-
val analyzer = "whitespace"
763+
val analyzer = "dfbdfbafd"
764+
val testDoc = """{
765+
"rule": {"title": "some_title"},
766+
"message": "msg 1 2 3 4"
767+
}"""
768+
indexDoc(index, "2", testDoc)
769+
client().admin().indices()
770+
.create(
771+
CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
772+
.mapping(
773+
"""
774+
{
775+
"_meta": {
776+
"schema_version": 1
777+
},
778+
"properties": {
779+
"query": {
780+
"type": "percolator_ext"
781+
},
782+
"monitor_id": {
783+
"type": "text"
784+
},
785+
"index": {
786+
"type": "text"
787+
}
788+
}
789+
}
790+
""".trimIndent()
791+
)
792+
)
793+
794+
client().admin().indices().close(CloseIndexRequest(customQueryIndex))
764795
var monitor = randomDocumentLevelMonitor(
765796
inputs = listOf(docLevelInput),
766797
triggers = listOf(trigger),
@@ -769,31 +800,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
769800
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
770801
)
771802
)
772-
var executeMonitorResponse = executeMonitor(monitor, null)
773-
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
774-
val testDoc = """{
775-
"message" : "This is an error from IAD region",
776-
"test_strict_date_time" : "$testTime",
777-
"test_field" : "us-west-2"
778-
}"""
779-
780-
assertIndexNotExists(SCHEDULED_JOBS_INDEX)
781-
782-
val createMonitorResponse = createMonitor(monitor)
783-
784-
assertIndexExists(SCHEDULED_JOBS_INDEX)
785-
786-
indexDoc(index, "1", testDoc)
787-
788-
executeMonitorResponse = executeMonitor(monitor, createMonitorResponse?.id, dryRun = false)
789-
790-
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
791-
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
792-
Assert.assertEquals(
793-
(executeMonitorResponse.monitorRunResult.triggerResults.iterator().next().value as DocumentLevelTriggerRunResult)
794-
.triggeredDocs.size,
795-
1
796-
)
803+
try {
804+
createMonitor(monitor)
805+
fail("monitor creation should fail due to incorrect analyzer name in test setup")
806+
} catch (e: Exception) {
807+
Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
808+
}
797809
}
798810

799811
fun `test execute monitor with custom query index and custom field mappings`() {

0 commit comments

Comments
 (0)