@@ -9,12 +9,15 @@ import org.apache.logging.log4j.LogManager
9
9
import org.opensearch.ExceptionsHelper
10
10
import org.opensearch.OpenSearchStatusException
11
11
import org.opensearch.action.ActionListener
12
+ import org.opensearch.action.DocWriteRequest
13
+ import org.opensearch.action.admin.indices.refresh.RefreshAction
14
+ import org.opensearch.action.admin.indices.refresh.RefreshRequest
15
+ import org.opensearch.action.bulk.BulkRequest
16
+ import org.opensearch.action.bulk.BulkResponse
12
17
import org.opensearch.action.index.IndexRequest
13
- import org.opensearch.action.index.IndexResponse
14
18
import org.opensearch.action.search.SearchAction
15
19
import org.opensearch.action.search.SearchRequest
16
20
import org.opensearch.action.search.SearchResponse
17
- import org.opensearch.action.support.WriteRequest
18
21
import org.opensearch.alerting.model.DocumentExecutionContext
19
22
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
20
23
import org.opensearch.alerting.model.InputRunResults
@@ -264,10 +267,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
264
267
// If there are no triggers defined, we still want to generate findings
265
268
if (monitor.triggers.isEmpty()) {
266
269
if (dryrun == false && monitor.id != Monitor .NO_ID ) {
267
- docsToQueries.forEach {
268
- val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
269
- createFindings(monitor, monitorCtx, triggeredQueries, it.key, true )
270
- }
270
+ createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true )
271
271
}
272
272
} else {
273
273
monitor.triggers.forEach {
@@ -349,35 +349,39 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
349
349
trigger : DocumentLevelTrigger ,
350
350
monitor : Monitor ,
351
351
idQueryMap : Map <String , DocLevelQuery >,
352
- docsToQueries : Map <String , List <String >>,
352
+ docsToQueries : MutableMap <String , MutableList <String >>,
353
353
queryToDocIds : Map <DocLevelQuery , Set <String >>,
354
354
dryrun : Boolean
355
355
): DocumentLevelTriggerRunResult {
356
356
val triggerCtx = DocumentLevelTriggerExecutionContext (monitor, trigger)
357
357
val triggerResult = monitorCtx.triggerService!! .runDocLevelTrigger(monitor, trigger, queryToDocIds)
358
358
359
- val findings = mutableListOf<String >()
360
- val findingDocPairs = mutableListOf<Pair <String , String >>()
359
+ val triggerFindingDocPairs = mutableListOf<Pair <String , String >>()
361
360
362
361
// TODO: Implement throttling for findings
363
- docsToQueries.forEach {
364
- val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
365
- val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, ! dryrun && monitor.id != Monitor .NO_ID )
366
- findings.add(findingId)
362
+ val findingToDocPairs = createFindings(
363
+ monitor,
364
+ monitorCtx,
365
+ docsToQueries,
366
+ idQueryMap,
367
+ ! dryrun && monitor.id != Monitor .NO_ID ,
368
+ )
367
369
368
- if (triggerResult.triggeredDocs.contains(it.key)) {
369
- findingDocPairs.add(Pair (findingId, it.key))
370
+ findingToDocPairs.forEach {
371
+ // Only pick those entries whose docs have triggers associated with them
372
+ if (triggerResult.triggeredDocs.contains(it.second)) {
373
+ triggerFindingDocPairs.add(Pair (it.first, it.second))
370
374
}
371
375
}
372
376
373
377
val actionCtx = triggerCtx.copy(
374
378
triggeredDocs = triggerResult.triggeredDocs,
375
- relatedFindings = findings ,
379
+ relatedFindings = findingToDocPairs.map { it.first } ,
376
380
error = monitorResult.error ? : triggerResult.error
377
381
)
378
382
379
383
val alerts = mutableListOf<Alert >()
380
- findingDocPairs .forEach {
384
+ triggerFindingDocPairs .forEach {
381
385
val alert = monitorCtx.alertService!! .composeDocLevelAlert(
382
386
listOf (it.first),
383
387
listOf (it.second),
@@ -427,49 +431,90 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
427
431
return triggerResult
428
432
}
429
433
434
+ /* *
435
+ * 1. Bulk index all findings based on shouldCreateFinding flag
436
+ * 2. invoke publishFinding() to kickstart auto-correlations
437
+ * 3. Returns a list of pairs for finding id to doc id
438
+ */
430
439
private suspend fun createFindings (
431
440
monitor : Monitor ,
432
441
monitorCtx : MonitorRunnerExecutionContext ,
433
- docLevelQueries : List <DocLevelQuery >,
434
- matchingDocId : String ,
435
- shouldCreateFinding : Boolean
436
- ): String {
437
- // Before the "|" is the doc id and after the "|" is the index
438
- val docIndex = matchingDocId.split(" |" )
442
+ docsToQueries : MutableMap <String , MutableList <String >>,
443
+ idQueryMap : Map <String , DocLevelQuery >,
444
+ shouldCreateFinding : Boolean ,
445
+ ): List <Pair <String , String >> {
439
446
440
- val finding = Finding (
441
- id = UUID .randomUUID().toString(),
442
- relatedDocIds = listOf (docIndex[0 ]),
443
- correlatedDocIds = listOf (docIndex[0 ]),
444
- monitorId = monitor.id,
445
- monitorName = monitor.name,
446
- index = docIndex[1 ],
447
- docLevelQueries = docLevelQueries,
448
- timestamp = Instant .now()
449
- )
447
+ val findingDocPairs = mutableListOf<Pair <String , String >>()
448
+ val findings = mutableListOf<Finding >()
449
+ val indexRequests = mutableListOf<IndexRequest >()
450
450
451
- val findingStr = finding.toXContent( XContentBuilder .builder( XContentType . JSON .xContent()), ToXContent . EMPTY_PARAMS ).string()
452
- logger.debug( " Findings: $findingStr " )
451
+ docsToQueries.forEach {
452
+ val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId] !! }
453
453
454
- if (shouldCreateFinding) {
455
- val indexRequest = IndexRequest (monitor.dataSources.findingsIndex)
456
- .setRefreshPolicy(WriteRequest .RefreshPolicy .IMMEDIATE )
457
- .source(findingStr, XContentType .JSON )
458
- .id(finding.id)
459
- .routing(finding.id)
454
+ // Before the "|" is the doc id and after the "|" is the index
455
+ val docIndex = it.key.split(" |" )
460
456
461
- monitorCtx.client!! .suspendUntil<Client , IndexResponse > {
462
- monitorCtx.client!! .index(indexRequest, it)
457
+ val finding = Finding (
458
+ id = UUID .randomUUID().toString(),
459
+ relatedDocIds = listOf (docIndex[0 ]),
460
+ correlatedDocIds = listOf (docIndex[0 ]),
461
+ monitorId = monitor.id,
462
+ monitorName = monitor.name,
463
+ index = docIndex[1 ],
464
+ docLevelQueries = triggeredQueries,
465
+ timestamp = Instant .now(),
466
+ )
467
+ findingDocPairs.add(Pair (finding.id, it.key))
468
+ findings.add(finding)
469
+
470
+ val findingStr =
471
+ finding.toXContent(XContentBuilder .builder(XContentType .JSON .xContent()), ToXContent .EMPTY_PARAMS )
472
+ .string()
473
+ logger.debug(" Findings: $findingStr " )
474
+
475
+ if (shouldCreateFinding) {
476
+ indexRequests + = IndexRequest (monitor.dataSources.findingsIndex)
477
+ .source(findingStr, XContentType .JSON )
478
+ .id(finding.id)
479
+ .opType(DocWriteRequest .OpType .CREATE )
463
480
}
464
481
}
465
482
483
+ if (indexRequests.isNotEmpty()) {
484
+ bulkIndexFindings(monitor, monitorCtx, indexRequests)
485
+ }
486
+
466
487
try {
467
- publishFinding(monitor, monitorCtx, finding)
488
+ findings.forEach { finding ->
489
+ publishFinding(monitor, monitorCtx, finding)
490
+ }
468
491
} catch (e: Exception ) {
469
492
// suppress exception
470
493
logger.error(" Optional finding callback failed" , e)
471
494
}
472
- return finding.id
495
+ return findingDocPairs
496
+ }
497
+
498
+ private suspend fun bulkIndexFindings (
499
+ monitor : Monitor ,
500
+ monitorCtx : MonitorRunnerExecutionContext ,
501
+ indexRequests : List <IndexRequest >
502
+ ) {
503
+ indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
504
+ val bulkResponse: BulkResponse = monitorCtx.client!! .suspendUntil {
505
+ bulk(BulkRequest ().add(batch), it)
506
+ }
507
+ if (bulkResponse.hasFailures()) {
508
+ bulkResponse.items.forEach { item ->
509
+ if (item.isFailed) {
510
+ logger.error(" Failed indexing the finding ${item.id} of monitor [${monitor.id} ]" )
511
+ }
512
+ }
513
+ } else {
514
+ logger.debug(" [${bulkResponse.items.size} ] All findings successfully indexed." )
515
+ }
516
+ }
517
+ monitorCtx.client!! .execute(RefreshAction .INSTANCE , RefreshRequest (monitor.dataSources.findingsIndex))
473
518
}
474
519
475
520
private fun publishFinding (
@@ -592,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
592
637
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
593
638
}
594
639
} catch (e: Exception ) {
595
- logger.warn (" Failed to run for shard $shard . Error: ${e.message} " )
640
+ logger.error (" Failed to run for shard $shard . Error: ${e.message} " )
596
641
}
597
642
}
598
643
return matchingDocs
0 commit comments