-
Notifications
You must be signed in to change notification settings - Fork 114
Implemented cross-cluster monitor support #1404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
1181cb5
8f3a9ea
36482bb
7113fa3
4b907b9
885545f
54925c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest | |
import org.opensearch.alerting.alerts.AlertIndices | ||
import org.opensearch.alerting.model.ActionRunResult | ||
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult | ||
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult | ||
import org.opensearch.alerting.model.QueryLevelTriggerRunResult | ||
import org.opensearch.alerting.opensearchapi.firstFailureOrNull | ||
import org.opensearch.alerting.opensearchapi.retry | ||
|
@@ -190,6 +191,19 @@ class AlertService( | |
) | ||
} | ||
|
||
// Including a list of triggered clusters for cluster metrics monitors | ||
var triggeredClusters: MutableList<String>? = null | ||
if (result is ClusterMetricsTriggerRunResult) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Isn't cluster metrics a separate kind of monitor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we not add triggered clusters metric to all monitor types' alerts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. Cluster metrics monitors started out as query level monitors before we added additional monitor types when we introduced bucket level monitors. Their alerts were handled in a similar way up until this point, so I added this check in |
||
result.clusterTriggerResults.forEach { | ||
if (it.triggered) { | ||
// Add an empty list if one isn't already present | ||
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf() | ||
|
||
// Add the cluster to the list of triggered clusters | ||
triggeredClusters!!.add(it.cluster) | ||
} | ||
} | ||
|
||
// Merge the alert's error message to the current alert's history | ||
val updatedHistory = currentAlert?.errorHistory.update(alertError) | ||
return if (alertError == null && !result.triggered) { | ||
|
@@ -199,7 +213,8 @@ class AlertService( | |
errorMessage = null, | ||
errorHistory = updatedHistory, | ||
actionExecutionResults = updatedActionExecutionResults, | ||
schemaVersion = IndexUtils.alertIndexSchemaVersion | ||
schemaVersion = IndexUtils.alertIndexSchemaVersion, | ||
clusters = triggeredClusters | ||
) | ||
} else if (alertError == null && currentAlert?.isAcknowledged() == true) { | ||
null | ||
|
@@ -212,6 +227,7 @@ class AlertService( | |
errorHistory = updatedHistory, | ||
actionExecutionResults = updatedActionExecutionResults, | ||
schemaVersion = IndexUtils.alertIndexSchemaVersion, | ||
clusters = triggeredClusters | ||
) | ||
} else { | ||
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) { | ||
|
@@ -223,7 +239,8 @@ class AlertService( | |
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message, | ||
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, | ||
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, | ||
workflowId = workflorwRunContext?.workflowId ?: "" | ||
workflowId = workflorwRunContext?.workflowId ?: "", | ||
clusters = triggeredClusters | ||
) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction | |
import org.opensearch.alerting.action.GetDestinationsAction | ||
import org.opensearch.alerting.action.GetEmailAccountAction | ||
import org.opensearch.alerting.action.GetEmailGroupAction | ||
import org.opensearch.alerting.action.GetRemoteIndexesAction | ||
import org.opensearch.alerting.action.SearchEmailAccountAction | ||
import org.opensearch.alerting.action.SearchEmailGroupAction | ||
import org.opensearch.alerting.alerts.AlertIndices | ||
|
@@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction | |
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction | ||
import org.opensearch.alerting.resthandler.RestGetFindingsAction | ||
import org.opensearch.alerting.resthandler.RestGetMonitorAction | ||
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction | ||
import org.opensearch.alerting.resthandler.RestGetWorkflowAction | ||
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction | ||
import org.opensearch.alerting.resthandler.RestIndexMonitorAction | ||
|
@@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction | |
import org.opensearch.alerting.transport.TransportGetEmailGroupAction | ||
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction | ||
import org.opensearch.alerting.transport.TransportGetMonitorAction | ||
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction | ||
import org.opensearch.alerting.transport.TransportGetWorkflowAction | ||
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction | ||
import org.opensearch.alerting.transport.TransportIndexMonitorAction | ||
|
@@ -132,6 +135,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}") | ||
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors" | ||
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" | ||
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote" | ||
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" | ||
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" | ||
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" | ||
|
@@ -183,7 +187,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
RestGetWorkflowAlertsAction(), | ||
RestGetFindingsAction(), | ||
RestGetWorkflowAction(), | ||
RestDeleteWorkflowAction() | ||
RestDeleteWorkflowAction(), | ||
RestGetRemoteIndexesAction(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indices* There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went with |
||
) | ||
} | ||
|
||
|
@@ -210,7 +215,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), | ||
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), | ||
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), | ||
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) | ||
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), | ||
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java), | ||
) | ||
} | ||
|
||
|
@@ -346,7 +352,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R | |
AlertingSettings.FINDING_HISTORY_MAX_DOCS, | ||
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, | ||
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, | ||
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD | ||
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, | ||
AlertingSettings.REMOTE_MONITORING_ENABLED | ||
) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult | |
import org.opensearch.alerting.opensearchapi.InjectorContextElement | ||
import org.opensearch.alerting.opensearchapi.withClosableContext | ||
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext | ||
import org.opensearch.alerting.settings.AlertingSettings | ||
import org.opensearch.alerting.util.isADMonitor | ||
import org.opensearch.alerting.workflow.WorkflowRunContext | ||
import org.opensearch.commons.alerting.model.Alert | ||
|
@@ -65,7 +66,19 @@ object QueryLevelMonitorRunner : MonitorRunner() { | |
for (trigger in monitor.triggers) { | ||
val currentAlert = currentAlerts[trigger] | ||
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) | ||
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) | ||
val triggerResult = when (monitor.monitorType) { | ||
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> | ||
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) | ||
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { | ||
val remoteMonitoringEnabled = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) | ||
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) | ||
if (remoteMonitoringEnabled) monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) | ||
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious, why does a cluster metrics monitor run a query level trigger? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cluster metrics monitors were initially query level monitors before bucket level monitors introduced monitor types. If I remember correctly, we gave cluster metrics monitors a unique type around the implementation of doc level monitors. It could be nice to eventually give cluster metrics monitors their own assets, like a runner and trigger; but at the time, their execution was so similar to query level that we didn't change them. |
||
} | ||
else -> | ||
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") | ||
} | ||
|
||
triggerResults[trigger.id] = triggerResult | ||
|
||
if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.action | ||
|
||
import org.opensearch.action.ActionType | ||
|
||
/**
 * Action type registered under [NAME] whose responses are deserialized as
 * [GetRemoteIndexesResponse].
 *
 * The constructor is private, so the only instance is the shared [INSTANCE].
 */
class GetRemoteIndexesAction private constructor() :
    ActionType<GetRemoteIndexesResponse>(NAME, ::GetRemoteIndexesResponse) {

    companion object {
        /** Name passed to the [ActionType] constructor for this action. */
        const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get"

        /** The single shared instance of this action type. */
        val INSTANCE: GetRemoteIndexesAction = GetRemoteIndexesAction()
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting.action | ||
|
||
import org.opensearch.action.ActionRequest | ||
import org.opensearch.action.ActionRequestValidationException | ||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.common.io.stream.StreamOutput | ||
import java.io.IOException | ||
|
||
/**
 * Request carrying a list of index names and a flag stating whether mappings
 * should be included.
 *
 * Serialized form: the [indexes] string list followed by the [includeMappings]
 * boolean. The [StreamInput] constructor reads the fields in the same order
 * that [writeTo] writes them.
 *
 * @property indexes the index names carried by this request.
 * @property includeMappings the include-mappings flag carried by this request.
 */
class GetRemoteIndexesRequest(
    var indexes: List<String>,
    var includeMappings: Boolean
) : ActionRequest() {

    /**
     * Deserializes a request previously written with [writeTo].
     *
     * @param sin stream positioned at the start of a serialized request.
     * @throws IOException if the stream cannot be read.
     */
    @Throws(IOException::class)
    constructor(sin: StreamInput) : this(
        indexes = sin.readStringList(),
        includeMappings = sin.readBoolean()
    )

    /** No request-level validation is performed; always returns `null`. */
    override fun validate(): ActionRequestValidationException? = null

    /**
     * Serializes this request.
     *
     * NOTE(review): neither this method nor the [StreamInput] constructor
     * delegates to the superclass, so anything the base class would serialize
     * is not transmitted. The two sides are symmetric; confirm this is intended.
     *
     * @param out destination stream.
     * @throws IOException if the stream cannot be written.
     */
    @Throws(IOException::class)
    override fun writeTo(out: StreamOutput) {
        // Collection writer is the counterpart of readStringList() and avoids
        // copying the list into a temporary array first.
        out.writeStringCollection(indexes)
        out.writeBoolean(includeMappings)
    }

    companion object {
        const val INDEXES_FIELD = "indexes"
        const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering this is experimental, does this need a check of the cluster setting for this feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that would be necessary: with the feature disabled, this will always be just the local cluster. But I'm open to adding a setting check if we think that's best.