Skip to content

Commit cb890f0

Browse files
authored
[Backport 2.x] Add an _exists_ check to document level monitor queries (#1425) (#1456)
* Add an _exists_ check to document level monitor queries (#1425) * clean up and add integ tests Signed-off-by: Joanne Wang <[email protected]> * refactored out common method and renamed test Signed-off-by: Joanne Wang <[email protected]> * remove _exists_ flag Signed-off-by: Joanne Wang <[email protected]> --------- Signed-off-by: Joanne Wang <[email protected]> * fix integ test Signed-off-by: Joanne Wang <[email protected]> --------- Signed-off-by: Joanne Wang <[email protected]>
1 parent d3ede1a commit cb890f0

File tree

2 files changed

+366
-0
lines changed

2 files changed

+366
-0
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
396396
var query = it.query
397397
conflictingPaths.forEach { conflictingPath ->
398398
if (query.contains(conflictingPath)) {
399+
query = transformExistsQuery(query, conflictingPath, "<index>", monitorId)
399400
query = query.replace("$conflictingPath:", "${conflictingPath}_<index>_$monitorId:")
400401
filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!)
401402
}
@@ -418,6 +419,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
418419
var query = it.query
419420
flattenPaths.forEach { fieldPath ->
420421
if (!conflictingPaths.contains(fieldPath.first)) {
422+
query = transformExistsQuery(query, fieldPath.first, sourceIndex, monitorId)
421423
query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:")
422424
}
423425
}
@@ -448,6 +450,26 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
448450
}
449451
}
450452

453+
/**
454+
* Transforms the query if it includes an _exists_ clause to append the index name and the monitor id to the field value
455+
*/
456+
private fun transformExistsQuery(query: String, conflictingPath: String, indexName: String, monitorId: String): String {
457+
return query
458+
.replace("_exists_: ", "_exists_:") // remove space to read exists query as one string
459+
.split("\\s+".toRegex())
460+
.joinToString(separator = " ") { segment ->
461+
if (segment.contains("_exists_:")) {
462+
val trimSegement = segment.trim { it == '(' || it == ')' } // remove any delimiters from ends
463+
val (_, value) = trimSegement.split(":", limit = 2) // split into key and value
464+
val newString = if (value == conflictingPath)
465+
segment.replace(conflictingPath, "${conflictingPath}_${indexName}_$monitorId") else segment
466+
newString
467+
} else {
468+
segment
469+
}
470+
}
471+
}
472+
451473
private suspend fun updateQueryIndexMappings(
452474
monitor: Monitor,
453475
monitorMetadata: MonitorMetadata,

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1997,6 +1997,350 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
19971997
assertEquals(1, output.objectMap("trigger_results").values.size)
19981998
}
19991999

2000+
fun `test execute monitor generates alerts and findings with NOT EQUALS query and EXISTS query`() {
2001+
val testIndex = createTestIndex()
2002+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2003+
val testDoc = """{
2004+
"message" : "This is an error from IAD region",
2005+
"test_strict_date_time" : "$testTime",
2006+
"test_field" : "us-west-2"
2007+
}"""
2008+
2009+
val query = "NOT test_field: \"us-east-1\" AND _exists_: test_field"
2010+
val docQuery = DocLevelQuery(query = query, name = "3", fields = listOf())
2011+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
2012+
2013+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
2014+
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
2015+
assertNotNull(monitor.id)
2016+
2017+
indexDoc(testIndex, "1", testDoc)
2018+
indexDoc(testIndex, "5", testDoc)
2019+
2020+
val response = executeMonitor(monitor.id)
2021+
2022+
val output = entityAsMap(response)
2023+
2024+
assertEquals(monitor.name, output["monitor_name"])
2025+
@Suppress("UNCHECKED_CAST")
2026+
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
2027+
@Suppress("UNCHECKED_CAST")
2028+
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
2029+
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
2030+
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex")))
2031+
2032+
val alerts = searchAlertsWithFilter(monitor)
2033+
assertEquals("Alert saved for test monitor", 2, alerts.size)
2034+
2035+
val findings = searchFindings(monitor)
2036+
assertEquals("Findings saved for test monitor", 2, findings.size)
2037+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
2038+
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
2039+
}
2040+
2041+
fun `test document-level monitor when index alias contain docs that do match a NOT EQUALS query and EXISTS query`() {
2042+
val aliasName = "test-alias"
2043+
createIndexAlias(
2044+
aliasName,
2045+
"""
2046+
"properties" : {
2047+
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
2048+
"test_field" : { "type" : "keyword" },
2049+
"number" : { "type" : "keyword" }
2050+
}
2051+
""".trimIndent()
2052+
)
2053+
2054+
val docQuery = DocLevelQuery(query = "NOT test_field:\"us-east-1\" AND _exists_: test_field", name = "3", fields = listOf())
2055+
val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery))
2056+
2057+
val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
2058+
val monitor = createMonitor(
2059+
randomDocumentLevelMonitor(
2060+
inputs = listOf(docLevelInput),
2061+
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
2062+
)
2063+
)
2064+
2065+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2066+
val testDoc = """{
2067+
"@timestamp": "$testTime",
2068+
"message" : "This is an error from IAD region",
2069+
"test_strict_date_time" : "$testTime",
2070+
"test_field" : "us-west-2"
2071+
}"""
2072+
indexDoc(aliasName, "1", testDoc)
2073+
var response = executeMonitor(monitor.id)
2074+
var output = entityAsMap(response)
2075+
var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
2076+
@Suppress("UNCHECKED_CAST")
2077+
var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
2078+
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
2079+
2080+
rolloverDatastream(aliasName)
2081+
indexDoc(aliasName, "2", testDoc)
2082+
response = executeMonitor(monitor.id)
2083+
output = entityAsMap(response)
2084+
searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
2085+
@Suppress("UNCHECKED_CAST")
2086+
matchingDocsToQuery = searchResult[docQuery.id] as List<String>
2087+
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
2088+
2089+
deleteIndexAlias(aliasName)
2090+
}
2091+
2092+
fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS and EXISTS query operator`() {
2093+
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
2094+
val testQueryName = "wildcard-test-query"
2095+
val testIndex = createTestIndex("${testIndexPrefix}1")
2096+
val testIndex2 = createTestIndex("${testIndexPrefix}2")
2097+
2098+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2099+
val testDoc = """{
2100+
"message" : "This is an error from IAD region",
2101+
"test_strict_date_time" : "$testTime",
2102+
"test_field" : "us-west-2"
2103+
}"""
2104+
2105+
val query = "NOT test_field:\"us-west-1\" AND _exists_: test_field"
2106+
val docQuery = DocLevelQuery(query = query, name = testQueryName, fields = listOf())
2107+
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))
2108+
2109+
val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
2110+
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
2111+
assertNotNull(monitor.id)
2112+
2113+
indexDoc(testIndex, "1", testDoc)
2114+
indexDoc(testIndex2, "5", testDoc)
2115+
2116+
val response = executeMonitor(monitor.id)
2117+
2118+
val output = entityAsMap(response)
2119+
2120+
assertEquals(monitor.name, output["monitor_name"])
2121+
@Suppress("UNCHECKED_CAST")
2122+
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
2123+
@Suppress("UNCHECKED_CAST")
2124+
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
2125+
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
2126+
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex2")))
2127+
2128+
val alerts = searchAlertsWithFilter(monitor)
2129+
assertEquals("Alert saved for test monitor", 2, alerts.size)
2130+
2131+
val findings = searchFindings(monitor)
2132+
assertEquals("Findings saved for test monitor", 2, findings.size)
2133+
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
2134+
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
2135+
}
2136+
2137+
fun `test execute monitor with indices having fields with same name different field mappings in multiple indices with NOT EQUALS`() {
2138+
val testIndex = createTestIndex(
2139+
"test1",
2140+
""""properties": {
2141+
"source": {
2142+
"properties": {
2143+
"device": {
2144+
"properties": {
2145+
"hwd": {
2146+
"properties": {
2147+
"id": {
2148+
"type":"text",
2149+
"analyzer":"whitespace"
2150+
}
2151+
}
2152+
}
2153+
}
2154+
}
2155+
}
2156+
},
2157+
"test_field" : {
2158+
"type":"text"
2159+
}
2160+
}
2161+
""".trimIndent()
2162+
)
2163+
2164+
val testIndex2 = createTestIndex(
2165+
"test2",
2166+
""""properties": {
2167+
"test_field" : {
2168+
"type":"keyword"
2169+
}
2170+
}
2171+
""".trimIndent()
2172+
)
2173+
2174+
val testIndex4 = createTestIndex(
2175+
"test4",
2176+
""""properties": {
2177+
"source": {
2178+
"properties": {
2179+
"device": {
2180+
"properties": {
2181+
"hwd": {
2182+
"properties": {
2183+
"id": {
2184+
"type":"text"
2185+
}
2186+
}
2187+
}
2188+
}
2189+
}
2190+
}
2191+
},
2192+
"test_field" : {
2193+
"type":"text"
2194+
}
2195+
}
2196+
""".trimIndent()
2197+
)
2198+
2199+
val testDoc1 = """{
2200+
"source" : {"device" : {"hwd" : {"id" : "123456"}} },
2201+
"nested_field": { "test1": "some text" }
2202+
}"""
2203+
val testDoc2 = """{
2204+
"nested_field": { "test1": "some text" },
2205+
"test_field": "123456"
2206+
}"""
2207+
2208+
val docQuery1 = DocLevelQuery(
2209+
query = "NOT test_field:\"12345\" AND _exists_: test_field",
2210+
name = "4",
2211+
fields = listOf()
2212+
)
2213+
val docQuery2 = DocLevelQuery(
2214+
query = "NOT source.device.hwd.id:\"12345\" AND _exists_: source.device.hwd.id",
2215+
name = "5",
2216+
fields = listOf()
2217+
)
2218+
2219+
val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2))
2220+
2221+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
2222+
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
2223+
assertNotNull(monitor.id)
2224+
2225+
indexDoc(testIndex4, "1", testDoc1)
2226+
indexDoc(testIndex2, "1", testDoc2)
2227+
indexDoc(testIndex, "1", testDoc1)
2228+
indexDoc(testIndex, "2", testDoc2)
2229+
2230+
executeMonitor(monitor.id)
2231+
2232+
val alerts = searchAlertsWithFilter(monitor)
2233+
assertEquals("Alert saved for test monitor", 4, alerts.size)
2234+
2235+
val findings = searchFindings(monitor)
2236+
assertEquals("Findings saved for test monitor", 4, findings.size)
2237+
2238+
val request = """{
2239+
"size": 0,
2240+
"query": {
2241+
"match_all": {}
2242+
}
2243+
}"""
2244+
val httpResponse = adminClient().makeRequest(
2245+
"GET", "/${monitor.dataSources.queryIndex}/_search",
2246+
StringEntity(request, ContentType.APPLICATION_JSON)
2247+
)
2248+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
2249+
2250+
val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
2251+
searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) }
2252+
}
2253+
2254+
fun `test execute monitor with indices having fields with same name but different field mappings with NOT EQUALS`() {
2255+
val testIndex = createTestIndex(
2256+
"test1",
2257+
""""properties": {
2258+
"source": {
2259+
"properties": {
2260+
"id": {
2261+
"type":"text",
2262+
"analyzer":"whitespace"
2263+
}
2264+
}
2265+
},
2266+
"test_field" : {
2267+
"type":"text",
2268+
"analyzer":"whitespace"
2269+
}
2270+
}
2271+
""".trimIndent()
2272+
)
2273+
2274+
val testIndex2 = createTestIndex(
2275+
"test2",
2276+
""""properties": {
2277+
"source": {
2278+
"properties": {
2279+
"id": {
2280+
"type":"text"
2281+
}
2282+
}
2283+
},
2284+
"test_field" : {
2285+
"type":"text"
2286+
}
2287+
}
2288+
""".trimIndent()
2289+
)
2290+
val testDoc = """{
2291+
"source" : {"id" : "12345" },
2292+
"nested_field": { "test1": "some text" },
2293+
"test_field": "12345"
2294+
}"""
2295+
2296+
val docQuery = DocLevelQuery(
2297+
query = "(NOT test_field:\"123456\" AND _exists_:test_field) AND source.id:\"12345\"",
2298+
name = "5",
2299+
fields = listOf()
2300+
)
2301+
val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery))
2302+
2303+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
2304+
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
2305+
assertNotNull(monitor.id)
2306+
2307+
indexDoc(testIndex, "1", testDoc)
2308+
indexDoc(testIndex2, "1", testDoc)
2309+
2310+
executeMonitor(monitor.id)
2311+
2312+
val alerts = searchAlertsWithFilter(monitor)
2313+
assertEquals("Alert saved for test monitor", 2, alerts.size)
2314+
2315+
val findings = searchFindings(monitor)
2316+
assertEquals("Findings saved for test monitor", 2, findings.size)
2317+
2318+
// as mappings of source.id & test_field are different so, both of them expands
2319+
val expectedQueries = listOf(
2320+
"(NOT test_field_test2_${monitor.id}:\"123456\" AND _exists_:test_field_test2_${monitor.id}) " +
2321+
"AND source.id_test2_${monitor.id}:\"12345\"",
2322+
"(NOT test_field_test1_${monitor.id}:\"123456\" AND _exists_:test_field_test1_${monitor.id}) " +
2323+
"AND source.id_test1_${monitor.id}:\"12345\""
2324+
)
2325+
2326+
val request = """{
2327+
"size": 10,
2328+
"query": {
2329+
"match_all": {}
2330+
}
2331+
}"""
2332+
var httpResponse = adminClient().makeRequest(
2333+
"GET", "/${monitor.dataSources.queryIndex}/_search",
2334+
StringEntity(request, ContentType.APPLICATION_JSON)
2335+
)
2336+
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
2337+
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
2338+
searchResponse.hits.forEach { hit ->
2339+
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
2340+
assertTrue(expectedQueries.contains(query))
2341+
}
2342+
}
2343+
20002344
@Suppress("UNCHECKED_CAST")
20012345
/** helper that returns a field in a json map whose values are all json objects */
20022346
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {

0 commit comments

Comments
 (0)