Skip to content

Commit a6d2b1e

Browse files
committed
mappings traversal bug fix (opensearch-project#669)
Signed-off-by: Petar Dzepina <[email protected]>
1 parent bf63b34 commit a6d2b1e

File tree

2 files changed

+185
-22
lines changed

2 files changed

+185
-22
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)
3232

3333
class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) {
3434
companion object {
35+
36+
val PROPERTIES = "properties"
37+
val NESTED = "nested"
38+
val TYPE = "type"
39+
3540
@JvmStatic
3641
fun docLevelQueriesMappings(): String {
3742
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
@@ -95,6 +100,58 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
95100
return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
96101
}
97102

103+
/**
 * Does a depth-first traversal of an index mappings tree and updates every leaf field in place.
 *
 * For each leaf found, [processLeafFn] is called with the leaf's name and its properties; the
 * returned triple (old name, new name, new properties) is applied to the map that contains the
 * leaf once all of that map's entries have been visited. The full dotted path of every leaf,
 * relative to the root and using the ORIGINAL field names, is appended to [flattenPaths].
 *
 * [node] may be either a map of "field name -> field definition" or a mapping object that wraps
 * such a map under the "properties" key; in the latter case the wrapper is unwrapped first.
 *
 * Known limitation: when [node] is a map of fields that contains an object field literally named
 * "properties" with no explicit "type", it cannot be told apart from a wrapper and is unwrapped.
 *
 * @param node current node which we're visiting; it is mutated in place
 * @param currentPath current node path from root node ("" for the root)
 * @param processLeafFn leaf processor function which is called on every leaf discovered
 * @param flattenPaths output list of full paths of all leaf nodes relative to root
 */
@Suppress("UNCHECKED_CAST")
fun traverseMappingsAndUpdate(
    node: MutableMap<String, Any>,
    currentPath: String,
    processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
    flattenPaths: MutableList<String>
) {
    // A "properties" entry is a wrapper around child fields unless its value is itself a field
    // definition. A String-valued "type" proves the latter, i.e. a field that is merely NAMED
    // "properties", because in a map of child fields every value is a map, never a String.
    val wrappedFields = node[PROPERTIES] as? MutableMap<String, Any>
    if (wrappedFields != null && wrappedFields[TYPE] !is String) {
        traverseMappingsAndUpdate(wrappedFields, currentPath, processLeafFn, flattenPaths)
        return
    }
    // A String-valued "type" means `node` is a single field definition rather than a map of fields,
    // so there is nothing to traverse. A field that is merely NAMED "type" has a map value and must
    // not stop the traversal of its siblings.
    if (node[TYPE] is String) {
        return
    }
    traverseFieldsAndUpdate(node, currentPath, processLeafFn, flattenPaths)
}

/**
 * Visits every entry of [fields], a map of "field name -> field definition", recursing into the
 * sub-properties of object/nested fields and processing leaves as described on
 * [traverseMappingsAndUpdate]. Every key of [fields] is treated as a field name, so fields called
 * "type" or "properties" are handled like any other field.
 */
@Suppress("UNCHECKED_CAST")
private fun traverseFieldsAndUpdate(
    fields: MutableMap<String, Any>,
    currentPath: String,
    processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
    flattenPaths: MutableList<String>
) {
    // Leaf updates are collected first and applied after the loop, because renaming entries while
    // iterating over `fields` would modify the map under its own iterator.
    val updatedLeaves = ArrayList<Triple<String, String, MutableMap<String, Any>>>(fields.size)
    for ((fieldName, definition) in fields) {
        // Anything that is not a map cannot be a field definition; ignore it instead of crashing.
        val fieldProps = definition as? MutableMap<String, Any> ?: continue
        // Compute full path relative to root
        val fullPath = if (currentPath.isEmpty()) fieldName else "$currentPath.$fieldName"
        val fieldType = fieldProps[TYPE]
        if (fieldType is String && fieldType != NESTED) {
            // Typed, non-nested field: this is a leaf. Record its path and compute its replacement.
            flattenPaths.add(fullPath)
            updatedLeaves.add(processLeafFn(fieldName, fieldProps))
        } else {
            // Internal (object or nested) node - visit children, if it declares any.
            val children = fieldProps[PROPERTIES] as? MutableMap<String, Any> ?: continue
            traverseFieldsAndUpdate(children, fullPath, processLeafFn, flattenPaths)
        }
    }
    for ((oldName, newName, newProps) in updatedLeaves) {
        // If the leaf was renamed, the entry under the old name has to be removed first
        if (oldName != newName) {
            fields.remove(oldName)
        }
        fields[newName] = newProps
    }
}
154+
98155
suspend fun indexDocLevelQueries(
99156
monitor: Monitor,
100157
monitorId: String,
@@ -113,28 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
113170
}
114171
val indices = getIndexResponse.indices()
115172

173+
// Run through each backing index and apply appropriate mappings to query index
116174
indices?.forEach { indexName ->
117175
if (clusterState.routingTable.hasIndex(indexName)) {
118176
val indexMetadata = clusterState.metadata.index(indexName)
119177
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
120178
val properties = (
121179
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
122-
as Map<String, Map<String, Any>>
180+
as MutableMap<String, Any>
123181
)
124-
125-
val updatedProperties = properties.entries.associate {
126-
val newVal = it.value.toMutableMap()
127-
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
128-
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
129-
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
130-
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
131-
newVal[iter.key] = iter.value
182+
// Node processor function is used to process leaves of index mappings tree
183+
//
184+
val leafNodeProcessor =
    fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
        // Suffix that makes field names unique per backing index and monitor
        val uniqueSuffix = "_${indexName}_$monitorId"
        // Work on a copy so the properties handed in by the traversal are left untouched
        val rewrittenProps = props.toMutableMap()
        val overridesByType = monitor.dataSources.queryIndexMappingsByType
        val hasOverrideForType = overridesByType.isNotEmpty() &&
            props.containsKey("type") &&
            overridesByType.containsKey(props["type"]!!)
        if (hasOverrideForType) {
            // Apply the monitor's custom mapping parameters configured for this field type
            overridesByType[props["type"]]?.entries?.forEach { extra: Map.Entry<String, String> ->
                rewrittenProps[extra.key] = extra.value
            }
        }
        // A "path" property has the same suffix appended as the field names do
        if (props.containsKey("path")) {
            rewrittenProps["path"] = "${props["path"]}$uniqueSuffix"
        }
        // (old name, new name, new properties) of the processed leaf
        return Triple(fieldName, "$fieldName$uniqueSuffix", rewrittenProps)
    }
135-
if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId"
136-
"${it.key}_${indexName}_$monitorId" to newVal
137-
}
200+
// Traverse and update index mappings here while extracting flatten field paths
201+
val flattenPaths = mutableListOf<String>()
202+
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
203+
// Updated mappings ready to be applied on queryIndex
204+
val updatedProperties = properties
205+
138206
val queryIndex = monitor.dataSources.queryIndex
139207

140208
val updateMappingRequest = PutMappingRequest(queryIndex)
@@ -147,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
147215
val indexRequests = mutableListOf<IndexRequest>()
148216
queries.forEach {
149217
var query = it.query
150-
properties.forEach { prop ->
151-
query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:")
218+
flattenPaths.forEach { fieldPath ->
219+
query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:")
152220
}
153221
val indexRequest = IndexRequest(queryIndex)
154222
.id(it.id + "_${indexName}_$monitorId")

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package org.opensearch.alerting
88
import org.junit.Assert
99
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
1010
import org.opensearch.action.admin.indices.create.CreateIndexRequest
11+
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
1112
import org.opensearch.action.admin.indices.refresh.RefreshRequest
1213
import org.opensearch.action.search.SearchRequest
1314
import org.opensearch.action.support.WriteRequest
@@ -33,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase
3334
import java.time.ZonedDateTime
3435
import java.time.format.DateTimeFormatter
3536
import java.time.temporal.ChronoUnit.MILLIS
37+
import java.util.Map
3638
import java.util.concurrent.TimeUnit
3739
import java.util.stream.Collectors
3840

@@ -128,25 +130,45 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
128130
}
129131

130132
fun `test execute monitor with custom query index`() {
131-
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
132-
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
133+
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
134+
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
135+
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
136+
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
137+
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
138+
val docLevelInput = DocLevelMonitorInput(
139+
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5)
140+
)
133141
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
142+
val customFindingsIndex = "custom_findings_index"
143+
val customFindingsIndexPattern = "custom_findings_index-1"
134144
val customQueryIndex = "custom_alerts_index"
135145
var monitor = randomDocumentLevelMonitor(
136146
inputs = listOf(docLevelInput),
137147
triggers = listOf(trigger),
138-
dataSources = DataSources(queryIndex = customQueryIndex)
148+
dataSources = DataSources(
149+
queryIndex = customQueryIndex,
150+
findingsIndex = customFindingsIndex,
151+
findingsIndexPattern = customFindingsIndexPattern
152+
)
139153
)
140154
val monitorResponse = createMonitor(monitor)
141155
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
156+
// Testing a few different "nesting" situations and "weird" characters here
142157
val testDoc = """{
143158
"message" : "This is an error from IAD region",
159+
"source.ip.v6.v1" : 12345,
160+
"source.ip.v6.v2" : 16645,
161+
"source.ip.v4.v0" : 120,
162+
"test_bad_char" : "\u0000",
144163
"test_strict_date_time" : "$testTime",
145-
"test_field" : "us-west-2"
164+
"test_field.some_other_field" : "us-west-2"
146165
}"""
166+
indexDoc(index, "1", testDoc)
167+
client().admin().indices().putMapping(
168+
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
169+
)
147170
assertFalse(monitorResponse?.id.isNullOrEmpty())
148171
monitor = monitorResponse!!.monitor
149-
indexDoc(index, "1", testDoc)
150172
val id = monitorResponse.id
151173
val executeMonitorResponse = executeMonitor(monitor, id, false)
152174
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
@@ -158,11 +180,85 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
158180
.get()
159181
Assert.assertTrue(getAlertsResponse != null)
160182
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
161-
getAlertsResponse = client()
162-
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
183+
val findings = searchFindings(id, customFindingsIndex)
184+
assertEquals("Findings saved for test monitor", 1, findings.size)
185+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
186+
assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size)
187+
}
188+
189+
/**
 * Verifies that an index with nested mappings and an alias on a nested field does not break
 * doc-level query matching when the monitor uses a custom query index and custom findings index.
 */
fun `test execute monitor with custom query index and nested mappings`() {
    val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3")
    val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val customFindingsIndex = "custom_findings_index"
    val customFindingsIndexPattern = "custom_findings_index-1"
    val customQueryIndex = "custom_alerts_index"
    var monitor = randomDocumentLevelMonitor(
        inputs = listOf(docLevelInput),
        triggers = listOf(trigger),
        dataSources = DataSources(
            queryIndex = customQueryIndex,
            findingsIndex = customFindingsIndex,
            findingsIndexPattern = customFindingsIndexPattern
        )
    )
    val monitorResponse = createMonitor(monitor)

    // Create index mappings: a nested "rule" object with two leaf fields
    val ruleFields: MutableMap<String, Any> = HashMap()
    ruleFields["title"] = Map.of("type", "text")
    ruleFields["category"] = Map.of("type", "keyword")
    val nestedMappingFields: MutableMap<String, Any> = HashMap()
    nestedMappingFields["rule"] = Map.of("type", "nested", "properties", ruleFields)
    val nestedMapping = Map.of<String, Any>("properties", nestedMappingFields)

    client().admin().indices().putMapping(
        PutMappingRequest(index).source(nestedMapping)
    ).get()

    // Put alias for nested fields
    val ruleAliasFields: MutableMap<String, Any> = HashMap()
    ruleAliasFields["title_alias"] = Map.of("type", "alias", "path", "rule.title")
    val aliasMappingFields: MutableMap<String, Any> = HashMap()
    aliasMappingFields["rule"] = Map.of("type", "nested", "properties", ruleAliasFields)
    val aliasMapping = Map.of<String, Any>("properties", aliasMappingFields)
    client().admin().indices().putMapping(
        PutMappingRequest(index).source(aliasMapping)
    ).get()

    val testDoc = """{
        "rule": {"title": "some_title"},
        "message": "msg 1 2 3 4"
    }"""
    indexDoc(index, "2", testDoc)

    // NOTE(review): this request is not awaited and its outcome is never checked, so it may still
    // be in flight (or may have failed) when the monitor runs. It looks like a leftover copied from
    // the non-nested test - confirm whether it is needed here and, if so, add `.get()`.
    client().admin().indices().putMapping(
        PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
    )
    assertFalse(monitorResponse?.id.isNullOrEmpty())
    monitor = monitorResponse!!.monitor
    val id = monitorResponse.id
    val executeMonitorResponse = executeMonitor(monitor, id, false)
    // JUnit's assertEquals takes (expected, actual); keeping that order gives readable failures
    Assert.assertEquals(monitor.name, executeMonitorResponse!!.monitorRunResult.monitorName)
    Assert.assertEquals(1, executeMonitorResponse.monitorRunResult.triggerResults.size)
    searchAlerts(id)
    val table = Table("asc", "id", null, 1, 0, "")
    val getAlertsResponse = client()
        .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
        .get()
    Assert.assertTrue(getAlertsResponse != null)
    Assert.assertEquals(1, getAlertsResponse.alerts.size)
    val findings = searchFindings(id, customFindingsIndex)
    assertEquals("Findings saved for test monitor", 1, findings.size)
    assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
    // Only one doc-level query is registered for this monitor, so exactly one must match
    assertEquals("Didn't match the single doc-level query", 1, findings[0].docLevelQueries.size)
}
167263

168264
fun `test execute monitor with custom query index and custom field mappings`() {
@@ -373,7 +469,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
373469

374470
val customAlertsIndex = "custom_alerts_index"
375471
val customQueryIndex = "custom_query_index"
376-
Assert.assertFalse(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
377472
val customFindingsIndex = "custom_findings_index"
378473
val updateMonitorResponse = updateMonitor(
379474
monitor.copy(

0 commit comments

Comments
 (0)