Skip to content

Commit da330a8

Browse files
committed
Bulk index findings in batches of 10000 and make it configurable
1 parent 28530dd commit da330a8

File tree

4 files changed

+51
-20
lines changed

4 files changed

+51
-20
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
346346
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
347347
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
348348
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
349-
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
349+
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
350+
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
350351
)
351352
}
352353

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.opensearch.alerting.model.MonitorRunResult
2424
import org.opensearch.alerting.model.userErrorMessage
2525
import org.opensearch.alerting.opensearchapi.suspendUntil
2626
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
27+
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
2728
import org.opensearch.alerting.util.AlertingException
2829
import org.opensearch.alerting.util.IndexUtils
2930
import org.opensearch.alerting.util.defaultToPerExecutionAction
@@ -476,6 +477,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
476477
val findingDocPairs = mutableListOf<Pair<String, String>>()
477478
val findings = mutableListOf<Finding>()
478479
val indexRequests = mutableListOf<IndexRequest>()
480+
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
481+
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(FINDINGS_INDEXING_BATCH_SIZE) {
482+
monitorCtx.findingsIndexBatchSize = it
483+
}
479484

480485
docsToQueries.forEach {
481486
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
@@ -502,39 +507,55 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
502507
.string()
503508
logger.debug("Findings: $findingStr")
504509

505-
if (shouldCreateFinding) {
506-
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
507-
.source(findingStr, XContentType.JSON)
508-
.id(finding.id)
509-
.routing(finding.id)
510-
.opType(DocWriteRequest.OpType.INDEX)
510+
if (indexRequests.size > monitorCtx.findingsIndexBatchSize) {
511+
// make bulk indexing call here and flush the indexRequest object
512+
bulkIndexFindings(monitor, monitorCtx, indexRequests)
513+
indexRequests.clear()
514+
} else {
515+
if (shouldCreateFinding) {
516+
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
517+
.source(findingStr, XContentType.JSON)
518+
.id(finding.id)
519+
.routing(finding.id)
520+
.opType(DocWriteRequest.OpType.INDEX)
521+
}
511522
}
512523
}
513524

525+
if (indexRequests.size <= monitorCtx.findingsIndexBatchSize) {
526+
bulkIndexFindings(monitor, monitorCtx, indexRequests)
527+
}
528+
529+
try {
530+
findings.forEach { finding ->
531+
publishFinding(monitor, monitorCtx, finding)
532+
}
533+
} catch (e: Exception) {
534+
// suppress exception
535+
logger.error("Optional finding callback failed", e)
536+
}
537+
return findingDocPairs
538+
}
539+
540+
private suspend fun bulkIndexFindings(
541+
monitor: Monitor,
542+
monitorCtx: MonitorRunnerExecutionContext,
543+
indexRequests: List<IndexRequest>
544+
) {
514545
if (indexRequests.isNotEmpty()) {
515546
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
516547
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
517548
}
518549
if (bulkResponse.hasFailures()) {
519550
bulkResponse.items.forEach { item ->
520551
if (item.isFailed) {
521-
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
552+
logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
522553
}
523554
}
524555
} else {
525556
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
526557
}
527558
}
528-
529-
try {
530-
findings.forEach { finding ->
531-
publishFinding(monitor, monitorCtx, finding)
532-
}
533-
} catch (e: Exception) {
534-
// suppress exception
535-
logger.error("Optional finding callback failed", e)
536-
}
537-
return findingDocPairs
538559
}
539560

540561
private fun publishFinding(
@@ -658,7 +679,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
658679
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
659680
}
660681
} catch (e: Exception) {
661-
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
682+
logger.error("Failed to run for shard $shard. Error: ${e.message}")
662683
}
663684
}
664685
return matchingDocs

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
4747
@Volatile var destinationContextFactory: DestinationContextFactory? = null,
4848

4949
@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
50-
@Volatile var indexTimeout: TimeValue? = null
50+
@Volatile var indexTimeout: TimeValue? = null,
51+
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
5152
)

alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class AlertingSettings {
1717

1818
companion object {
1919
const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L
20+
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 10000
2021

2122
val ALERTING_MAX_MONITORS = Setting.intSetting(
2223
"plugins.alerting.monitor.max_monitors",
@@ -153,5 +154,12 @@ class AlertingSettings {
153154
-1L,
154155
Setting.Property.NodeScope, Setting.Property.Dynamic
155156
)
157+
158+
val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting(
159+
"plugins.alerting.alert_findings_indexing_batch_size",
160+
DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
161+
0,
162+
Setting.Property.NodeScope, Setting.Property.Dynamic
163+
)
156164
}
157165
}

0 commit comments

Comments
 (0)