@@ -8,12 +8,15 @@ package org.opensearch.alerting
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.admin.indices.refresh.RefreshAction
+import org.opensearch.action.admin.indices.refresh.RefreshRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
-import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
-import org.opensearch.action.support.WriteRequest
 import org.opensearch.alerting.model.DocumentExecutionContext
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
 import org.opensearch.alerting.model.InputRunResults
@@ -248,10 +251,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             // If there are no triggers defined, we still want to generate findings
             if (monitor.triggers.isEmpty()) {
                 if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                    docsToQueries.forEach {
-                        val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-                        createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
-                    }
+                    createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
                 }
             } else {
                 monitor.triggers.forEach {
@@ -340,7 +340,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         trigger: DocumentLevelTrigger,
         monitor: Monitor,
         idQueryMap: Map<String, DocLevelQuery>,
-        docsToQueries: Map<String, List<String>>,
+        docsToQueries: MutableMap<String, MutableList<String>>,
         queryToDocIds: Map<DocLevelQuery, Set<String>>,
         dryrun: Boolean,
         workflowRunContext: WorkflowRunContext?,
@@ -349,35 +349,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
 
-        val findings = mutableListOf<String>()
-        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()
 
         // TODO: Implement throttling for findings
-        docsToQueries.forEach {
-            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-            val findingId = createFindings(
-                monitor,
-                monitorCtx,
-                triggeredQueries,
-                it.key,
-                !dryrun && monitor.id != Monitor.NO_ID,
-                executionId
-            )
-            findings.add(findingId)
+        val findingToDocPairs = createFindings(
+            monitor,
+            monitorCtx,
+            docsToQueries,
+            idQueryMap,
+            !dryrun && monitor.id != Monitor.NO_ID,
+            executionId
+        )
 
-            if (triggerResult.triggeredDocs.contains(it.key)) {
-                findingDocPairs.add(Pair(findingId, it.key))
+        findingToDocPairs.forEach {
+            // Only pick those entries whose docs have triggers associated with them
+            if (triggerResult.triggeredDocs.contains(it.second)) {
+                triggerFindingDocPairs.add(Pair(it.first, it.second))
             }
         }
 
         val actionCtx = triggerCtx.copy(
             triggeredDocs = triggerResult.triggeredDocs,
-            relatedFindings = findings,
+            relatedFindings = findingToDocPairs.map { it.first },
             error = monitorResult.error ?: triggerResult.error
         )
 
         val alerts = mutableListOf<Alert>()
-        findingDocPairs.forEach {
+        triggerFindingDocPairs.forEach {
             val alert = monitorCtx.alertService!!.composeDocLevelAlert(
                 listOf(it.first),
                 listOf(it.second),
@@ -436,51 +434,92 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         return triggerResult
     }
 
+    /**
+     * 1. Bulk index all findings based on shouldCreateFinding flag
+     * 2. invoke publishFinding() to kickstart auto-correlations
+     * 3. Returns a list of pairs for finding id to doc id
+     */
     private suspend fun createFindings(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docLevelQueries: List<DocLevelQuery>,
-        matchingDocId: String,
+        docsToQueries: MutableMap<String, MutableList<String>>,
+        idQueryMap: Map<String, DocLevelQuery>,
         shouldCreateFinding: Boolean,
         workflowExecutionId: String? = null,
-    ): String {
-        // Before the "|" is the doc id and after the "|" is the index
-        val docIndex = matchingDocId.split("|")
+    ): List<Pair<String, String>> {
 
-        val finding = Finding(
-            id = UUID.randomUUID().toString(),
-            relatedDocIds = listOf(docIndex[0]),
-            correlatedDocIds = listOf(docIndex[0]),
-            monitorId = monitor.id,
-            monitorName = monitor.name,
-            index = docIndex[1],
-            docLevelQueries = docLevelQueries,
-            timestamp = Instant.now(),
-            executionId = workflowExecutionId
-        )
+        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val findings = mutableListOf<Finding>()
+        val indexRequests = mutableListOf<IndexRequest>()
 
-        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
-        logger.debug("Findings: $findingStr")
+        docsToQueries.forEach {
+            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
 
-        if (shouldCreateFinding) {
-            val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .source(findingStr, XContentType.JSON)
-                .id(finding.id)
-                .routing(finding.id)
+            // Before the "|" is the doc id and after the "|" is the index
+            val docIndex = it.key.split("|")
 
-            monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
-                monitorCtx.client!!.index(indexRequest, it)
+            val finding = Finding(
+                id = UUID.randomUUID().toString(),
+                relatedDocIds = listOf(docIndex[0]),
+                correlatedDocIds = listOf(docIndex[0]),
+                monitorId = monitor.id,
+                monitorName = monitor.name,
+                index = docIndex[1],
+                docLevelQueries = triggeredQueries,
+                timestamp = Instant.now(),
+                executionId = workflowExecutionId
+            )
+            findingDocPairs.add(Pair(finding.id, it.key))
+            findings.add(finding)
+
+            val findingStr =
+                finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
+                    .string()
+            logger.debug("Findings: $findingStr")
+
+            if (shouldCreateFinding) {
+                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                    .source(findingStr, XContentType.JSON)
+                    .id(finding.id)
+                    .opType(DocWriteRequest.OpType.CREATE)
            }
        }
 
+        if (indexRequests.isNotEmpty()) {
+            bulkIndexFindings(monitor, monitorCtx, indexRequests)
+        }
+
         try {
-            publishFinding(monitor, monitorCtx, finding)
+            findings.forEach { finding ->
+                publishFinding(monitor, monitorCtx, finding)
+            }
         } catch (e: Exception) {
             // suppress exception
             logger.error("Optional finding callback failed", e)
         }
-        return finding.id
+        return findingDocPairs
+    }
+
+    private suspend fun bulkIndexFindings(
+        monitor: Monitor,
+        monitorCtx: MonitorRunnerExecutionContext,
+        indexRequests: List<IndexRequest>
+    ) {
+        indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
+                bulk(BulkRequest().add(batch), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
+            }
+        }
+        monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
    }
 
     private fun publishFinding(
@@ -605,7 +644,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
                 }
             } catch (e: Exception) {
-                logger.warn("Failed to run for shard $shard. Error: ${e.message}")
+                logger.error("Failed to run for shard $shard. Error: ${e.message}")
            }
        }
        return matchingDocs
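
Note: the new bulkIndexFindings helper in this diff relies on two building blocks: Kotlin's chunked() to split the pending finding IndexRequests into batches, and the plugin's suspendUntil extension to await the listener-based client.bulk() call from a coroutine. Below is a minimal, self-contained sketch of that pattern; Listener, fakeBulk, and the local suspendUntil are illustrative stand-ins, not the plugin's real types or APIs.

    import kotlinx.coroutines.runBlocking
    import kotlin.coroutines.resume
    import kotlin.coroutines.resumeWithException
    import kotlin.coroutines.suspendCoroutine

    // Hypothetical stand-in for OpenSearch's ActionListener<T>.
    interface Listener<T> {
        fun onResponse(response: T)
        fun onFailure(e: Exception)
    }

    // Callback-style call standing in for client.bulk(request, listener).
    fun fakeBulk(batch: List<String>, listener: Listener<Int>) {
        listener.onResponse(batch.size) // pretend every request in the batch succeeded
    }

    // Bridge a callback API into a suspending call, in the spirit of the plugin's suspendUntil.
    suspend fun <T> suspendUntil(block: (Listener<T>) -> Unit): T =
        suspendCoroutine { cont ->
            block(object : Listener<T> {
                override fun onResponse(response: T) = cont.resume(response)
                override fun onFailure(e: Exception) = cont.resumeWithException(e)
            })
        }

    fun main() = runBlocking {
        // Chunk pending requests and await each bulk call, mirroring bulkIndexFindings.
        val requests = (1..25).map { "finding-$it" }
        requests.chunked(10).forEach { batch ->
            val indexed = suspendUntil<Int> { listener -> fakeBulk(batch, listener) }
            println("indexed $indexed findings in this batch")
        }
    }

The sketch also shows why the per-request IMMEDIATE refresh policy was dropped: once findings are written in batches, a single explicit refresh of the findings index after all bulk calls (as the diff does with RefreshAction) is cheaper than refreshing on every document.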