Skip to content

Commit c00586a

Browse files
jowg-amazonengechas
authored andcommitted
clean up doc level queries on dry run (#1430)
Signed-off-by: Joanne Wang <[email protected]>
1 parent 261e16d commit c00586a

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
334334
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
335335
true
336336
)
337+
} else {
338+
// Clean up any queries created by the dry run monitor
339+
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
337340
}
338341

339342
// TODO: Update the Document as part of the Trigger and return back the trigger action result

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias
1313
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
16+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
17+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
1618
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1719
import org.opensearch.action.admin.indices.rollover.RolloverRequest
1820
import org.opensearch.action.admin.indices.rollover.RolloverResponse
@@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
3840
import org.opensearch.commons.alerting.model.DocLevelQuery
3941
import org.opensearch.commons.alerting.model.Monitor
4042
import org.opensearch.commons.alerting.model.ScheduledJob
43+
import org.opensearch.core.action.ActionListener
4144
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
45+
import org.opensearch.index.query.QueryBuilders
46+
import org.opensearch.index.reindex.BulkByScrollResponse
47+
import org.opensearch.index.reindex.DeleteByQueryAction
48+
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
4249
import org.opensearch.rest.RestStatus
50+
import kotlin.coroutines.resume
51+
import kotlin.coroutines.resumeWithException
52+
import kotlin.coroutines.suspendCoroutine
4353

4454
private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
4555

@@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
134144
return true
135145
}
136146

147+
suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
148+
try {
149+
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
150+
val indicesExistsResponse: IndicesExistsResponse =
151+
client.suspendUntil {
152+
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
153+
}
154+
if (indicesExistsResponse.isExists == false) {
155+
return
156+
}
157+
158+
val queryBuilder = QueryBuilders.boolQuery()
159+
.must(QueryBuilders.existsQuery("monitor_id"))
160+
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))
161+
162+
val response: BulkByScrollResponse = suspendCoroutine { cont ->
163+
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
164+
.source(queryIndex)
165+
.filter(queryBuilder)
166+
.refresh(true)
167+
.execute(
168+
object : ActionListener<BulkByScrollResponse> {
169+
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
170+
override fun onFailure(t: Exception) = cont.resumeWithException(t)
171+
}
172+
)
173+
}
174+
response.bulkFailures.forEach {
175+
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
176+
}
177+
}
178+
} catch (e: Exception) {
179+
log.error("Failed to delete doc level queries on dry run", e)
180+
}
181+
}
182+
137183
fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
138184
val clusterState = clusterService.state()
139185
return clusterState.metadata.hasAlias(dataSources.queryIndex)

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
7474

7575
val alerts = searchAlerts(monitor)
7676
assertEquals("Alert saved for test monitor", 0, alerts.size)
77+
78+
// ensure doc level query is deleted on dry run
79+
val request = """{
80+
"size": 10,
81+
"query": {
82+
"match_all": {}
83+
}
84+
}"""
85+
var httpResponse = adminClient().makeRequest(
86+
"GET", "/${monitor.dataSources.queryIndex}/_search",
87+
StringEntity(request, ContentType.APPLICATION_JSON)
88+
)
89+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
90+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
91+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
7792
}
7893

7994
fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {
@@ -297,6 +312,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
297312
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
298313
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
299314
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))
315+
316+
// ensure doc level query is deleted on dry run
317+
val request = """{
318+
"size": 10,
319+
"query": {
320+
"match_all": {}
321+
}
322+
}"""
323+
var httpResponse = adminClient().makeRequest(
324+
"GET", "/${monitor.dataSources.queryIndex}/_search",
325+
StringEntity(request, ContentType.APPLICATION_JSON)
326+
)
327+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
328+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
329+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
330+
}
331+
332+
fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() {
333+
val testIndex = createTestIndex()
334+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
335+
val testDoc = """{
336+
"message" : "This is an error from IAD region",
337+
"test_strict_date_time" : "$testTime",
338+
"test_field" : "us-west-2"
339+
}"""
340+
341+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
342+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
343+
344+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
345+
val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))
346+
347+
indexDoc(testIndex, "1", testDoc)
348+
indexDoc(testIndex, "2", testDoc)
349+
350+
val response = executeMonitor(monitor, params = DRYRUN_MONITOR)
351+
352+
val output = entityAsMap(response)
353+
354+
assertEquals(monitor.name, output["monitor_name"])
355+
@Suppress("UNCHECKED_CAST")
356+
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
357+
@Suppress("UNCHECKED_CAST")
358+
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
359+
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
360+
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
361+
assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex"))
362+
363+
// ensure doc level query is deleted on dry run
364+
val request = """{
365+
"size": 10,
366+
"query": {
367+
"match_all": {}
368+
}
369+
}"""
370+
var httpResponse = adminClient().makeRequest(
371+
"GET", "/${monitor.dataSources.queryIndex}/_search",
372+
StringEntity(request, ContentType.APPLICATION_JSON)
373+
)
374+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
375+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
376+
searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) }
377+
378+
// create and execute second monitor not as dryrun
379+
val testIndex2 = createTestIndex("test1")
380+
val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
381+
val testDoc2 = """{
382+
"message" : "This is an error from IAD region",
383+
"test_strict_date_time" : "$testTime2",
384+
"test_field" : "us-east-1"
385+
}"""
386+
387+
val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf())
388+
val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2))
389+
390+
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
391+
val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2)))
392+
assertNotNull(monitor2.id)
393+
394+
indexDoc(testIndex2, "1", testDoc2)
395+
indexDoc(testIndex2, "5", testDoc2)
396+
397+
val response2 = executeMonitor(monitor2.id)
398+
val output2 = entityAsMap(response2)
399+
400+
assertEquals(monitor2.name, output2["monitor_name"])
401+
@Suppress("UNCHECKED_CAST")
402+
val searchResult2 = (output2.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
403+
@Suppress("UNCHECKED_CAST")
404+
val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List<String>
405+
assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size)
406+
assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2")))
407+
408+
val alerts = searchAlertsWithFilter(monitor2)
409+
assertEquals("Alert saved for test monitor", 2, alerts.size)
410+
411+
val findings = searchFindings(monitor2)
412+
assertEquals("Findings saved for test monitor", 2, findings.size)
413+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
414+
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
415+
416+
// ensure query from second monitor was saved
417+
val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"")
418+
httpResponse = adminClient().makeRequest(
419+
"GET", "/${monitor.dataSources.queryIndex}/_search",
420+
StringEntity(request, ContentType.APPLICATION_JSON)
421+
)
422+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
423+
searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
424+
searchResponse.hits.forEach { hit ->
425+
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
426+
assertTrue(expectedQueries.contains(query))
427+
}
428+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) }
300429
}
301430

302431
fun `test execute monitor generates alerts and findings`() {

0 commit comments

Comments
 (0)