Skip to content

Commit e7ba6d7

Browse files
committed
Bulk index findings and sequentially invoke auto-correlations
Signed-off-by: Megha Goyal <[email protected]>
1 parent 526433a commit e7ba6d7

File tree

1 file changed

+80
-51
lines changed

1 file changed

+80
-51
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 80 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ package org.opensearch.alerting
88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
1010
import org.opensearch.OpenSearchStatusException
11+
import org.opensearch.action.DocWriteRequest
12+
import org.opensearch.action.bulk.BulkRequest
13+
import org.opensearch.action.bulk.BulkResponse
1114
import org.opensearch.action.index.IndexRequest
12-
import org.opensearch.action.index.IndexResponse
1315
import org.opensearch.action.search.SearchAction
1416
import org.opensearch.action.search.SearchRequest
1517
import org.opensearch.action.search.SearchResponse
@@ -273,10 +275,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
273275
// If there are no triggers defined, we still want to generate findings
274276
if (monitor.triggers.isEmpty()) {
275277
if (dryrun == false && monitor.id != Monitor.NO_ID) {
276-
docsToQueries.forEach {
277-
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
278-
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
279-
}
278+
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
280279
}
281280
} else {
282281
monitor.triggers.forEach {
@@ -365,7 +364,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
365364
trigger: DocumentLevelTrigger,
366365
monitor: Monitor,
367366
idQueryMap: Map<String, DocLevelQuery>,
368-
docsToQueries: Map<String, List<String>>,
367+
docsToQueries: MutableMap<String, MutableList<String>>,
369368
queryToDocIds: Map<DocLevelQuery, Set<String>>,
370369
dryrun: Boolean,
371370
workflowRunContext: WorkflowRunContext?,
@@ -374,35 +373,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
374373
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
375374
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
376375

377-
val findings = mutableListOf<String>()
378-
val findingDocPairs = mutableListOf<Pair<String, String>>()
376+
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()
379377

380378
// TODO: Implement throttling for findings
381-
docsToQueries.forEach {
382-
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
383-
val findingId = createFindings(
384-
monitor,
385-
monitorCtx,
386-
triggeredQueries,
387-
it.key,
388-
!dryrun && monitor.id != Monitor.NO_ID,
389-
executionId
390-
)
391-
findings.add(findingId)
379+
val findingToDocPairs = createFindings(
380+
monitor,
381+
monitorCtx,
382+
docsToQueries,
383+
idQueryMap,
384+
!dryrun && monitor.id != Monitor.NO_ID,
385+
executionId
386+
)
392387

393-
if (triggerResult.triggeredDocs.contains(it.key)) {
394-
findingDocPairs.add(Pair(findingId, it.key))
388+
findingToDocPairs.forEach {
389+
// Only pick those entries whose docs have triggers associated with them
390+
if (triggerResult.triggeredDocs.contains(it.second)) {
391+
triggerFindingDocPairs.add(Pair(it.first, it.second))
395392
}
396393
}
397394

398395
val actionCtx = triggerCtx.copy(
399396
triggeredDocs = triggerResult.triggeredDocs,
400-
relatedFindings = findings,
397+
// confirm if this is right or only trigger-able findings should be present in this list
398+
relatedFindings = findingToDocPairs.map { it.first },
401399
error = monitorResult.error ?: triggerResult.error
402400
)
403401

404402
val alerts = mutableListOf<Alert>()
405-
findingDocPairs.forEach {
403+
triggerFindingDocPairs.forEach {
406404
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
407405
listOf(it.first),
408406
listOf(it.second),
@@ -461,51 +459,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
461459
return triggerResult
462460
}
463461

462+
/**
463+
* 1. Bulk index all findings based on shouldCreateFinding flag
464+
* 2. invoke publishFinding() to kickstart auto-correlations
465+
* 3. Returns a list of pairs for finding id to doc id
466+
*/
464467
private suspend fun createFindings(
465468
monitor: Monitor,
466469
monitorCtx: MonitorRunnerExecutionContext,
467-
docLevelQueries: List<DocLevelQuery>,
468-
matchingDocId: String,
470+
docsToQueries: MutableMap<String, MutableList<String>>,
471+
idQueryMap: Map<String, DocLevelQuery>,
469472
shouldCreateFinding: Boolean,
470473
workflowExecutionId: String? = null,
471-
): String {
472-
// Before the "|" is the doc id and after the "|" is the index
473-
val docIndex = matchingDocId.split("|")
474+
): List<Pair<String, String>> {
474475

475-
val finding = Finding(
476-
id = UUID.randomUUID().toString(),
477-
relatedDocIds = listOf(docIndex[0]),
478-
correlatedDocIds = listOf(docIndex[0]),
479-
monitorId = monitor.id,
480-
monitorName = monitor.name,
481-
index = docIndex[1],
482-
docLevelQueries = docLevelQueries,
483-
timestamp = Instant.now(),
484-
executionId = workflowExecutionId
485-
)
476+
val findingDocPairs = mutableListOf<Pair<String, String>>()
477+
val findings = mutableListOf<Finding>()
478+
val indexRequests = mutableListOf<IndexRequest>()
486479

487-
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
488-
logger.debug("Findings: $findingStr")
480+
docsToQueries.forEach {
481+
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
482+
483+
// Before the "|" is the doc id and after the "|" is the index
484+
val docIndex = it.key.split("|")
489485

490-
if (shouldCreateFinding) {
491-
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
492-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
493-
.source(findingStr, XContentType.JSON)
494-
.id(finding.id)
495-
.routing(finding.id)
486+
val finding = Finding(
487+
id = UUID.randomUUID().toString(),
488+
relatedDocIds = listOf(docIndex[0]),
489+
correlatedDocIds = listOf(docIndex[0]),
490+
monitorId = monitor.id,
491+
monitorName = monitor.name,
492+
index = docIndex[1],
493+
docLevelQueries = triggeredQueries,
494+
timestamp = Instant.now(),
495+
executionId = workflowExecutionId
496+
)
497+
findingDocPairs.add(Pair(finding.id, it.key))
498+
findings.add(finding)
499+
500+
val findingStr =
501+
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
502+
.string()
503+
logger.debug("Findings: $findingStr")
504+
505+
if (shouldCreateFinding) {
506+
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
507+
.source(findingStr, XContentType.JSON)
508+
.id(finding.id)
509+
.routing(finding.id)
510+
.opType(DocWriteRequest.OpType.INDEX)
511+
}
512+
}
496513

497-
monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
498-
monitorCtx.client!!.index(indexRequest, it)
514+
if (indexRequests.isNotEmpty()) {
515+
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
516+
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
517+
}
518+
if (bulkResponse.hasFailures()) {
519+
bulkResponse.items.forEach { item ->
520+
if (item.isFailed) {
521+
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
522+
}
523+
}
524+
} else {
525+
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
499526
}
500527
}
501528

502529
try {
503-
publishFinding(monitor, monitorCtx, finding)
530+
findings.forEach { finding ->
531+
publishFinding(monitor, monitorCtx, finding)
532+
}
504533
} catch (e: Exception) {
505534
// suppress exception
506535
logger.error("Optional finding callback failed", e)
507536
}
508-
return finding.id
537+
return findingDocPairs
509538
}
510539

511540
private fun publishFinding(

0 commit comments

Comments
 (0)