Skip to content

Commit a5a67ae

Browse files
authored
clean up doc level queries on dry run (#1430)
Signed-off-by: Joanne Wang <[email protected]>
1 parent 77df38b commit a5a67ae

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
345345
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
346346
true
347347
)
348+
} else {
349+
// Clean up any queries created by the dry run monitor
350+
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
348351
}
349352

350353
// TODO: Update the Document as part of the Trigger and return back the trigger action result

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias
1313
import org.opensearch.action.admin.indices.create.CreateIndexRequest
1414
import org.opensearch.action.admin.indices.create.CreateIndexResponse
1515
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
16+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
17+
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
1618
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1719
import org.opensearch.action.admin.indices.rollover.RolloverRequest
1820
import org.opensearch.action.admin.indices.rollover.RolloverResponse
@@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
3840
import org.opensearch.commons.alerting.model.DocLevelQuery
3941
import org.opensearch.commons.alerting.model.Monitor
4042
import org.opensearch.commons.alerting.model.ScheduledJob
43+
import org.opensearch.core.action.ActionListener
4144
import org.opensearch.core.rest.RestStatus
4245
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
46+
import org.opensearch.index.query.QueryBuilders
47+
import org.opensearch.index.reindex.BulkByScrollResponse
48+
import org.opensearch.index.reindex.DeleteByQueryAction
49+
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
50+
import kotlin.coroutines.resume
51+
import kotlin.coroutines.resumeWithException
52+
import kotlin.coroutines.suspendCoroutine
4353

4454
private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
4555

@@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
134144
return true
135145
}
136146

147+
suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
148+
try {
149+
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
150+
val indicesExistsResponse: IndicesExistsResponse =
151+
client.suspendUntil {
152+
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
153+
}
154+
if (indicesExistsResponse.isExists == false) {
155+
return
156+
}
157+
158+
val queryBuilder = QueryBuilders.boolQuery()
159+
.must(QueryBuilders.existsQuery("monitor_id"))
160+
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))
161+
162+
val response: BulkByScrollResponse = suspendCoroutine { cont ->
163+
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
164+
.source(queryIndex)
165+
.filter(queryBuilder)
166+
.refresh(true)
167+
.execute(
168+
object : ActionListener<BulkByScrollResponse> {
169+
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
170+
override fun onFailure(t: Exception) = cont.resumeWithException(t)
171+
}
172+
)
173+
}
174+
response.bulkFailures.forEach {
175+
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
176+
}
177+
}
178+
} catch (e: Exception) {
179+
log.error("Failed to delete doc level queries on dry run", e)
180+
}
181+
}
182+
137183
fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
138184
val clusterState = clusterService.state()
139185
return clusterState.metadata.hasAlias(dataSources.queryIndex)

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
7474

7575
val alerts = searchAlerts(monitor)
7676
assertEquals("Alert saved for test monitor", 0, alerts.size)
77+
78+
// ensure doc level query is deleted on dry run
79+
val request = """{
80+
"size": 10,
81+
"query": {
82+
"match_all": {}
83+
}
84+
}"""
85+
var httpResponse = adminClient().makeRequest(
86+
"GET", "/${monitor.dataSources.queryIndex}/_search",
87+
StringEntity(request, ContentType.APPLICATION_JSON)
88+
)
89+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
90+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
91+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
7792
}
7893

7994
fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {
@@ -252,6 +267,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
252267
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
253268
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
254269
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))
270+
271+
// ensure doc level query is deleted on dry run
272+
val request = """{
273+
"size": 10,
274+
"query": {
275+
"match_all": {}
276+
}
277+
}"""
278+
var httpResponse = adminClient().makeRequest(
279+
"GET", "/${monitor.dataSources.queryIndex}/_search",
280+
StringEntity(request, ContentType.APPLICATION_JSON)
281+
)
282+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
283+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
284+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
285+
}
286+
287+
fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() {
288+
val testIndex = createTestIndex()
289+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
290+
val testDoc = """{
291+
"message" : "This is an error from IAD region",
292+
"test_strict_date_time" : "$testTime",
293+
"test_field" : "us-west-2"
294+
}"""
295+
296+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
297+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
298+
299+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
300+
val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))
301+
302+
indexDoc(testIndex, "1", testDoc)
303+
indexDoc(testIndex, "2", testDoc)
304+
305+
val response = executeMonitor(monitor, params = DRYRUN_MONITOR)
306+
307+
val output = entityAsMap(response)
308+
309+
assertEquals(monitor.name, output["monitor_name"])
310+
@Suppress("UNCHECKED_CAST")
311+
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
312+
@Suppress("UNCHECKED_CAST")
313+
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
314+
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
315+
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
316+
assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex"))
317+
318+
// ensure doc level query is deleted on dry run
319+
val request = """{
320+
"size": 10,
321+
"query": {
322+
"match_all": {}
323+
}
324+
}"""
325+
var httpResponse = adminClient().makeRequest(
326+
"GET", "/${monitor.dataSources.queryIndex}/_search",
327+
StringEntity(request, ContentType.APPLICATION_JSON)
328+
)
329+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
330+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
331+
searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) }
332+
333+
// create and execute second monitor not as dryrun
334+
val testIndex2 = createTestIndex("test1")
335+
val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
336+
val testDoc2 = """{
337+
"message" : "This is an error from IAD region",
338+
"test_strict_date_time" : "$testTime2",
339+
"test_field" : "us-east-1"
340+
}"""
341+
342+
val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf())
343+
val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2))
344+
345+
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
346+
val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2)))
347+
assertNotNull(monitor2.id)
348+
349+
indexDoc(testIndex2, "1", testDoc2)
350+
indexDoc(testIndex2, "5", testDoc2)
351+
352+
val response2 = executeMonitor(monitor2.id)
353+
val output2 = entityAsMap(response2)
354+
355+
assertEquals(monitor2.name, output2["monitor_name"])
356+
@Suppress("UNCHECKED_CAST")
357+
val searchResult2 = (output2.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
358+
@Suppress("UNCHECKED_CAST")
359+
val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List<String>
360+
assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size)
361+
assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2")))
362+
363+
val alerts = searchAlertsWithFilter(monitor2)
364+
assertEquals("Alert saved for test monitor", 2, alerts.size)
365+
366+
val findings = searchFindings(monitor2)
367+
assertEquals("Findings saved for test monitor", 2, findings.size)
368+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
369+
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
370+
371+
// ensure query from second monitor was saved
372+
val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"")
373+
httpResponse = adminClient().makeRequest(
374+
"GET", "/${monitor.dataSources.queryIndex}/_search",
375+
StringEntity(request, ContentType.APPLICATION_JSON)
376+
)
377+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
378+
searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
379+
searchResponse.hits.forEach { hit ->
380+
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
381+
assertTrue(expectedQueries.contains(query))
382+
}
383+
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) }
255384
}
256385

257386
fun `test execute monitor generates alerts and findings`() {

0 commit comments

Comments
 (0)