Skip to content

Commit fee62b5

Browse files
shards assignment to local Node when fanout flag is disabled (#1749)
* shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * tests fix Signed-off-by: Riya Saxena <[email protected]> * tests fix Signed-off-by: Riya Saxena <[email protected]> --------- Signed-off-by: Riya Saxena <[email protected]>
1 parent 3755993 commit fee62b5

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

+13-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
239239
shards.remove("index")
240240
shards.remove("shards_count")
241241

242-
val nodeMap = getNodes(monitorCtx)
242+
/**
243+
* if fanout flag is disabled and force assign all shards to local node
244+
* thus effectively making the fan-out a single node operation.
245+
* This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules
246+
**/
247+
val localNode = monitorCtx.clusterService!!.localNode()
248+
val nodeMap: Map<String, DiscoveryNode> = if (docLevelMonitorInput?.fanoutEnabled == true) {
249+
getNodes(monitorCtx)
250+
} else {
251+
logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}")
252+
mapOf(localNode.id to localNode)
253+
}
254+
243255
val nodeShardAssignments = distributeShards(
244256
monitorCtx,
245257
nodeMap.keys.toList(),

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

+62
Original file line numberDiff line numberDiff line change
@@ -2750,6 +2750,68 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
27502750
deleteDataStream(aliasName)
27512751
}
27522752

2753+
fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() {
2754+
val aliasName = "test-alias"
2755+
createIndexAlias(
2756+
aliasName,
2757+
"""
2758+
"properties" : {
2759+
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
2760+
"test_field" : { "type" : "keyword" },
2761+
"number" : { "type" : "keyword" }
2762+
}
2763+
""".trimIndent(),
2764+
"\"index.number_of_shards\": 7"
2765+
)
2766+
2767+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
2768+
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false)
2769+
2770+
val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
2771+
val monitor = createMonitor(
2772+
randomDocumentLevelMonitor(
2773+
inputs = listOf(docLevelInput),
2774+
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
2775+
enabled = true,
2776+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
2777+
)
2778+
)
2779+
2780+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2781+
val testDoc = """{
2782+
"@timestamp": "$testTime",
2783+
"message" : "This is an error from IAD region",
2784+
"test_strict_date_time" : "$testTime",
2785+
"test_field" : "us-west-2"
2786+
}"""
2787+
indexDoc(aliasName, "1", testDoc)
2788+
indexDoc(aliasName, "2", testDoc)
2789+
indexDoc(aliasName, "4", testDoc)
2790+
indexDoc(aliasName, "5", testDoc)
2791+
indexDoc(aliasName, "6", testDoc)
2792+
indexDoc(aliasName, "7", testDoc)
2793+
OpenSearchTestCase.waitUntil(
2794+
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2795+
2,
2796+
TimeUnit.MINUTES
2797+
)
2798+
2799+
rolloverDatastream(aliasName)
2800+
indexDoc(aliasName, "11", testDoc)
2801+
indexDoc(aliasName, "12", testDoc)
2802+
indexDoc(aliasName, "14", testDoc)
2803+
indexDoc(aliasName, "15", testDoc)
2804+
indexDoc(aliasName, "16", testDoc)
2805+
indexDoc(aliasName, "17", testDoc)
2806+
OpenSearchTestCase.waitUntil(
2807+
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2808+
2,
2809+
TimeUnit.MINUTES
2810+
)
2811+
2812+
deleteDataStream(aliasName)
2813+
}
2814+
27532815
fun `test execute monitor generates alerts and findings with renewable locks`() {
27542816
val testIndex = createTestIndex()
27552817
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))

sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void onFailure(Exception e) {
136136
};
137137
} else if (runMonitorParam.equals("multiple")) {
138138
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
139-
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
139+
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true));
140140
BytesStreamOutput out1 = new BytesStreamOutput();
141141
input2.writeTo(out1);
142142
BytesReference input1Serialized1 = out1.bytes();
@@ -220,7 +220,7 @@ public void onFailure(Exception e) {
220220
sampleRemoteDocLevelMonitorInput.writeTo(out2);
221221
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();
222222

223-
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
223+
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true);
224224
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);
225225

226226
Monitor remoteDocLevelMonitor = new Monitor(

0 commit comments

Comments
 (0)