Skip to content

Commit 6d28218

Browse files
committed
log errors and clean up monitor when indexing doc level queries or metadata creation fails
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 461e95f commit 6d28218

File tree

4 files changed

+251
-66
lines changed

4 files changed

+251
-66
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 88 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@
55

66
package org.opensearch.alerting.transport
77

8-
import kotlinx.coroutines.CoroutineName
8+
import kotlinx.coroutines.CoroutineScope
99
import kotlinx.coroutines.Dispatchers
10-
import kotlinx.coroutines.GlobalScope
1110
import kotlinx.coroutines.launch
1211
import org.apache.logging.log4j.LogManager
1312
import org.opensearch.OpenSearchStatusException
@@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
2524
import org.opensearch.action.support.ActionFilters
2625
import org.opensearch.action.support.HandledTransportAction
2726
import org.opensearch.action.support.IndicesOptions
27+
import org.opensearch.action.support.WriteRequest
2828
import org.opensearch.action.support.master.AcknowledgedResponse
2929
import org.opensearch.alerting.MonitorMetadataService
3030
import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -57,6 +57,7 @@ import kotlin.coroutines.resume
5757
import kotlin.coroutines.resumeWithException
5858
import kotlin.coroutines.suspendCoroutine
5959

60+
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
6061
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)
6162

6263
class TransportDeleteMonitorAction @Inject constructor(
@@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
8788
if (!validateUserBackendRoles(user, actionListener)) {
8889
return
8990
}
90-
91-
GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
91+
scope.launch {
9292
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
9393
}
9494
}
@@ -109,16 +109,15 @@ class TransportDeleteMonitorAction @Inject constructor(
109109
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
110110

111111
if (canDelete) {
112-
val deleteResponse = deleteMonitor(monitor)
113-
deleteDocLevelMonitorQueriesAndIndices(monitor)
114-
deleteMetadata(monitor)
112+
val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId)
115113
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
116114
} else {
117115
actionListener.onFailure(
118116
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
119117
)
120118
}
121119
} catch (t: Exception) {
120+
log.error("Failed to delete monitor ${deleteRequest.id()}", t)
122121
actionListener.onFailure(AlertingException.wrap(t))
123122
}
124123
}
@@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor(
140139
)
141140
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
142141
}
142+
}
143+
144+
companion object {
145+
@JvmStatic
146+
suspend fun deleteAllResourcesForMonitor(
147+
client: Client,
148+
monitor: Monitor,
149+
deleteRequest: DeleteRequest,
150+
monitorId: String,
151+
): DeleteResponse {
152+
val deleteResponse = deleteMonitorDocument(client, deleteRequest)
153+
deleteMetadata(client, monitor)
154+
deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId)
155+
return deleteResponse
156+
}
143157

144-
private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
158+
private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse {
145159
return client.suspendUntil { delete(deleteRequest, it) }
146160
}
147161

148-
private suspend fun deleteMetadata(monitor: Monitor) {
162+
suspend fun deleteMetadata(client: Client, monitor: Monitor) {
149163
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
150-
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
164+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
165+
try {
166+
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
167+
log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}")
168+
} catch (e: Exception) {
169+
// we only log the error and don't fail the request because if monitor document has been deleted,
170+
// we cannot retry based on this failure
171+
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
172+
}
151173
}
152174

153-
private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) {
154-
val clusterState = clusterService.state()
155-
val metadata = MonitorMetadataService.getMetadata(monitor)
156-
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
157-
158-
val indicesExistsResponse: IndicesExistsResponse =
159-
client.suspendUntil {
160-
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
175+
suspend fun deleteDocLevelMonitorQueriesAndIndices(
176+
client: Client,
177+
monitor: Monitor,
178+
monitorId: String,
179+
) {
180+
try {
181+
val metadata = MonitorMetadataService.getMetadata(monitor)
182+
metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) ->
183+
184+
val indicesExistsResponse: IndicesExistsResponse =
185+
client.suspendUntil {
186+
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
187+
}
188+
if (indicesExistsResponse.isExists == false) {
189+
return
161190
}
162-
if (indicesExistsResponse.isExists == false) {
163-
return
164-
}
165-
// Check if there's any queries from other monitors in this queryIndex,
166-
// to avoid unnecessary doc deletion, if we could just delete index completely
167-
val searchResponse: SearchResponse = client.suspendUntil {
168-
search(
169-
SearchRequest(queryIndex).source(
170-
SearchSourceBuilder()
171-
.size(0)
172-
.query(
173-
QueryBuilders.boolQuery().mustNot(
174-
QueryBuilders.matchQuery("monitor_id", monitorId)
191+
// Check if there's any queries from other monitors in this queryIndex,
192+
// to avoid unnecessary doc deletion, if we could just delete index completely
193+
val searchResponse: SearchResponse = client.suspendUntil {
194+
search(
195+
SearchRequest(queryIndex).source(
196+
SearchSourceBuilder()
197+
.size(0)
198+
.query(
199+
QueryBuilders.boolQuery().mustNot(
200+
QueryBuilders.matchQuery("monitor_id", monitorId)
201+
)
175202
)
176-
)
177-
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
178-
it
179-
)
180-
}
181-
if (searchResponse.hits.totalHits.value == 0L) {
182-
val ack: AcknowledgedResponse = client.suspendUntil {
183-
client.admin().indices().delete(
184-
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
203+
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN),
204+
it
185205
)
186206
}
187-
if (ack.isAcknowledged == false) {
188-
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
189-
}
190-
} else {
191-
// Delete all queries added by this monitor
192-
val response: BulkByScrollResponse = suspendCoroutine { cont ->
193-
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
194-
.source(queryIndex)
195-
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
196-
.refresh(true)
197-
.execute(
198-
object : ActionListener<BulkByScrollResponse> {
199-
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
200-
override fun onFailure(t: Exception) = cont.resumeWithException(t)
201-
}
207+
if (searchResponse.hits.totalHits.value == 0L) {
208+
val ack: AcknowledgedResponse = client.suspendUntil {
209+
client.admin().indices().delete(
210+
DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it
202211
)
212+
}
213+
if (ack.isAcknowledged == false) {
214+
log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!")
215+
}
216+
} else {
217+
// Delete all queries added by this monitor
218+
val response: BulkByScrollResponse = suspendCoroutine { cont ->
219+
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
220+
.source(queryIndex)
221+
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
222+
.refresh(true)
223+
.execute(
224+
object : ActionListener<BulkByScrollResponse> {
225+
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
226+
override fun onFailure(t: Exception) {
227+
cont.resumeWithException(t)
228+
}
229+
}
230+
)
231+
}
203232
}
204233
}
234+
} catch (e: Exception) {
235+
// we only log the error and don't fail the request because if monitor document has been deleted successfully,
236+
// we cannot retry based on this failure
237+
log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e)
205238
}
206239
}
207240
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction
2020
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
2121
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
2222
import org.opensearch.action.admin.indices.create.CreateIndexResponse
23+
import org.opensearch.action.delete.DeleteRequest
2324
import org.opensearch.action.get.GetRequest
2425
import org.opensearch.action.get.GetResponse
2526
import org.opensearch.action.index.IndexRequest
@@ -177,7 +178,7 @@ class TransportIndexMonitorAction @Inject constructor(
177178
client: Client,
178179
actionListener: ActionListener<IndexMonitorResponse>,
179180
request: IndexMonitorRequest,
180-
user: User?
181+
user: User?,
181182
) {
182183
val indices = mutableListOf<String>()
183184
// todo: for doc level alerting: check if index is present before monitor is created.
@@ -230,7 +231,7 @@ class TransportIndexMonitorAction @Inject constructor(
230231
client: Client,
231232
actionListener: ActionListener<IndexMonitorResponse>,
232233
request: IndexMonitorRequest,
233-
user: User?
234+
user: User?,
234235
) {
235236
client.threadPool().threadContext.stashContext().use {
236237
IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
@@ -241,7 +242,7 @@ class TransportIndexMonitorAction @Inject constructor(
241242
private val client: Client,
242243
private val actionListener: ActionListener<IndexMonitorResponse>,
243244
private val request: IndexMonitorRequest,
244-
private val user: User?
245+
private val user: User?,
245246
) {
246247

247248
fun resolveUserAndStart() {
@@ -492,16 +493,30 @@ class TransportIndexMonitorAction @Inject constructor(
492493
)
493494
return
494495
}
495-
request.monitor = request.monitor.copy(id = indexResponse.id)
496-
var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
497-
if (created == false) {
498-
log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!")
496+
var metadata: MonitorMetadata?
497+
try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
498+
request.monitor = request.monitor.copy(id = indexResponse.id)
499+
var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
500+
if (created == false) {
501+
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
502+
}
503+
metadata = monitorMetadata
504+
} catch (t: Exception) {
505+
log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
506+
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
507+
throw t
499508
}
500-
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
501-
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
509+
try {
510+
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
511+
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
512+
}
513+
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
514+
MonitorMetadataService.upsertMetadata(metadata, updating = true)
515+
} catch (t: Exception) {
516+
log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
517+
cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
518+
throw t
502519
}
503-
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
504-
MonitorMetadataService.upsertMetadata(metadata, updating = true)
505520

506521
actionListener.onResponse(
507522
IndexMonitorResponse(
@@ -514,6 +529,24 @@ class TransportIndexMonitorAction @Inject constructor(
514529
}
515530
}
516531

532+
private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) {
533+
// we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
534+
try {
535+
TransportDeleteMonitorAction.deleteAllResourcesForMonitor(
536+
client,
537+
monitor = monitor,
538+
DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE),
539+
indexMonitorResponse.id
540+
)
541+
log.debug(
542+
"Cleaned up monitor related resources after monitor creation request partial failure. " +
543+
"Monitor id : ${indexMonitorResponse.id}"
544+
)
545+
} catch (e: Exception) {
546+
log.error("Failed to clean up monitor after monitor creation request partial failure", e)
547+
}
548+
}
549+
517550
@Suppress("UNCHECKED_CAST")
518551
private suspend fun indexDocLevelMonitorQueries(
519552
monitor: Monitor,

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.junit.Assert
99
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
10+
import org.opensearch.action.admin.indices.alias.Alias
1011
import org.opensearch.action.admin.indices.close.CloseIndexRequest
1112
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1213
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
@@ -755,6 +756,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
755756
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
756757
}
757758

759+
fun `test cleanup monitor on partial create monitor failure`() {
760+
val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
761+
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
762+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
763+
val customQueryIndex = "custom_alerts_index"
764+
val analyzer = "dfbdfbafd"
765+
val testDoc = """{
766+
"rule": {"title": "some_title"},
767+
"message": "msg 1 2 3 4"
768+
}"""
769+
indexDoc(index, "2", testDoc)
770+
client().admin().indices()
771+
.create(
772+
CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
773+
.mapping(
774+
"""
775+
{
776+
"_meta": {
777+
"schema_version": 1
778+
},
779+
"properties": {
780+
"query": {
781+
"type": "percolator_ext"
782+
},
783+
"monitor_id": {
784+
"type": "text"
785+
},
786+
"index": {
787+
"type": "text"
788+
}
789+
}
790+
}
791+
""".trimIndent()
792+
)
793+
).get()
794+
795+
client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get()
796+
var monitor = randomDocumentLevelMonitor(
797+
inputs = listOf(docLevelInput),
798+
triggers = listOf(trigger),
799+
dataSources = DataSources(
800+
queryIndex = customQueryIndex,
801+
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
802+
)
803+
)
804+
try {
805+
createMonitor(monitor)
806+
fail("monitor creation should fail due to incorrect analyzer name in test setup")
807+
} catch (e: Exception) {
808+
Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
809+
}
810+
}
811+
758812
fun `test execute monitor without create when no monitors exists`() {
759813
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
760814
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

0 commit comments

Comments
 (0)