
Commit 2337100

eirsep authored and AWSHurneyt committed
log error messages and clean up monitor when indexing doc level queries or metadata creation fails (opensearch-project#900) (opensearch-project#912)
* log errors and clean up monitor when indexing doc level queries or metadata creation fails
* refactor delete monitor action to re-use delete methods

Signed-off-by: Surya Sashank Nistala <[email protected]>
Signed-off-by: AWSHurneyt <[email protected]>
1 parent 069fb7a commit 2337100

File tree

4 files changed: +251 -66 lines changed


alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 88 additions & 55 deletions
@@ -5,9 +5,8 @@
 
 package org.opensearch.alerting.transport
 
-import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
 import org.apache.logging.log4j.LogManager
 import org.opensearch.OpenSearchStatusException
@@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
 import org.opensearch.action.support.ActionFilters
 import org.opensearch.action.support.HandledTransportAction
 import org.opensearch.action.support.IndicesOptions
+import org.opensearch.action.support.WriteRequest
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.alerting.MonitorMetadataService
 import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -57,6 +57,7 @@ import kotlin.coroutines.resume
 import kotlin.coroutines.resumeWithException
 import kotlin.coroutines.suspendCoroutine
 
+private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
 private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)
 
 class TransportDeleteMonitorAction @Inject constructor(
@@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
         if (!validateUserBackendRoles(user, actionListener)) {
             return
         }
-
-        GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
+        scope.launch {
             DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
         }
     }
@@ -109,16 +109,15 @@ class TransportDeleteMonitorAction @Inject constructor(
                     checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
 
                 if (canDelete) {
-                    val deleteResponse = deleteMonitor(monitor)
-                    deleteDocLevelMonitorQueriesAndIndices(monitor)
-                    deleteMetadata(monitor)
+                    val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId)
                     actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
                 } else {
                     actionListener.onFailure(
                         AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
                     )
                 }
             } catch (t: Exception) {
+                log.error("Failed to delete monitor ${deleteRequest.id()}", t)
                 actionListener.onFailure(AlertingException.wrap(t))
             }
         }
@@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor(
             )
             return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
         }
+    }
+
+    companion object {
+        @JvmStatic
+        suspend fun deleteAllResourcesForMonitor(
+            client: Client,
+            monitor: Monitor,
+            deleteRequest: DeleteRequest,
+            monitorId: String,
+        ): DeleteResponse {
+            val deleteResponse = deleteMonitorDocument(client, deleteRequest)
+            deleteMetadata(client, monitor)
+            deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
+            return deleteResponse
+        }
 
-        private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
+        private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse {
            return client.suspendUntil { delete(deleteRequest, it) }
        }
 
-        private suspend fun deleteMetadata(monitor: Monitor) {
+        suspend fun deleteMetadata(client: Client, monitor: Monitor) {
            val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
-            val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            try {
+                val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
+                log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}")
+            } catch (e: Exception) {
+                // we only log the error and don't fail the request because if monitor document has been deleted,
+                // we cannot retry based on this failure
+                log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e)
+            }
        }
 
-        private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
-            val clusterState = clusterService.state()
-            val metadata = MonitorMetadataService.getMetadata(monitor)
-            metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
-
-                val indicesExistsResponse: IndicesExistsResponse =
-                    client.suspendUntil {
-                        client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+        suspend fun deleteDocLevelMonitorQueriesAndIndices(
+            client: Client,
+            monitor: Monitor,
+            monitorId: String,
+        ) {
+            try {
+                val metadata = MonitorMetadataService.getMetadata(monitor)
+                metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
+
+                    val indicesExistsResponse: IndicesExistsResponse =
+                        client.suspendUntil {
+                            client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
+                        }
+                    if (indicesExistsResponse.isExists == false) {
+                        return
                    }
-                if (indicesExistsResponse.isExists == false) {
-                    return
-                }
-                // Check if there's any queries from other monitors in this queryIndex,
-                // to avoid unnecessary doc deletion, if we could just delete index completely
-                val searchResponse: SearchResponse = client.suspendUntil {
-                    search(
-                        SearchRequest(queryIndex).source(
-                            SearchSourceBuilder()
-                                .size(0)
-                                .query(
-                                    QueryBuilders.boolQuery().mustNot(
-                                        QueryBuilders.matchQuery("monitor_id", monitorId)
+                    // Check if there's any queries from other monitors in this queryIndex,
+                    // to avoid unnecessary doc deletion, if we could just delete index completely
+                    val searchResponse: SearchResponse = client.suspendUntil {
+                        search(
+                            SearchRequest(queryIndex).source(
+                                SearchSourceBuilder()
+                                    .size(0)
+                                    .query(
+                                        QueryBuilders.boolQuery().mustNot(
+                                            QueryBuilders.matchQuery("monitor_id", monitorId)
+                                        )
                                    )
-                        )
-                    ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
-                    it
-                    )
-                }
-                if (searchResponse.hits.totalHits.value == 0L) {
-                    val ack: AcknowledgedResponse = client.suspendUntil {
-                        client.admin().indices().delete(
-                            DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
+                            ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
+                            it
                        )
                    }
-                    if (ack.isAcknowledged == false) {
-                        log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
-                    }
-                } else {
-                    // Delete all queries added by this monitor
-                    val response: BulkByScrollResponse = suspendCoroutine { cont ->
-                        DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
-                            .source(queryIndex)
-                            .filter(QueryBuilders.matchQuery("monitor_id", monitorId))
-                            .refresh(true)
-                            .execute(
-                                object : ActionListener<BulkByScrollResponse> {
-                                    override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
-                                    override fun onFailure(t: Exception) = cont.resumeWithException(t)
-                                }
+                    if (searchResponse.hits.totalHits.value == 0L) {
+                        val ack: AcknowledgedResponse = client.suspendUntil {
+                            client.admin().indices().delete(
+                                DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
                            )
+                        }
+                        if (ack.isAcknowledged == false) {
+                            log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
+                        }
+                    } else {
+                        // Delete all queries added by this monitor
+                        val response: BulkByScrollResponse = suspendCoroutine { cont ->
+                            DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
+                                .source(queryIndex)
+                                .filter(QueryBuilders.matchQuery("monitor_id", monitorId))
+                                .refresh(true)
+                                .execute(
+                                    object : ActionListener<BulkByScrollResponse> {
+                                        override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
+                                        override fun onFailure(t: Exception) {
+                                            cont.resumeWithException(t)
+                                        }
+                                    }
+                                )
+                        }
                    }
                }
+            } catch (e: Exception) {
+                // we only log the error and don't fail the request because if monitor document has been deleted successfully,
+                // we cannot retry based on this failure
+                log.error("Failed to delete doc level queries from query index.", e)
            }
        }
    }
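
Note: the delete-by-query branch above bridges OpenSearch's callback-style ActionListener API into Kotlin coroutines with suspendCoroutine. Below is a minimal, self-contained sketch of that bridging pattern; the Listener interface and fetchCountAsync call are illustrative stand-ins, not OpenSearch or plugin APIs.

import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Stand-in for a callback interface such as OpenSearch's ActionListener<T>.
interface Listener<T> {
    fun onResponse(response: T)
    fun onFailure(e: Exception)
}

// Hypothetical callback-style client call, used only to illustrate the pattern.
fun fetchCountAsync(listener: Listener<Long>) {
    Thread { listener.onResponse(42L) }.start()
}

// Suspend until the callback fires, resuming the coroutine with the result or the failure,
// mirroring the suspendCoroutine block that wraps DeleteByQueryRequestBuilder.execute(...) above.
suspend fun fetchCount(): Long = suspendCoroutine { cont ->
    fetchCountAsync(object : Listener<Long> {
        override fun onResponse(response: Long) = cont.resume(response)
        override fun onFailure(e: Exception) = cont.resumeWithException(e)
    })
}

fun main() = kotlinx.coroutines.runBlocking {
    println(fetchCount()) // prints 42
}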

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 44 additions & 11 deletions
@@ -19,6 +19,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction
 import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
 import org.opensearch.action.admin.indices.create.CreateIndexResponse
+import org.opensearch.action.delete.DeleteRequest
 import org.opensearch.action.get.GetRequest
 import org.opensearch.action.get.GetResponse
 import org.opensearch.action.index.IndexRequest
@@ -175,7 +176,7 @@ class TransportIndexMonitorAction @Inject constructor(
         client: Client,
         actionListener: ActionListener<IndexMonitorResponse>,
         request: IndexMonitorRequest,
-        user: User?
+        user: User?,
     ) {
         val indices = mutableListOf<String>()
         // todo: for doc level alerting: check if index is present before monitor is created.
@@ -228,7 +229,7 @@
         client: Client,
         actionListener: ActionListener<IndexMonitorResponse>,
         request: IndexMonitorRequest,
-        user: User?
+        user: User?,
     ) {
         client.threadPool().threadContext.stashContext().use {
             IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
@@ -239,7 +240,7 @@
         private val client: Client,
         private val actionListener: ActionListener<IndexMonitorResponse>,
         private val request: IndexMonitorRequest,
-        private val user: User?
+        private val user: User?,
     ) {
 
         fun resolveUserAndStart() {
@@ -490,16 +491,30 @@
                 )
                 return
             }
-            request.monitor = request.monitor.copy(id = indexResponse.id)
-            var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
-            if (created == false) {
-                log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!")
+            var metadata: MonitorMetadata?
+            try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
+                request.monitor = request.monitor.copy(id = indexResponse.id)
+                var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
+                if (created == false) {
+                    log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
+                }
+                metadata = monitorMetadata
+            } catch (t: Exception) {
+                log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
+                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
+                throw t
             }
-            if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
-                indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
+            try {
+                if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
+                    indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
+                }
+                // When inserting queries in queryIndex we could update sourceToQueryIndexMapping
+                MonitorMetadataService.upsertMetadata(metadata, updating = true)
+            } catch (t: Exception) {
+                log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
+                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
+                throw t
             }
-            // When inserting queries in queryIndex we could update sourceToQueryIndexMapping
-            MonitorMetadataService.upsertMetadata(metadata, updating = true)
 
             actionListener.onResponse(
                 IndexMonitorResponse(
@@ -512,6 +527,24 @@
             }
         }
 
+        private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) {
+            // we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
+            try {
+                TransportDeleteMonitorAction.deleteAllResourcesForMonitor(
+                    client,
+                    monitor = monitor,
+                    DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE),
+                    indexMonitorResponse.id
+                )
+                log.debug(
+                    "Cleaned up monitor related resources after monitor creation request partial failure. " +
+                        "Monitor id : ${indexMonitorResponse.id}"
+                )
+            } catch (e: Exception) {
+                log.error("Failed to clean up monitor after monitor creation request partial failure", e)
+            }
+        }
+
         @Suppress("UNCHECKED_CAST")
         private suspend fun indexDocLevelMonitorQueries(
             monitor: Monitor,
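
Note: the new cleanupMonitorAfterPartialFailure above follows a create-then-rollback pattern: index the monitor document first, attempt the dependent metadata and query-index writes, and on any failure delete everything best-effort, log the outcome, and rethrow so the caller still sees the original error. Below is a minimal sketch of that pattern against an in-memory store; all names (createMonitorLike, createMetadata, deleteAllResources) are illustrative, not the plugin's API.

// Generic create-with-rollback sketch, assuming an in-memory map as the "index".
class PartialFailureExample(private val store: MutableMap<String, String>) {

    fun createMonitorLike(id: String): String {
        store[id] = "monitor"                      // step 1: primary document is written first
        try {
            createMetadata(id)                     // step 2: dependent resources may fail
        } catch (t: Exception) {
            // best-effort cleanup, then rethrow so the caller still sees the original failure
            runCatching { deleteAllResources(id) }
                .onFailure { println("cleanup failed for $id: ${it.message}") }
            throw t
        }
        return id
    }

    private fun createMetadata(id: String) {
        if (id.startsWith("bad")) throw IllegalStateException("metadata creation failed")
        store["$id-metadata"] = "metadata"
    }

    private fun deleteAllResources(id: String) {
        store.remove(id)
        store.remove("$id-metadata")
    }
}

fun main() {
    val store = mutableMapOf<String, String>()
    runCatching { PartialFailureExample(store).createMonitorLike("bad-monitor") }
    println(store.isEmpty()) // true: nothing is left behind after the partial failure
}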

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 54 additions & 0 deletions
@@ -7,6 +7,7 @@ package org.opensearch.alerting
 
 import org.junit.Assert
 import org.opensearch.action.admin.cluster.state.ClusterStateRequest
+import org.opensearch.action.admin.indices.alias.Alias
 import org.opensearch.action.admin.indices.create.CreateIndexRequest
 import org.opensearch.action.admin.indices.get.GetIndexRequest
 import org.opensearch.action.admin.indices.get.GetIndexResponse
@@ -589,6 +590,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
         assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
     }
 
+    fun `test cleanup monitor on partial create monitor failure`() {
+        val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
+        val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
+        val customQueryIndex = "custom_alerts_index"
+        val analyzer = "dfbdfbafd"
+        val testDoc = """{
+            "rule": {"title": "some_title"},
+            "message": "msg 1 2 3 4"
+        }"""
+        indexDoc(index, "2", testDoc)
+        client().admin().indices()
+            .create(
+                CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
+                    .mapping(
+                        """
+                        {
+                            "_meta": {
+                                "schema_version": 1
+                            },
+                            "properties": {
+                                "query": {
+                                    "type": "percolator_ext"
+                                },
+                                "monitor_id": {
+                                    "type": "text"
+                                },
+                                "index": {
+                                    "type": "text"
+                                }
+                            }
+                        }
+                        """.trimIndent()
+                    )
+            ).get()
+
+        client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get()
+        var monitor = randomDocumentLevelMonitor(
+            inputs = listOf(docLevelInput),
+            triggers = listOf(trigger),
+            dataSources = DataSources(
+                queryIndex = customQueryIndex,
+                queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
+            )
+        )
+        try {
+            createMonitor(monitor)
+            fail("monitor creation should fail due to incorrect analyzer name in test setup")
+        } catch (e: Exception) {
+            Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
+        }
+    }
+
     fun `test execute monitor without create when no monitors exists`() {
         val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
         val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
