@@ -8,6 +8,7 @@ package org.opensearch.alerting
8
8
import org.apache.logging.log4j.LogManager
9
9
import org.opensearch.ExceptionsHelper
10
10
import org.opensearch.OpenSearchStatusException
11
+ import org.opensearch.action.ActionListener
11
12
import org.opensearch.action.index.IndexRequest
12
13
import org.opensearch.action.index.IndexResponse
13
14
import org.opensearch.action.search.SearchAction
@@ -26,12 +27,16 @@ import org.opensearch.alerting.util.IndexUtils
26
27
import org.opensearch.alerting.util.defaultToPerExecutionAction
27
28
import org.opensearch.alerting.util.getActionExecutionPolicy
28
29
import org.opensearch.client.Client
30
+ import org.opensearch.client.node.NodeClient
29
31
import org.opensearch.cluster.metadata.IndexMetadata
30
32
import org.opensearch.cluster.routing.ShardRouting
31
33
import org.opensearch.cluster.service.ClusterService
32
34
import org.opensearch.common.bytes.BytesReference
33
35
import org.opensearch.common.xcontent.XContentFactory
34
36
import org.opensearch.common.xcontent.XContentType
37
+ import org.opensearch.commons.alerting.AlertingPluginInterface
38
+ import org.opensearch.commons.alerting.action.PublishFindingsRequest
39
+ import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
35
40
import org.opensearch.commons.alerting.model.ActionExecutionResult
36
41
import org.opensearch.commons.alerting.model.Alert
37
42
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
@@ -342,6 +347,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
342
347
val finding = Finding (
343
348
id = UUID .randomUUID().toString(),
344
349
relatedDocIds = listOf (docIndex[0 ]),
350
+ correlatedDocIds = listOf (docIndex[0 ]),
345
351
monitorId = monitor.id,
346
352
monitorName = monitor.name,
347
353
index = docIndex[1 ],
@@ -363,9 +369,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
363
369
monitorCtx.client!! .index(indexRequest, it)
364
370
}
365
371
}
372
+
373
+ try {
374
+ publishFinding(monitor, monitorCtx, finding)
375
+ } catch (e: Exception ) {
376
+ // suppress exception
377
+ logger.error(" Optional finding callback failed" , e)
378
+ }
366
379
return finding.id
367
380
}
368
381
382
+ private fun publishFinding (
383
+ monitor : Monitor ,
384
+ monitorCtx : MonitorRunnerExecutionContext ,
385
+ finding : Finding
386
+ ) {
387
+ val publishFindingsRequest = PublishFindingsRequest (monitor.id, finding)
388
+ AlertingPluginInterface .publishFinding(
389
+ monitorCtx.client!! as NodeClient ,
390
+ publishFindingsRequest,
391
+ object : ActionListener <SubscribeFindingsResponse > {
392
+ override fun onResponse (response : SubscribeFindingsResponse ) {}
393
+
394
+ override fun onFailure (e : Exception ) {}
395
+ }
396
+ )
397
+ }
398
+
369
399
private suspend fun updateLastRunContext (
370
400
lastRunContext : Map <String , Any >,
371
401
monitorCtx : MonitorRunnerExecutionContext ,
0 commit comments