
Commit c7063b3

eirsep authored and github-actions[bot] committed
log error messages and clean up monitor when indexing doc level queries or metadata creation fails (#900) (#912)
* log errors and clean up monitor when indexing doc level queries or metadata creation fails
* refactor delete monitor action to re-use delete methods

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit 3440a4d)
1 parent edc00ae commit c7063b3
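
In short, monitor creation becomes a create-then-compensate flow: after the monitor document is indexed, metadata creation and doc-level query indexing each run inside a try block, and any failure triggers a best-effort delete of everything written so far before the error is re-thrown to the caller. A minimal sketch of that flow, using simplified, hypothetical helper names rather than the plugin's actual signatures (see the diffs below for the real code):

    // Sketch only: indexMonitorDocument, createMetadata and indexDocLevelQueries are
    // hypothetical stand-ins for the plugin's real calls.
    suspend fun createMonitorWithCleanup(monitor: Monitor): IndexResponse {
        val indexResponse = indexMonitorDocument(monitor)           // step 1: monitor doc
        try {
            createMetadata(monitor)                                 // step 2: monitor metadata
            indexDocLevelQueries(monitor)                           // step 3: percolator queries
        } catch (e: Exception) {
            log.error("Monitor ${indexResponse.id} partially created, cleaning up", e)
            deleteAllResourcesForMonitor(monitor, indexResponse.id) // best-effort compensation
            throw e                                                 // the original request still fails
        }
        return indexResponse
    }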

File tree

4 files changed (+250 / -66 lines)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 87 additions & 55 deletions
@@ -5,9 +5,8 @@
 
 package org.opensearch.alerting.transport
 
-import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
 import org.apache.logging.log4j.LogManager
 import org.opensearch.OpenSearchStatusException
@@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
 import org.opensearch.action.support.ActionFilters
 import org.opensearch.action.support.HandledTransportAction
 import org.opensearch.action.support.IndicesOptions
+import org.opensearch.action.support.WriteRequest
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.alerting.MonitorMetadataService
 import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -57,6 +57,7 @@ import kotlin.coroutines.resume
 import kotlin.coroutines.resumeWithException
 import kotlin.coroutines.suspendCoroutine
 
+private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
 private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)
 
 class TransportDeleteMonitorAction @Inject constructor(
@@ -90,8 +91,7 @@ class TransportDeleteMonitorAction @Inject constructor(
         if (!validateUserBackendRoles(user, actionListener)) {
             return
         }
-
-        GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
+        scope.launch {
            DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
         }
     }
@@ -112,16 +112,15 @@ class TransportDeleteMonitorAction @Inject constructor(
                 checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
 
                 if (canDelete) {
-                    val deleteResponse = deleteMonitor(monitor)
-                    deleteDocLevelMonitorQueriesAndIndices(monitor)
-                    deleteMetadata(monitor)
+                    val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId)
                     actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
                 } else {
                     actionListener.onFailure(
                         AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
                     )
                 }
             } catch (t: Exception) {
+                log.error("Failed to delete monitor ${deleteRequest.id()}", t)
                 actionListener.onFailure(AlertingException.wrap(t))
             }
         }
@@ -145,69 +144,102 @@
             )
             return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
         }
+    }
+
+    companion object {
+        @JvmStatic
+        suspend fun deleteAllResourcesForMonitor(
+            client: Client,
+            monitor: Monitor,
+            deleteRequest: DeleteRequest,
+            monitorId: String,
+        ): DeleteResponse {
+            val deleteResponse = deleteMonitorDocument(client, deleteRequest)
+            deleteMetadata(client, monitor)
+            deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
+            return deleteResponse
+        }
 
-        private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
+        private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse {
             return client.suspendUntil { delete(deleteRequest, it) }
         }
 
-        private suspend fun deleteMetadata(monitor: Monitor) {
+        suspend fun deleteMetadata(client: Client, monitor: Monitor) {
             val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
-            val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            try {
+                val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
+                log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}")
+            } catch (e: Exception) {
+                // we only log the error and don't fail the request because if monitor document has been deleted,
+                // we cannot retry based on this failure
+                log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e)
+            }
         }
 
-        private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
-            val clusterState = clusterService.state()
-            val metadata = MonitorMetadataService.getMetadata(monitor)
-            metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
-
-                val indicesExistsResponse: IndicesExistsResponse =
-                    client.suspendUntil {
-                        client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+        suspend fun deleteDocLevelMonitorQueriesAndIndices(
+            client: Client,
+            monitor: Monitor,
+            monitorId: String,
+        ) {
+            try {
+                val metadata = MonitorMetadataService.getMetadata(monitor)
+                metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
+
+                    val indicesExistsResponse: IndicesExistsResponse =
+                        client.suspendUntil {
+                            client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+                        }
+                    if (indicesExistsResponse.isExists == false) {
+                        return
                    }
-                if (indicesExistsResponse.isExists == false) {
-                    return
-                }
-                // Check if there's any queries from other monitors in this queryIndex,
-                // to avoid unnecessary doc deletion, if we could just delete index completely
-                val searchResponse: SearchResponse = client.suspendUntil {
-                    search(
-                        SearchRequest(queryIndex).source(
-                            SearchSourceBuilder()
-                                .size(0)
-                                .query(
-                                    QueryBuilders.boolQuery().mustNot(
-                                        QueryBuilders.matchQuery("monitor_id", monitorId)
+                    // Check if there's any queries from other monitors in this queryIndex,
+                    // to avoid unnecessary doc deletion, if we could just delete index completely
+                    val searchResponse: SearchResponse = client.suspendUntil {
+                        search(
+                            SearchRequest(queryIndex).source(
+                                SearchSourceBuilder()
+                                    .size(0)
+                                    .query(
+                                        QueryBuilders.boolQuery().mustNot(
+                                            QueryBuilders.matchQuery("monitor_id", monitorId)
+                                        )
                                     )
                             )
-                        )
-                    ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
-                    it
-                    )
-                }
-                if (searchResponse.hits.totalHits.value == 0L) {
-                    val ack: AcknowledgedResponse = client.suspendUntil {
-                        client.admin().indices().delete(
-                            DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                            ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
                             it
                         )
                     }
-                    if (ack.isAcknowledged == false) {
-                        log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
-                    }
-                } else {
-                    // Delete all queries added by this monitor
-                    val response: BulkByScrollResponse = suspendCoroutine { cont ->
-                        DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
-                            .source(queryIndex)
-                            .filter(QueryBuilders.matchQuery("monitor_id", monitorId))
-                            .refresh(true)
-                            .execute(
-                                object : ActionListener<BulkByScrollResponse> {
-                                    override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
-                                    override fun onFailure(t: Exception) = cont.resumeWithException(t)
-                                }
+                    if (searchResponse.hits.totalHits.value == 0L) {
+                        val ack: AcknowledgedResponse = client.suspendUntil {
+                            client.admin().indices().delete(
+                                DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
                             )
+                        }
+                        if (ack.isAcknowledged == false) {
+                            log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
+                        }
+                    } else {
+                        // Delete all queries added by this monitor
+                        val response: BulkByScrollResponse = suspendCoroutine { cont ->
+                            DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                                .source(queryIndex)
+                                .filter(QueryBuilders.matchQuery("monitor_id", monitorId))
+                                .refresh(true)
+                                .execute(
+                                    object : ActionListener<BulkByScrollResponse> {
+                                        override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
+                                        override fun onFailure(t: Exception) {
+                                            cont.resumeWithException(t)
+                                        }
+                                    }
+                                )
+                        }
                     }
                 }
+            } catch (e: Exception) {
+                // we only log the error and don't fail the request because if monitor document has been deleted successfully,
+                // we cannot retry based on this failure
+                log.error("Failed to delete doc level queries from query index.", e)
             }
         }
     }
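
Two details of the refactor above are worth noting. First, the delete helpers move into a companion object and take the Client as a parameter, so other transport actions can call TransportDeleteMonitorAction.deleteAllResourcesForMonitor(...) for cleanup without instantiating the handler. Second, the delete-by-query call shows how callback-style ActionListener APIs are bridged into coroutines with suspendCoroutine; the suspendUntil extension used elsewhere in the file is assumed to be an equivalent adapter. A generic sketch of that bridge, not the plugin's actual helper:

    import org.opensearch.action.ActionListener   // package name assumed for this OpenSearch version
    import kotlin.coroutines.resume
    import kotlin.coroutines.resumeWithException
    import kotlin.coroutines.suspendCoroutine

    // Adapts a callback-based OpenSearch client call into a suspending call.
    suspend fun <T> awaitListener(block: (ActionListener<T>) -> Unit): T =
        suspendCoroutine { cont ->
            block(object : ActionListener<T> {
                override fun onResponse(response: T) = cont.resume(response)
                override fun onFailure(e: Exception) = cont.resumeWithException(e)
            })
        }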

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 44 additions & 11 deletions
@@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction
 import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
 import org.opensearch.action.admin.indices.create.CreateIndexResponse
+import org.opensearch.action.delete.DeleteRequest
 import org.opensearch.action.get.GetRequest
 import org.opensearch.action.get.GetResponse
 import org.opensearch.action.index.IndexRequest
@@ -186,7 +187,7 @@ class TransportIndexMonitorAction @Inject constructor(
         client: Client,
         actionListener: ActionListener<IndexMonitorResponse>,
         request: IndexMonitorRequest,
-        user: User?
+        user: User?,
     ) {
         val indices = mutableListOf<String>()
         // todo: for doc level alerting: check if index is present before monitor is created.
@@ -239,7 +240,7 @@
         client: Client,
         actionListener: ActionListener<IndexMonitorResponse>,
         request: IndexMonitorRequest,
-        user: User?
+        user: User?,
     ) {
         client.threadPool().threadContext.stashContext().use {
             IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
@@ -250,7 +251,7 @@
         private val client: Client,
         private val actionListener: ActionListener<IndexMonitorResponse>,
         private val request: IndexMonitorRequest,
-        private val user: User?
+        private val user: User?,
     ) {
 
         fun resolveUserAndStart() {
@@ -503,16 +504,30 @@
                 )
                 return
             }
-            request.monitor = request.monitor.copy(id = indexResponse.id)
-            var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
-            if (created == false) {
-                log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!")
+            var metadata: MonitorMetadata?
+            try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
+                request.monitor = request.monitor.copy(id = indexResponse.id)
+                var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
+                if (created == false) {
+                    log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
+                }
+                metadata = monitorMetadata
+            } catch (t: Exception) {
+                log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
+                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
+                throw t
             }
-            if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
-                indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
+            try {
+                if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
+                    indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
+                }
+                // When inserting queries in queryIndex we could update sourceToQueryIndexMapping
+                MonitorMetadataService.upsertMetadata(metadata, updating = true)
+            } catch (t: Exception) {
+                log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
+                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
+                throw t
             }
-            // When inserting queries in queryIndex we could update sourceToQueryIndexMapping
-            MonitorMetadataService.upsertMetadata(metadata, updating = true)
 
             actionListener.onResponse(
                 IndexMonitorResponse(
@@ -528,6 +543,24 @@
             }
         }
 
+        private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) {
+            // we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
+            try {
+                TransportDeleteMonitorAction.deleteAllResourcesForMonitor(
+                    client,
+                    monitor = monitor,
+                    DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE),
+                    indexMonitorResponse.id
+                )
+                log.debug(
+                    "Cleaned up monitor related resources after monitor creation request partial failure. " +
+                        "Monitor id : ${indexMonitorResponse.id}"
+                )
+            } catch (e: Exception) {
+                log.error("Failed to clean up monitor after monitor creation request partial failure", e)
+            }
+        }
+
         @Suppress("UNCHECKED_CAST")
         private suspend fun indexDocLevelMonitorQueries(
             monitor: Monitor,
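
The two try/catch blocks added above share one shape: attempt the follow-up writes, and on any failure delete what was already created, then re-throw so the caller's ActionListener still fails. A generic wrapper capturing that shape could look like the sketch below; it is an illustration only, and the commit deliberately keeps the two explicit try/catch blocks, which is arguably clearer for just two call sites.

    import org.apache.logging.log4j.LogManager

    private val log = LogManager.getLogger("PartialFailureCleanup")

    // Illustration only: a generic "compensate, then re-throw" wrapper; not part of this commit.
    suspend fun <T> withCleanupOnFailure(
        errorMessage: String,
        cleanup: suspend () -> Unit,
        block: suspend () -> T,
    ): T = try {
        block()
    } catch (t: Exception) {
        log.error(errorMessage, t)
        try {
            cleanup()                    // e.g. delete the half-created monitor and its resources
        } catch (c: Exception) {
            log.error("Cleanup after partial failure also failed", c)
        }
        throw t                          // the original request still fails
    }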

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 54 additions & 0 deletions
@@ -7,6 +7,7 @@ package org.opensearch.alerting
 
 import org.junit.Assert
 import org.opensearch.action.admin.cluster.state.ClusterStateRequest
+import org.opensearch.action.admin.indices.alias.Alias
 import org.opensearch.action.admin.indices.close.CloseIndexRequest
 import org.opensearch.action.admin.indices.create.CreateIndexRequest
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
@@ -849,6 +850,59 @@
         assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
     }
 
+    fun `test cleanup monitor on partial create monitor failure`() {
+        val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
+        val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
+        val customQueryIndex = "custom_alerts_index"
+        val analyzer = "dfbdfbafd"
+        val testDoc = """{
+            "rule": {"title": "some_title"},
+            "message": "msg 1 2 3 4"
+        }"""
+        indexDoc(index, "2", testDoc)
+        client().admin().indices()
+            .create(
+                CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
+                    .mapping(
+                        """
+                        {
+                          "_meta": {
+                            "schema_version": 1
+                          },
+                          "properties": {
+                            "query": {
+                              "type": "percolator_ext"
+                            },
+                            "monitor_id": {
+                              "type": "text"
+                            },
+                            "index": {
+                              "type": "text"
+                            }
+                          }
+                        }
+                        """.trimIndent()
+                    )
+            ).get()
+
+        client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get()
+        var monitor = randomDocumentLevelMonitor(
+            inputs = listOf(docLevelInput),
+            triggers = listOf(trigger),
+            dataSources = DataSources(
+                queryIndex = customQueryIndex,
+                queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
+            )
+        )
+        try {
+            createMonitor(monitor)
+            fail("monitor creation should fail due to incorrect analyzer name in test setup")
+        } catch (e: Exception) {
+            Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
+        }
+    }
+
     fun `test execute monitor without create when no monitors exists`() {
         val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
         val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
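
One nit on the new test's final assertion: JUnit's Assert.assertEquals takes (expected, actual), but the test passes the actual hit count first and the expected 0 second, so a failure message would read backwards. A slightly tidier form of the same check (a sketch; SCHEDULED_JOBS_INDEX and client() come from the test base class):

    // Same assertion as in the test above, with JUnit's (message, expected, actual) ordering.
    val hits = client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits
    Assert.assertEquals("monitor document should have been cleaned up", 0, hits.size)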
