Skip to content

Implemented cross-cluster monitor support #1404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -190,6 +191,19 @@ class AlertService(
)
}

// Including a list of triggered clusters for cluster metrics monitors
var triggeredClusters: MutableList<String>? = null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider this is for experiemental, does here need a check for cluster setting for this feature?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would be necessary, as the feature disabled, this will always be just the local cluster. But I'm open adding a setting check if we think that's best.

if (result is ClusterMetricsTriggerRunResult)
Copy link
Member

@eirsep eirsep Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

composeQueryLevelAlert() method is now checking for ClusterMetricsTriggerRunResult?

isnt cluster metrics a separate kind of monitpr?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we not add triggered clusters metric to all monitor types' alerts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Cluster metrics monitors started out as query level monitors before we added additional monitor types when we introduced bucket level monitors. Their alerts were handled in a similar way up until this point, so I added this check in composeQueryLevelAlert rather than duplicate the code elsewhere.

result.clusterTriggerResults.forEach {
if (it.triggered) {
// Add an empty list if one isn't already present
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()

// Add the cluster to the list of triggered clusters
triggeredClusters!!.add(it.cluster)
}
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -199,7 +213,8 @@ class AlertService(
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -212,6 +227,7 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand All @@ -223,7 +239,8 @@ class AlertService(
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
workflowId = workflorwRunContext?.workflowId ?: "",
clusters = triggeredClusters
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
Expand All @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
Expand All @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
Expand Down Expand Up @@ -132,6 +135,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
Expand Down Expand Up @@ -183,7 +187,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indices*

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with indexes because we had this issue a while back in which we were moving away from indices.
opensearch-project/index-management-dashboards-plugin#637

)
}

Expand All @@ -210,7 +215,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
)
}

Expand Down Expand Up @@ -346,7 +352,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.REMOTE_MONITORING_ENABLED
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
Expand Down Expand Up @@ -44,6 +49,8 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -101,8 +108,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService)
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.indices(*indexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
Expand All @@ -116,9 +124,33 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
val response = executeTransportAction(input, client)
results += response.toMap()
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while ((Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) || (responseMap.size < input.clusters.size)) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -65,7 +66,19 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
val triggerResult = when (monitor.monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled) monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why cluster metric monitor run query level trigger?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cluster metrics monitors were initially query level monitors before bucket level introduced monitor types. If I remember correctly, we gave cluster metrics monitors a unique type around the implementation of doc level monitors. It could be nice to eventually give cluster metrics monitors their own assets like runner, and trigger; but at the time, their execution was so similar to query level, we didn't change them.

}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
}

triggerResults[trigger.id] = triggerResult

if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.commons.alerting.model.AggregationResultBucket
Expand Down Expand Up @@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) {
}
}

fun runClusterMetricsTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
ctx: QueryLevelTriggerExecutionContext,
clusterService: ClusterService
): ClusterMetricsTriggerRunResult {
var runResult: ClusterMetricsTriggerRunResult?
try {
val inputResults = ctx.results.getOrElse(0) { mapOf() }
var triggered = false
val clusterTriggerResults = mutableListOf<ClusterTriggerResult>()
if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) {
inputResults.forEach { clusterResult ->
// Reducing the inputResults to only include results from 1 cluster at a time
val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair())))

val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(clusterTriggerCtx)

if (clusterTriggered) {
triggered = clusterTriggered
clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered))
}
}
} else {
triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(ctx)
if (triggered) clusterTriggerResults
.add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered))
}
runResult = ClusterMetricsTriggerRunResult(
triggerName = trigger.name,
triggered = triggered,
error = null,
clusterTriggerResults = clusterTriggerResults
)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e)
}
return runResult!!
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionType

class GetRemoteIndexesAction private constructor() : ActionType<GetRemoteIndexesResponse>(NAME, ::GetRemoteIndexesResponse) {
companion object {
val INSTANCE = GetRemoteIndexesAction()
const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class GetRemoteIndexesRequest : ActionRequest {
var indexes: List<String> = listOf()
var includeMappings: Boolean

constructor(indexes: List<String>, includeMappings: Boolean) : super() {
this.indexes = indexes
this.includeMappings = includeMappings
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readStringList(),
sin.readBoolean()
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringArray(indexes.toTypedArray())
out.writeBoolean(includeMappings)
}

companion object {
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
}
Loading