@@ -13,10 +13,12 @@ import org.opensearch.action.search.SearchRequest
13
13
import org.opensearch.action.search.SearchResponse
14
14
import org.opensearch.action.support.WriteRequest
15
15
import org.opensearch.alerting.model.ActionRunResult
16
+ import org.opensearch.alerting.model.AlertContext
16
17
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
17
18
import org.opensearch.alerting.model.InputRunResults
18
19
import org.opensearch.alerting.model.MonitorRunResult
19
20
import org.opensearch.alerting.opensearchapi.InjectorContextElement
21
+ import org.opensearch.alerting.opensearchapi.convertToMap
20
22
import org.opensearch.alerting.opensearchapi.retry
21
23
import org.opensearch.alerting.opensearchapi.suspendUntil
22
24
import org.opensearch.alerting.opensearchapi.withClosableContext
@@ -25,7 +27,9 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
25
27
import org.opensearch.alerting.util.getActionExecutionPolicy
26
28
import org.opensearch.alerting.util.getBucketKeysHash
27
29
import org.opensearch.alerting.util.getCombinedTriggerRunResult
30
+ import org.opensearch.alerting.util.printsSampleDocData
28
31
import org.opensearch.alerting.workflow.WorkflowRunContext
32
+ import org.opensearch.client.Client
29
33
import org.opensearch.common.xcontent.LoggingDeprecationHandler
30
34
import org.opensearch.common.xcontent.XContentType
31
35
import org.opensearch.commons.alerting.model.Alert
@@ -221,6 +225,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
221
225
}
222
226
}
223
227
228
+ // The alertSampleDocs map structure is Map<TriggerId, Map<BucketKeysHash, List<Alert>>>
229
+ val alertSampleDocs = mutableMapOf<String , Map <String , List <Map <String , Any >>>>()
224
230
for (trigger in monitor.triggers) {
225
231
val alertsToUpdate = mutableSetOf<Alert >()
226
232
val completedAlertsToUpdate = mutableSetOf<Alert >()
@@ -231,6 +237,32 @@ object BucketLevelMonitorRunner : MonitorRunner() {
231
237
? : mutableListOf ()
232
238
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
233
239
nextAlerts[trigger.id]?.set(AlertCategory .DEDUPED , dedupedAlerts)
240
+
241
+ // Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data.
242
+ val isTriggered = ! nextAlerts[trigger.id]?.get(AlertCategory .NEW ).isNullOrEmpty()
243
+ if (isTriggered && printsSampleDocData(trigger)) {
244
+ try {
245
+ val searchRequest = monitorCtx.inputService!! .getSearchRequest(
246
+ monitor = monitor.copy(triggers = listOf (trigger)),
247
+ searchInput = monitor.inputs[0 ] as SearchInput ,
248
+ periodStart = periodStart,
249
+ periodEnd = periodEnd,
250
+ prevResult = monitorResult.inputResults,
251
+ matchingDocIdsPerIndex = null ,
252
+ returnSampleDocs = true
253
+ )
254
+ val sampleDocumentsByBucket = getSampleDocs(
255
+ client = monitorCtx.client!! ,
256
+ monitorId = monitor.id,
257
+ triggerId = trigger.id,
258
+ searchRequest = searchRequest
259
+ )
260
+ alertSampleDocs[trigger.id] = sampleDocumentsByBucket
261
+ } catch (e: Exception ) {
262
+ logger.error(" Error retrieving sample documents for trigger {} of monitor {}." , trigger.id, monitor.id, e)
263
+ }
264
+ }
265
+
234
266
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory .NEW ) ? : mutableListOf ()
235
267
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory .COMPLETED ) ? : mutableListOf ()
236
268
@@ -256,9 +288,12 @@ object BucketLevelMonitorRunner : MonitorRunner() {
256
288
for (alertCategory in actionExecutionScope.actionableAlerts) {
257
289
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ? : mutableListOf ()
258
290
for (alert in alertsToExecuteActionsFor) {
291
+ val alertContext = if (alertCategory != AlertCategory .NEW ) AlertContext (alert = alert)
292
+ else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
293
+
259
294
val actionCtx = getActionContextForAlertCategory(
260
295
alertCategory,
261
- alert ,
296
+ alertContext ,
262
297
triggerCtx,
263
298
monitorOrTriggerError
264
299
)
@@ -292,7 +327,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
292
327
293
328
val actionCtx = triggerCtx.copy(
294
329
dedupedAlerts = dedupedAlerts,
295
- newAlerts = newAlerts,
330
+ newAlerts = newAlerts.map {
331
+ getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
332
+ },
296
333
completedAlerts = completedAlerts,
297
334
error = monitorResult.error ? : triggerResult.error
298
335
)
@@ -487,17 +524,93 @@ object BucketLevelMonitorRunner : MonitorRunner() {
487
524
488
525
private fun getActionContextForAlertCategory (
489
526
alertCategory : AlertCategory ,
490
- alert : Alert ,
527
+ alertContext : AlertContext ,
491
528
ctx : BucketLevelTriggerExecutionContext ,
492
529
error : Exception ?
493
530
): BucketLevelTriggerExecutionContext {
494
531
return when (alertCategory) {
495
532
AlertCategory .DEDUPED ->
496
- ctx.copy(dedupedAlerts = listOf (alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
533
+ ctx.copy(dedupedAlerts = listOf (alertContext. alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
497
534
AlertCategory .NEW ->
498
- ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf (alert ), completedAlerts = emptyList(), error = error)
535
+ ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf (alertContext ), completedAlerts = emptyList(), error = error)
499
536
AlertCategory .COMPLETED ->
500
- ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf (alert), error = error)
537
+ ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf (alertContext.alert), error = error)
538
+ }
539
+ }
540
+
541
+ private fun getAlertContext (
542
+ alert : Alert ,
543
+ alertSampleDocs : Map <String , Map <String , List <Map <String , Any >>>>
544
+ ): AlertContext {
545
+ val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
546
+ val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
547
+ return if (! bucketKey.isNullOrEmpty() && ! sampleDocs.isNullOrEmpty()) {
548
+ AlertContext (alert = alert, sampleDocs = sampleDocs)
549
+ } else {
550
+ logger.error(
551
+ " Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}." ,
552
+ alert.id,
553
+ alert.triggerId,
554
+ alert.monitorId,
555
+ alert.executionId
556
+ )
557
+ AlertContext (alert = alert, sampleDocs = listOf ())
501
558
}
502
559
}
560
+
561
+ /* *
562
+ * Executes the monitor's query with the addition of 2 top_hits aggregations that are used to return the top 5,
563
+ * and bottom 5 documents for each bucket.
564
+ *
565
+ * @return Map<BucketKeysHash, List<Alert>>
566
+ */
567
+ @Suppress(" UNCHECKED_CAST" )
568
+ private suspend fun getSampleDocs (
569
+ client : Client ,
570
+ monitorId : String ,
571
+ triggerId : String ,
572
+ searchRequest : SearchRequest
573
+ ): Map <String , List <Map <String , Any >>> {
574
+ val sampleDocumentsByBucket = mutableMapOf<String , List <Map <String , Any >>>()
575
+ val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
576
+ val aggs = searchResponse.convertToMap().getOrDefault(" aggregations" , mapOf<String , Any >()) as Map <String , Any >
577
+ val compositeAgg = aggs.getOrDefault(" composite_agg" , mapOf<String , Any >()) as Map <String , Any >
578
+ val buckets = compositeAgg.getOrDefault(" buckets" , emptyList<Map <String , Any >>()) as List <Map <String , Any >>
579
+
580
+ buckets.forEach { bucket ->
581
+ val bucketKey = getBucketKeysHash((bucket.getOrDefault(" key" , mapOf<String , String >()) as Map <String , String >).values.toList())
582
+ if (bucketKey.isEmpty()) throw IllegalStateException (" Cannot format bucket keys." )
583
+
584
+ val unwrappedTopHits = (bucket.getOrDefault(" top_hits" , mapOf<String , Any >()) as Map <String , Any >)
585
+ .getOrDefault(" hits" , mapOf<String , Any >()) as Map <String , Any >
586
+ val topHits = unwrappedTopHits.getOrDefault(" hits" , listOf<Map <String , Any >>()) as List <Map <String , Any >>
587
+
588
+ val unwrappedLowHits = (bucket.getOrDefault(" low_hits" , mapOf<String , Any >()) as Map <String , Any >)
589
+ .getOrDefault(" hits" , mapOf<String , Any >()) as Map <String , Any >
590
+ val lowHits = unwrappedLowHits.getOrDefault(" hits" , listOf<Map <String , Any >>()) as List <Map <String , Any >>
591
+
592
+ // Reversing the order of lowHits so allHits will be in descending order.
593
+ val allHits = topHits + lowHits.reversed()
594
+
595
+ if (allHits.isEmpty()) {
596
+ // We expect sample documents to be available for each bucket.
597
+ logger.error(" Sample documents not found for trigger {} of monitor {}." , triggerId, monitorId)
598
+ }
599
+
600
+ // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each.
601
+ // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data.
602
+ val uniqueHitIds = mutableSetOf<String >()
603
+ val dedupedHits = mutableListOf<Map <String , Any >>()
604
+ allHits.forEach { hit ->
605
+ val hitId = hit[" _id" ] as String
606
+ if (! uniqueHitIds.contains(hitId)) {
607
+ uniqueHitIds.add(hitId)
608
+ dedupedHits.add(hit)
609
+ }
610
+ }
611
+ sampleDocumentsByBucket[bucketKey] = dedupedHits
612
+ }
613
+
614
+ return sampleDocumentsByBucket
615
+ }
503
616
}
0 commit comments