Skip to content

Commit f6a6375

Browse files
committed
address comments
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 3495cf8 commit f6a6375

File tree

5 files changed

+26
-17
lines changed

5 files changed

+26
-17
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
138138
}
139139

140140
// Map of document ids per index when monitor is workflow delegate and has chained findings
141-
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first
142-
val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second
141+
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
142+
val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) {
143+
workflowRunContext.findingIds
144+
} else {
145+
listOf()
146+
}
143147

144148
val concreteIndicesSeenSoFar = mutableListOf<String>()
145149
val updatedIndexNames = mutableListOf<String>()

alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class InputService(
9090
periodStart = periodStart,
9191
periodEnd = periodEnd,
9292
prevResult = prevResult,
93-
matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first,
93+
matchingDocIdsPerIndex = matchingDocIdsPerIndex,
9494
returnSampleDocs = false
9595
)
9696
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ class TransportDocLevelMonitorFanOutAction
301301
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
302302
* generates a single alert with multiple findings.
303303
*/
304-
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
304+
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
305305
monitor.triggers.forEach {
306306
triggerResults[it.id] = runForEachDocTrigger(
307307
monitorResult,
@@ -316,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction
316316
workflowRunContext = workflowRunContext
317317
)
318318
}
319-
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
319+
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
320320
monitor.triggers.forEach {
321-
triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts(
321+
triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert(
322322
monitorResult,
323323
it as DocumentLevelTrigger,
324324
monitor,
@@ -370,7 +370,7 @@ class TransportDocLevelMonitorFanOutAction
370370
/**
371371
* run doc-level triggers ignoring findings and alerts and generating a single alert.
372372
*/
373-
private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts(
373+
private suspend fun runForEachDocTriggerCreateSingleGroupedAlert(
374374
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
375375
trigger: DocumentLevelTrigger,
376376
monitor: Monitor,
@@ -381,14 +381,14 @@ class TransportDocLevelMonitorFanOutAction
381381
): DocumentLevelTriggerRunResult {
382382
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
383383
if (triggerResult.triggeredDocs.isNotEmpty()) {
384-
val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) {
385-
workflowRunContext.matchingDocIdsPerIndex.second
384+
val findingIds = if (workflowRunContext?.findingIds != null) {
385+
workflowRunContext.findingIds
386386
} else {
387387
listOf()
388388
}
389389
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
390390
val alert = alertService.composeDocLevelAlert(
391-
findingIds,
391+
findingIds!!,
392392
triggerResult.triggeredDocs,
393393
triggerCtx,
394394
monitorResult.alertError() ?: triggerResult.alertError(),
@@ -582,7 +582,11 @@ class TransportDocLevelMonitorFanOutAction
582582
.string()
583583
log.debug("Findings: $findingStr")
584584

585-
if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) {
585+
if (shouldCreateFinding and (
586+
monitor.shouldCreateSingleAlertForFindings == null ||
587+
monitor.shouldCreateSingleAlertForFindings == false
588+
)
589+
) {
586590
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
587591
.source(findingStr, XContentType.JSON)
588592
.id(finding.id)
@@ -594,7 +598,7 @@ class TransportDocLevelMonitorFanOutAction
594598
bulkIndexFindings(monitor, indexRequests)
595599
}
596600

597-
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
601+
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
598602
try {
599603
findings.forEach { finding ->
600604
publishFinding(monitor, finding)
@@ -957,11 +961,11 @@ class TransportDocLevelMonitorFanOutAction
957961
val boolQueryBuilder = BoolQueryBuilder()
958962
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
959963

960-
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
964+
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
961965
if (!docIds.isNullOrEmpty()) {
962966
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
963967
}
964-
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
968+
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
965969
val docIdsParam = mutableListOf<String>()
966970
if (docIds != null) {
967971
docIdsParam.addAll(docIds)

alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
131131
workflowId = workflowMetadata.workflowId,
132132
workflowMetadataId = workflowMetadata.id,
133133
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
134-
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!,
134+
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first,
135135
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
136-
else workflow.auditDelegateMonitorAlerts!!
136+
else workflow.auditDelegateMonitorAlerts!!,
137+
findingIds = indexToDocIdsWithFindings.second
137138
)
138139
try {
139140
dataSources = delegateMonitor.dataSources

alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ fun randomDocumentLevelMonitor(
223223
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
224224
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
225225
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources,
226-
ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner
226+
shouldCreateSingleAlertForFindings = ignoreFindingsAndAlerts, owner = owner
227227
)
228228
}
229229

0 commit comments

Comments
 (0)