Skip to content

Commit f27a687

Browse files
committed
Implemented cross-cluster monitor support (opensearch-project#1404)
* Updated alert mappings to accommodate cross-cluster cluster metrics monitors. Signed-off-by: AWSHurneyt <[email protected]> * Implemented support for cross-cluster cluster metrics monitors. Implemented GetRemoteIndexes API to populate the frontend UI with details regarding the remote clusters, and indexes. Signed-off-by: AWSHurneyt <[email protected]> * Fixed a writeable test after changing QueryLevelTriggerRunResult from a data class to an open class for inheritability. Signed-off-by: AWSHurneyt <[email protected]> * Fixed ktlint errors. Signed-off-by: AWSHurneyt <[email protected]> * Removed changes to IndexUtils as they're only needed by doc monitors. Signed-off-by: AWSHurneyt <[email protected]> --------- Signed-off-by: AWSHurneyt <[email protected]> (cherry picked from commit ea36996) Signed-off-by: AWSHurneyt <[email protected]>
1 parent c8ef164 commit f27a687

16 files changed

+954
-59
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

+19-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest
2020
import org.opensearch.alerting.alerts.AlertIndices
2121
import org.opensearch.alerting.model.ActionRunResult
2222
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
23+
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
2324
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
2425
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
2526
import org.opensearch.alerting.opensearchapi.retry
@@ -190,6 +191,19 @@ class AlertService(
190191
)
191192
}
192193

194+
// Including a list of triggered clusters for cluster metrics monitors
195+
var triggeredClusters: MutableList<String>? = null
196+
if (result is ClusterMetricsTriggerRunResult)
197+
result.clusterTriggerResults.forEach {
198+
if (it.triggered) {
199+
// Add an empty list if one isn't already present
200+
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()
201+
202+
// Add the cluster to the list of triggered clusters
203+
triggeredClusters!!.add(it.cluster)
204+
}
205+
}
206+
193207
// Merge the alert's error message to the current alert's history
194208
val updatedHistory = currentAlert?.errorHistory.update(alertError)
195209
return if (alertError == null && !result.triggered) {
@@ -199,7 +213,8 @@ class AlertService(
199213
errorMessage = null,
200214
errorHistory = updatedHistory,
201215
actionExecutionResults = updatedActionExecutionResults,
202-
schemaVersion = IndexUtils.alertIndexSchemaVersion
216+
schemaVersion = IndexUtils.alertIndexSchemaVersion,
217+
clusters = triggeredClusters
203218
)
204219
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
205220
null
@@ -212,6 +227,7 @@ class AlertService(
212227
errorHistory = updatedHistory,
213228
actionExecutionResults = updatedActionExecutionResults,
214229
schemaVersion = IndexUtils.alertIndexSchemaVersion,
230+
clusters = triggeredClusters
215231
)
216232
} else {
217233
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
@@ -223,7 +239,8 @@ class AlertService(
223239
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
224240
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
225241
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
226-
workflowId = workflorwRunContext?.workflowId ?: ""
242+
workflowId = workflorwRunContext?.workflowId ?: "",
243+
clusters = triggeredClusters
227244
)
228245
}
229246
}

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+10-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction
1111
import org.opensearch.alerting.action.GetDestinationsAction
1212
import org.opensearch.alerting.action.GetEmailAccountAction
1313
import org.opensearch.alerting.action.GetEmailGroupAction
14+
import org.opensearch.alerting.action.GetRemoteIndexesAction
1415
import org.opensearch.alerting.action.SearchEmailAccountAction
1516
import org.opensearch.alerting.action.SearchEmailGroupAction
1617
import org.opensearch.alerting.alerts.AlertIndices
@@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
3435
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
3536
import org.opensearch.alerting.resthandler.RestGetFindingsAction
3637
import org.opensearch.alerting.resthandler.RestGetMonitorAction
38+
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
3739
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
3840
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
3941
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
@@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
5961
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
6062
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
6163
import org.opensearch.alerting.transport.TransportGetMonitorAction
64+
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
6265
import org.opensearch.alerting.transport.TransportGetWorkflowAction
6366
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
6467
import org.opensearch.alerting.transport.TransportIndexMonitorAction
@@ -134,6 +137,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
134137

135138
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
136139
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
140+
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
137141
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
138142

139143
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@@ -192,7 +196,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
192196
RestGetWorkflowAlertsAction(),
193197
RestGetFindingsAction(),
194198
RestGetWorkflowAction(),
195-
RestDeleteWorkflowAction()
199+
RestDeleteWorkflowAction(),
200+
RestGetRemoteIndexesAction(),
196201
)
197202
}
198203

@@ -219,7 +224,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
219224
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
220225
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
221226
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
222-
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
227+
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
228+
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
223229
)
224230
}
225231

@@ -356,7 +362,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
356362
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
357363
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
358364
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
359-
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
365+
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
366+
AlertingSettings.REMOTE_MONITORING_ENABLED
360367
)
361368
}
362369

alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt

+39-4
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55

66
package org.opensearch.alerting
77

8+
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
10+
import kotlinx.coroutines.launch
811
import org.apache.logging.log4j.LogManager
912
import org.opensearch.action.search.SearchRequest
1013
import org.opensearch.action.search.SearchResponse
1114
import org.opensearch.alerting.model.InputRunResults
1215
import org.opensearch.alerting.model.TriggerAfterKey
1316
import org.opensearch.alerting.opensearchapi.convertToMap
1417
import org.opensearch.alerting.opensearchapi.suspendUntil
18+
import org.opensearch.alerting.settings.AlertingSettings
1519
import org.opensearch.alerting.util.AggregationQueryRewriter
20+
import org.opensearch.alerting.util.CrossClusterMonitorUtils
1621
import org.opensearch.alerting.util.addUserBackendRolesFilter
1722
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
1823
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
@@ -43,6 +48,8 @@ import org.opensearch.script.TemplateScript
4348
import org.opensearch.search.builder.SearchSourceBuilder
4449
import java.time.Instant
4550

51+
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
52+
4653
/** Service that handles the collection of input results for Monitor executions */
4754
class InputService(
4855
val client: Client,
@@ -100,8 +107,9 @@ class InputService(
100107
.newInstance(searchParams)
101108
.execute()
102109

110+
val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService)
103111
val searchRequest = SearchRequest()
104-
.indices(*input.indices.toTypedArray())
112+
.indices(*indexes.toTypedArray())
105113
.preference(Preference.PRIMARY_FIRST.type())
106114
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
107115
searchRequest.source(SearchSourceBuilder.fromXContent(it))
@@ -115,9 +123,36 @@ class InputService(
115123
results += searchResponse.convertToMap()
116124
}
117125
is ClusterMetricsInput -> {
118-
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
119-
val response = executeTransportAction(input, client)
120-
results += response.toMap()
126+
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)
127+
128+
val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
129+
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
130+
131+
val responseMap = mutableMapOf<String, Map<String, Any>>()
132+
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
133+
client.threadPool().threadContext.stashContext().use {
134+
scope.launch {
135+
input.clusters.forEach { cluster ->
136+
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
137+
val response = executeTransportAction(input, targetClient)
138+
// Not all supported API reference the cluster name in their response.
139+
// Mapping each response to the cluster name before adding to results.
140+
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
141+
responseMap[cluster] = response.toMap()
142+
}
143+
}
144+
}
145+
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
146+
val startTime = Instant.now().toEpochMilli()
147+
while (
148+
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
149+
(responseMap.size < input.clusters.size)
150+
) { /* Wait for responses */ }
151+
results += responseMap
152+
} else {
153+
val response = executeTransportAction(input, client)
154+
results += response.toMap()
155+
}
121156
}
122157
else -> {
123158
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")

alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt

+16-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
1111
import org.opensearch.alerting.opensearchapi.InjectorContextElement
1212
import org.opensearch.alerting.opensearchapi.withClosableContext
1313
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
14+
import org.opensearch.alerting.settings.AlertingSettings
1415
import org.opensearch.alerting.util.isADMonitor
1516
import org.opensearch.alerting.workflow.WorkflowRunContext
1617
import org.opensearch.commons.alerting.model.Alert
@@ -65,7 +66,21 @@ object QueryLevelMonitorRunner : MonitorRunner() {
6566
for (trigger in monitor.triggers) {
6667
val currentAlert = currentAlerts[trigger]
6768
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
68-
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
69+
val triggerResult = when (monitor.monitorType) {
70+
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
71+
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
72+
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
73+
val remoteMonitoringEnabled =
74+
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
75+
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
76+
if (remoteMonitoringEnabled)
77+
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
78+
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
79+
}
80+
else ->
81+
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
82+
}
83+
6984
triggerResults[trigger.id] = triggerResult
7085

7186
if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) {

alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt

+50
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,19 @@ import org.apache.logging.log4j.LogManager
99
import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser
1010
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
1111
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
12+
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
13+
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult
1214
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
1315
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
1416
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
1517
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
1618
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
1719
import org.opensearch.alerting.script.TriggerScript
1820
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
21+
import org.opensearch.alerting.util.CrossClusterMonitorUtils
1922
import org.opensearch.alerting.util.getBucketKeysHash
2023
import org.opensearch.alerting.workflow.WorkflowRunContext
24+
import org.opensearch.cluster.service.ClusterService
2125
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
2226
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
2327
import org.opensearch.commons.alerting.model.AggregationResultBucket
@@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) {
7983
}
8084
}
8185

86+
fun runClusterMetricsTrigger(
87+
monitor: Monitor,
88+
trigger: QueryLevelTrigger,
89+
ctx: QueryLevelTriggerExecutionContext,
90+
clusterService: ClusterService
91+
): ClusterMetricsTriggerRunResult {
92+
var runResult: ClusterMetricsTriggerRunResult?
93+
try {
94+
val inputResults = ctx.results.getOrElse(0) { mapOf() }
95+
var triggered = false
96+
val clusterTriggerResults = mutableListOf<ClusterTriggerResult>()
97+
if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) {
98+
inputResults.forEach { clusterResult ->
99+
// Reducing the inputResults to only include results from 1 cluster at a time
100+
val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair())))
101+
102+
val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
103+
.newInstance(trigger.condition.params)
104+
.execute(clusterTriggerCtx)
105+
106+
if (clusterTriggered) {
107+
triggered = clusterTriggered
108+
clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered))
109+
}
110+
}
111+
} else {
112+
triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
113+
.newInstance(trigger.condition.params)
114+
.execute(ctx)
115+
if (triggered) clusterTriggerResults
116+
.add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered))
117+
}
118+
runResult = ClusterMetricsTriggerRunResult(
119+
triggerName = trigger.name,
120+
triggered = triggered,
121+
error = null,
122+
clusterTriggerResults = clusterTriggerResults
123+
)
124+
} catch (e: Exception) {
125+
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
126+
// if the script fails we need to send an alert so set triggered = true
127+
runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e)
128+
}
129+
return runResult!!
130+
}
131+
82132
// TODO: improve performance and support match all and match any
83133
fun runDocLevelTrigger(
84134
monitor: Monitor,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.action
7+
8+
import org.opensearch.action.ActionType
9+
10+
class GetRemoteIndexesAction private constructor() : ActionType<GetRemoteIndexesResponse>(NAME, ::GetRemoteIndexesResponse) {
11+
companion object {
12+
val INSTANCE = GetRemoteIndexesAction()
13+
const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get"
14+
}
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.action
7+
8+
import org.opensearch.action.ActionRequest
9+
import org.opensearch.action.ActionRequestValidationException
10+
import org.opensearch.core.common.io.stream.StreamInput
11+
import org.opensearch.core.common.io.stream.StreamOutput
12+
import java.io.IOException
13+
14+
class GetRemoteIndexesRequest : ActionRequest {
15+
var indexes: List<String> = listOf()
16+
var includeMappings: Boolean
17+
18+
constructor(indexes: List<String>, includeMappings: Boolean) : super() {
19+
this.indexes = indexes
20+
this.includeMappings = includeMappings
21+
}
22+
23+
@Throws(IOException::class)
24+
constructor(sin: StreamInput) : this(
25+
sin.readStringList(),
26+
sin.readBoolean()
27+
)
28+
29+
override fun validate(): ActionRequestValidationException? {
30+
return null
31+
}
32+
33+
@Throws(IOException::class)
34+
override fun writeTo(out: StreamOutput) {
35+
out.writeStringArray(indexes.toTypedArray())
36+
out.writeBoolean(includeMappings)
37+
}
38+
39+
companion object {
40+
const val INDEXES_FIELD = "indexes"
41+
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
42+
}
43+
}

0 commit comments

Comments
 (0)