Skip to content

Commit 0fb9dd0

Browse files
authored
fix bucket level monitor findings to support term aggs in query (#666)
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 42abf4d commit 0fb9dd0

File tree

2 files changed

+110
-17
lines changed

2 files changed

+110
-17
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.opensearch.script.ScriptType
4646
import org.opensearch.script.TemplateScript
4747
import org.opensearch.search.aggregations.AggregatorFactories
4848
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
49+
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
4950
import org.opensearch.search.builder.SearchSourceBuilder
5051
import java.time.Instant
5152
import java.util.UUID
@@ -71,6 +72,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
7172
val currentAlerts = try {
7273
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
7374
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
75+
if (monitor.dataSources.findingsEnabled == true) {
76+
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
77+
}
7478
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
7579
} catch (e: Exception) {
7680
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
@@ -142,15 +146,19 @@ object BucketLevelMonitorRunner : MonitorRunner() {
142146
*/
143147
if (triggerResults[trigger.id]?.error != null) continue
144148
val findings =
145-
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) createFindings(
146-
triggerResult,
147-
monitor,
148-
monitorCtx,
149-
periodStart,
150-
periodEnd,
151-
!dryrun && monitor.id != Monitor.NO_ID
152-
)
153-
else emptyList()
149+
if (monitor.triggers.size == 1 && monitor.dataSources.findingsEnabled == true) {
150+
logger.debug("Creating bucket level findings")
151+
createFindings(
152+
triggerResult,
153+
monitor,
154+
monitorCtx,
155+
periodStart,
156+
periodEnd,
157+
!dryrun && monitor.id != Monitor.NO_ID
158+
)
159+
} else {
160+
emptyList()
161+
}
154162
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
155163
// be refactored to use a map instead
156164
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
@@ -334,15 +342,30 @@ object BucketLevelMonitorRunner : MonitorRunner() {
334342
val bucketValues: Set<String> = triggerResult.aggregationResultBuckets.keys
335343
val query = input.query
336344
var fieldName = ""
337-
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
345+
338346
for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
339-
val sources = (aggFactory as CompositeAggregationBuilder).sources()
340-
for (source in sources) {
341-
if (grouByFields > 0) {
347+
when (aggFactory) {
348+
is CompositeAggregationBuilder -> {
349+
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
350+
val sources = aggFactory.sources()
351+
for (source in sources) {
352+
if (grouByFields > 0) {
353+
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
354+
return listOf()
355+
}
356+
grouByFields++
357+
fieldName = source.field()
358+
}
359+
}
360+
is TermsAggregationBuilder -> {
361+
fieldName = aggFactory.field()
362+
}
363+
else -> {
364+
logger.error(
365+
"Bucket level monitor findings supported only for composite and term aggs. Found [{${aggFactory.type}}]"
366+
)
342367
return listOf()
343368
}
344-
grouByFields++
345-
fieldName = source.field()
346369
}
347370
}
348371
if (fieldName != "") {
@@ -370,6 +393,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
370393
}
371394
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
372395
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
396+
} else {
397+
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
373398
}
374399
}
375400
}
@@ -403,8 +428,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
403428
)
404429

405430
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
406-
logger.debug("Findings: $findingStr")
431+
logger.debug("Bucket level monitor ${monitor.id} Findings: $findingStr")
407432
if (shouldCreateFinding) {
433+
logger.debug("Saving bucket level monitor findings for monitor ${monitor.id}")
408434
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
409435
.source(findingStr, XContentType.JSON)
410436
.id(finding.id)

alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.opensearch.rest.RestStatus
4040
import org.opensearch.script.Script
4141
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
4242
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder
43+
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
4344
import org.opensearch.search.builder.SearchSourceBuilder
4445
import java.net.URLEncoder
4546
import java.time.Instant
@@ -1322,7 +1323,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
13221323
)
13231324
}
13241325

1325-
fun `test bucket-level monitor with findings enabled`() {
1326+
fun `test bucket-level monitor with findings enabled on term agg`() {
1327+
val testIndex = createTestIndex()
1328+
insertSampleTimeSerializedData(
1329+
testIndex,
1330+
listOf(
1331+
"test_value_1",
1332+
"test_value_2"
1333+
)
1334+
)
1335+
1336+
val query = QueryBuilders.rangeQuery("test_strict_date_time")
1337+
.gt("{{period_end}}||-10d")
1338+
.lte("{{period_end}}")
1339+
.format("epoch_millis")
1340+
val termAgg = TermsAggregationBuilder("test_field").field("test_field")
1341+
val input = SearchInput(indices = listOf(testIndex), query = SearchSourceBuilder().size(0).query(query).aggregation(termAgg))
1342+
val triggerScript = """
1343+
params.docCount > 0
1344+
""".trimIndent()
1345+
1346+
// For the Actions ensure that there is at least one and any PER_ALERT actions contain ACTIVE, DEDUPED and COMPLETED in its policy
1347+
// so that the assertions done later in this test don't fail.
1348+
// The config is being mutated this way to still maintain the randomness in configuration (like including other ActionExecutionScope).
1349+
val actions = randomActionsForBucketLevelTrigger(min = 1).map {
1350+
if (it.actionExecutionPolicy?.actionExecutionScope is PerAlertActionScope) {
1351+
it.copy(
1352+
actionExecutionPolicy = ActionExecutionPolicy(
1353+
PerAlertActionScope(setOf(AlertCategory.NEW, AlertCategory.DEDUPED, AlertCategory.COMPLETED))
1354+
)
1355+
)
1356+
} else {
1357+
it
1358+
}
1359+
}
1360+
var trigger = randomBucketLevelTrigger(actions = actions)
1361+
trigger = trigger.copy(
1362+
bucketSelector = BucketSelectorExtAggregationBuilder(
1363+
name = trigger.id,
1364+
bucketsPathsMap = mapOf("docCount" to "_count"),
1365+
script = Script(triggerScript),
1366+
parentBucketPath = "test_field",
1367+
filter = null
1368+
)
1369+
)
1370+
val monitor = createMonitor(
1371+
randomBucketLevelMonitor(
1372+
inputs = listOf(input),
1373+
enabled = false,
1374+
triggers = listOf(trigger),
1375+
dataSources = DataSources(findingsEnabled = true)
1376+
)
1377+
)
1378+
executeMonitor(monitor.id)
1379+
1380+
// Check created Alerts
1381+
var currentAlerts = searchAlerts(monitor)
1382+
assertEquals("Alerts not saved", 2, currentAlerts.size)
1383+
currentAlerts.forEach { alert ->
1384+
Assert.assertEquals("expected findings for alert", alert.findingIds.size, 1)
1385+
}
1386+
val findings = searchFindings(monitor)
1387+
assertEquals("Findings saved for test monitor", 1, findings.size)
1388+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
1389+
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
1390+
}
1391+
1392+
fun `test bucket-level monitor with findings enabled on composite agg`() {
13261393
val testIndex = createTestIndex()
13271394
insertSampleTimeSerializedData(
13281395
testIndex,

0 commit comments

Comments
 (0)