Skip to content

Commit b03a5ee

Browse files
petardzeirsep
authored andcommitted
mappings traversal bug fix (opensearch-project#669)
Signed-off-by: Petar Dzepina <[email protected]>
1 parent 51712a2 commit b03a5ee

File tree

2 files changed

+165
-37
lines changed

2 files changed

+165
-37
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,54 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
101101
}
102102

103103
/**
104-
* From given index mapping node, extracts fieldName -> fieldProperties pair
104+
* Does a DFS traversal of index mappings tree.
105+
* Calls processLeafFn on every leaf node.
106+
* Populates flattenPaths list with full paths of leaf nodes
107+
* @param node current node which we're visiting
108+
* @param currentPath current node path from root node
109+
* @param processLeafFn leaf processor function which is called on every leaf discovered
110+
* @param flattenPaths list of full paths of all leaf nodes relative to root
105111
*/
106-
fun extractField(node: MutableMap<String, Any>, currentPath: String): Pair<String, MutableMap<String, Any>> {
112+
fun traverseMappingsAndUpdate(
113+
node: MutableMap<String, Any>,
114+
currentPath: String,
115+
processLeafFn: (String, MutableMap<String, Any>) -> Triple<String, String, MutableMap<String, Any>>,
116+
flattenPaths: MutableList<String>
117+
) {
118+
// If node contains "properties" property then it is internal(non-leaf) node
107119
if (node.containsKey(PROPERTIES)) {
108-
return extractField(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath)
109-
} else if (node.containsKey(NESTED)) {
110-
return extractField(node.get(NESTED) as MutableMap<String, Any>, currentPath)
111-
} else if (node.size == 1 && node.containsKey(TYPE) == false) {
112-
val iter = node.iterator().next()
113-
return extractField(iter.value as MutableMap<String, Any>, currentPath + "." + iter.key)
114-
} else {
115-
return Pair(currentPath, node)
120+
return traverseMappingsAndUpdate(node.get(PROPERTIES) as MutableMap<String, Any>, currentPath, processLeafFn, flattenPaths)
121+
} else if (node.containsKey(TYPE) == false) {
122+
// If there is no "type" property, this is either internal(non-leaf) node or leaf node
123+
// newNodes will hold list of updated leaf properties
124+
var newNodes = ArrayList<Triple<String, String, Any>>(node.size)
125+
node.entries.forEach {
126+
// Compute full path relative to root
127+
val fullPath = if (currentPath.isEmpty()) it.key
128+
else "$currentPath.${it.key}"
129+
val nodeProps = it.value as MutableMap<String, Any>
130+
// If it has type property and type is not "nested" then this is a leaf
131+
if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) {
132+
// At this point we know full path of node, so we add it to output array
133+
flattenPaths.add(fullPath)
134+
// Calls processLeafFn and gets old node name, new node name and new properties of node.
135+
// This is all information we need to update this node
136+
val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap<String, Any>)
137+
newNodes.add(Triple(oldName, newName, props))
138+
} else {
139+
// Internal(non-leaf) node - visit children
140+
traverseMappingsAndUpdate(nodeProps[PROPERTIES] as MutableMap<String, Any>, fullPath, processLeafFn, flattenPaths)
141+
}
142+
}
143+
// Here we can update all processed leaves in tree
144+
newNodes.forEach {
145+
// If we renamed leaf, we have to remove it first
146+
if (it.first != it.second) {
147+
node.remove(it.first)
148+
}
149+
// Put new properties of leaf
150+
node.put(it.second, it.third)
151+
}
116152
}
117153
}
118154

@@ -134,30 +170,39 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
134170
}
135171
val indices = getIndexResponse.indices()
136172

173+
// Run through each backing index and apply appropriate mappings to query index
137174
indices?.forEach { indexName ->
138175
if (clusterState.routingTable.hasIndex(indexName)) {
139176
val indexMetadata = clusterState.metadata.index(indexName)
140177
if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) {
141178
val properties = (
142179
(indexMetadata.mapping()?.sourceAsMap?.get("properties"))
143-
as Map<String, Map<String, Any>>
180+
as MutableMap<String, Any>
144181
)
145-
146-
val updatedProperties = properties.entries.associate {
147-
var (fieldName, fieldProps) = extractField(it.value as MutableMap<String, Any>, it.key)
148-
val newProps = fieldProps
149-
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
150-
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
151-
if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) {
152-
mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
153-
newProps[iter.key] = iter.value
182+
// Node processor function is used to process leaves of index mappings tree
183+
//
184+
val leafNodeProcessor =
185+
fun(fieldName: String, props: MutableMap<String, Any>): Triple<String, String, MutableMap<String, Any>> {
186+
val newProps = props.toMutableMap()
187+
if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) {
188+
val mappingsByType = monitor.dataSources.queryIndexMappingsByType
189+
if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) {
190+
mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry<String, String> ->
191+
newProps[iter.key] = iter.value
192+
}
154193
}
155194
}
195+
if (props.containsKey("path")) {
196+
newProps["path"] = "${props["path"]}_${indexName}_$monitorId"
197+
}
198+
return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps)
156199
}
200+
// Traverse and update index mappings here while extracting flatten field paths
201+
val flattenPaths = mutableListOf<String>()
202+
traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths)
203+
// Updated mappings ready to be applied on queryIndex
204+
val updatedProperties = properties
157205

158-
if (fieldProps.containsKey("path")) newProps["path"] = "${fieldProps["path"]}_${indexName}_$monitorId"
159-
"${fieldName}_${indexName}_$monitorId" to newProps
160-
}
161206
val queryIndex = monitor.dataSources.queryIndex
162207

163208
val updateMappingRequest = PutMappingRequest(queryIndex)
@@ -170,8 +215,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
170215
val indexRequests = mutableListOf<IndexRequest>()
171216
queries.forEach {
172217
var query = it.query
173-
properties.forEach { prop ->
174-
query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:")
218+
flattenPaths.forEach { fieldPath ->
219+
query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:")
175220
}
176221
val indexRequest = IndexRequest(queryIndex)
177222
.id(it.id + "_${indexName}_$monitorId")

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.opensearch.test.OpenSearchTestCase
3434
import java.time.ZonedDateTime
3535
import java.time.format.DateTimeFormatter
3636
import java.time.temporal.ChronoUnit.MILLIS
37+
import java.util.Map
3738
import java.util.concurrent.TimeUnit
3839
import java.util.stream.Collectors
3940

@@ -129,37 +130,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
129130
}
130131

131132
fun `test execute monitor with custom query index`() {
132-
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
133-
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
133+
val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")
134+
val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")
135+
val docQuery3 = DocLevelQuery(query = "source.ip.v4.v0:120", name = "5")
136+
val docQuery4 = DocLevelQuery(query = "alias.some.fff:\"us-west-2\"", name = "6")
137+
val docQuery5 = DocLevelQuery(query = "message:\"This is an error from IAD region\"", name = "7")
138+
val docLevelInput = DocLevelMonitorInput(
139+
"description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5)
140+
)
134141
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
142+
val customFindingsIndex = "custom_findings_index"
143+
val customFindingsIndexPattern = "custom_findings_index-1"
135144
val customQueryIndex = "custom_alerts_index"
136145
var monitor = randomDocumentLevelMonitor(
137146
inputs = listOf(docLevelInput),
138147
triggers = listOf(trigger),
139-
dataSources = DataSources(queryIndex = customQueryIndex)
148+
dataSources = DataSources(
149+
queryIndex = customQueryIndex,
150+
findingsIndex = customFindingsIndex,
151+
findingsIndexPattern = customFindingsIndexPattern
152+
)
140153
)
141154
val monitorResponse = createMonitor(monitor)
142155
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
156+
// Trying to test here few different "nesting" situations and "wierd" characters
143157
val testDoc = """{
144158
"message" : "This is an error from IAD region",
145-
"source.port": 12345,
159+
"source.ip.v6.v1" : 12345,
160+
"source.ip.v6.v2" : 16645,
161+
"source.ip.v4.v0" : 120,
162+
"test_bad_char" : "\u0000",
146163
"test_strict_date_time" : "$testTime",
147-
"test_field" : "us-west-2"
164+
"test_field.some_other_field" : "us-west-2"
148165
}"""
149166
indexDoc(index, "1", testDoc)
167+
client().admin().indices().putMapping(
168+
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
169+
)
150170
assertFalse(monitorResponse?.id.isNullOrEmpty())
151171
monitor = monitorResponse!!.monitor
172+
val id = monitorResponse.id
173+
val executeMonitorResponse = executeMonitor(monitor, id, false)
174+
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
175+
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
176+
searchAlerts(id)
177+
val table = Table("asc", "id", null, 1, 0, "")
178+
var getAlertsResponse = client()
179+
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
180+
.get()
181+
Assert.assertTrue(getAlertsResponse != null)
182+
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
183+
val findings = searchFindings(id, customFindingsIndex)
184+
assertEquals("Findings saved for test monitor", 1, findings.size)
185+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
186+
assertEquals("Didn't match all 5 queries", 5, findings[0].docLevelQueries.size)
187+
}
188+
189+
fun `test execute monitor with custom query index and nested mappings`() {
190+
val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3")
191+
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
192+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
193+
val customFindingsIndex = "custom_findings_index"
194+
val customFindingsIndexPattern = "custom_findings_index-1"
195+
val customQueryIndex = "custom_alerts_index"
196+
var monitor = randomDocumentLevelMonitor(
197+
inputs = listOf(docLevelInput),
198+
triggers = listOf(trigger),
199+
dataSources = DataSources(
200+
queryIndex = customQueryIndex,
201+
findingsIndex = customFindingsIndex,
202+
findingsIndexPattern = customFindingsIndexPattern
203+
)
204+
)
205+
val monitorResponse = createMonitor(monitor)
206+
207+
// We are verifying here that index with nested mappings and nested aliases
208+
// won't break query matching
209+
210+
// Create index mappings
211+
val m: MutableMap<String, Any> = HashMap()
212+
val m1: MutableMap<String, Any> = HashMap()
213+
m1["title"] = Map.of("type", "text")
214+
m1["category"] = Map.of("type", "keyword")
215+
m["rule"] = Map.of("type", "nested", "properties", m1)
216+
val properties = Map.of<String, Any>("properties", m)
217+
152218
client().admin().indices().putMapping(
153219
PutMappingRequest(
154220
index
155-
).source("test_alias.field_a", "type=alias,path=message")
221+
).source(properties)
156222
).get()
223+
224+
// Put alias for nested fields
225+
val mm: MutableMap<String, Any> = HashMap()
226+
val mm1: MutableMap<String, Any> = HashMap()
227+
mm1["title_alias"] = Map.of("type", "alias", "path", "rule.title")
228+
mm["rule"] = Map.of("type", "nested", "properties", mm1)
229+
val properties1 = Map.of<String, Any>("properties", mm)
157230
client().admin().indices().putMapping(
158231
PutMappingRequest(
159232
index
160-
).source("test_alias2", "type=alias,path=test_field")
233+
).source(properties1)
161234
).get()
162235

236+
val testDoc = """{
237+
"rule": {"title": "some_title"},
238+
"message": "msg 1 2 3 4"
239+
}"""
240+
indexDoc(index, "2", testDoc)
241+
242+
client().admin().indices().putMapping(
243+
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
244+
)
245+
assertFalse(monitorResponse?.id.isNullOrEmpty())
246+
monitor = monitorResponse!!.monitor
163247
val id = monitorResponse.id
164248
val executeMonitorResponse = executeMonitor(monitor, id, false)
165249
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
@@ -171,11 +255,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
171255
.get()
172256
Assert.assertTrue(getAlertsResponse != null)
173257
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
174-
getAlertsResponse = client()
175-
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
176-
.get()
177-
Assert.assertTrue(getAlertsResponse != null)
178-
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
258+
val findings = searchFindings(id, customFindingsIndex)
259+
assertEquals("Findings saved for test monitor", 1, findings.size)
260+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
261+
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
179262
}
180263

181264
fun `test execute monitor with custom query index and custom field mappings`() {

0 commit comments

Comments
 (0)