Skip to content

Commit ae32748

Browse files
committed
Addressing review comments
Signed-off-by: Megha Goyal <[email protected]>
1 parent 03b86ae commit ae32748

File tree

3 files changed

+20
-23
lines changed

3 files changed

+20
-23
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
1010
import org.opensearch.OpenSearchStatusException
1111
import org.opensearch.action.DocWriteRequest
12+
import org.opensearch.action.admin.indices.refresh.RefreshAction
13+
import org.opensearch.action.admin.indices.refresh.RefreshRequest
1214
import org.opensearch.action.bulk.BulkRequest
1315
import org.opensearch.action.bulk.BulkResponse
1416
import org.opensearch.action.index.IndexRequest
1517
import org.opensearch.action.search.SearchAction
1618
import org.opensearch.action.search.SearchRequest
1719
import org.opensearch.action.search.SearchResponse
18-
import org.opensearch.action.support.WriteRequest
1920
import org.opensearch.alerting.model.DocumentExecutionContext
2021
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2122
import org.opensearch.alerting.model.InputRunResults
@@ -395,7 +396,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
395396

396397
val actionCtx = triggerCtx.copy(
397398
triggeredDocs = triggerResult.triggeredDocs,
398-
// confirm if this is right or only trigger-able findings should be present in this list
399399
relatedFindings = findingToDocPairs.map { it.first },
400400
error = monitorResult.error ?: triggerResult.error
401401
)
@@ -477,10 +477,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
477477
val findingDocPairs = mutableListOf<Pair<String, String>>()
478478
val findings = mutableListOf<Finding>()
479479
val indexRequests = mutableListOf<IndexRequest>()
480-
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
481-
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
482-
monitorCtx.findingsIndexBatchSize = it
483-
}
484480

485481
docsToQueries.forEach {
486482
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
@@ -507,22 +503,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
507503
.string()
508504
logger.debug("Findings: $findingStr")
509505

510-
if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
511-
// make bulk indexing call here and flush the indexRequest object
512-
bulkIndexFindings(monitor, monitorCtx, indexRequests)
513-
indexRequests.clear()
514-
} else {
515-
if (shouldCreateFinding) {
516-
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
517-
.source(findingStr, XContentType.JSON)
518-
.id(finding.id)
519-
.routing(finding.id)
520-
.opType(DocWriteRequest.OpType.INDEX)
521-
}
506+
if (shouldCreateFinding) {
507+
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
508+
.source(findingStr, XContentType.JSON)
509+
.id(finding.id)
510+
.opType(DocWriteRequest.OpType.CREATE)
522511
}
523512
}
524513

525-
if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
514+
if (indexRequests.isNotEmpty()) {
526515
bulkIndexFindings(monitor, monitorCtx, indexRequests)
527516
}
528517

@@ -542,9 +531,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
542531
monitorCtx: MonitorRunnerExecutionContext,
543532
indexRequests: List<IndexRequest>
544533
) {
545-
if (indexRequests.isNotEmpty()) {
534+
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
535+
536+
indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
546537
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
547-
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
538+
bulk(BulkRequest().add(indexRequests), it)
548539
}
549540
if (bulkResponse.hasFailures()) {
550541
bulkResponse.items.forEach { item ->
@@ -556,6 +547,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
556547
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
557548
}
558549
}
550+
monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
559551
}
560552

561553
private fun publishFinding(

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.opensearch.alerting.model.WorkflowRunResult
2222
import org.opensearch.alerting.model.destination.DestinationContextFactory
2323
import org.opensearch.alerting.opensearchapi.retry
2424
import org.opensearch.alerting.script.TriggerExecutionContext
25+
import org.opensearch.alerting.settings.AlertingSettings
2526
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
2627
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
2728
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
@@ -169,6 +170,10 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
169170

170171
monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)
171172

173+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
174+
monitorCtx.findingsIndexBatchSize = it
175+
}
176+
172177
return this
173178
}
174179

alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class AlertingSettings {
1717

1818
companion object {
1919
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
20-
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000
20+
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
2121

2222
val ALERTING_MAX_MONITORS = Setting.intSetting(
2323
"plugins.alerting.monitor.max_monitors",
@@ -158,7 +158,7 @@ class AlertingSettings {
158158
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
159159
"plugins.alerting.alert_findings_indexing_batch_size",
160160
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
161-
0,
161+
1,
162162
Setting.Property.NodeScope, Setting.Property.Dynamic
163163
)
164164
}

0 commit comments

Comments
 (0)