Skip to content

Commit 5ed4db9

Browse files
authored
initial commit for remote monitor support (#1547)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 0b2dc76 commit 5ed4db9

File tree

98 files changed

+2222
-1947
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+2222
-1947
lines changed

.github/workflows/test-workflow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ jobs:
9696
java-version: ${{ matrix.java }}
9797
- name: Build and run with Gradle
9898
working-directory: ${{ env.WORKING_DIR }}
99-
run: ./gradlew assemble integTest ${{ env.BUILD_ARGS }}
99+
run: ./gradlew assemble ${{ env.BUILD_ARGS }}
100100
env:
101101
_JAVA_OPTIONS: ${{ matrix.os_java_options }}
102102
- name: Create Artifact Path

alerting/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6+
import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
67
import java.util.concurrent.Callable
78
import org.opensearch.gradle.test.RestIntegTestTask
89
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
@@ -114,6 +115,7 @@ dependencies {
114115

115116
api project(":alerting-core")
116117
implementation "com.github.seancfoley:ipaddress:5.4.1"
118+
implementation project(path: ":alerting-spi", configuration: 'shadow')
117119

118120
testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
119121
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ import org.opensearch.action.search.SearchRequest
1818
import org.opensearch.action.search.SearchResponse
1919
import org.opensearch.action.support.WriteRequest
2020
import org.opensearch.alerting.alerts.AlertIndices
21-
import org.opensearch.alerting.model.ActionRunResult
22-
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
23-
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
24-
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
2521
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
2622
import org.opensearch.alerting.opensearchapi.retry
2723
import org.opensearch.alerting.opensearchapi.suspendUntil
@@ -31,7 +27,6 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
3127
import org.opensearch.alerting.util.IndexUtils
3228
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
3329
import org.opensearch.alerting.util.getBucketKeysHash
34-
import org.opensearch.alerting.workflow.WorkflowRunContext
3530
import org.opensearch.client.Client
3631
import org.opensearch.common.unit.TimeValue
3732
import org.opensearch.common.xcontent.LoggingDeprecationHandler
@@ -40,14 +35,19 @@ import org.opensearch.common.xcontent.XContentHelper
4035
import org.opensearch.common.xcontent.XContentType
4136
import org.opensearch.commons.alerting.alerts.AlertError
4237
import org.opensearch.commons.alerting.model.ActionExecutionResult
38+
import org.opensearch.commons.alerting.model.ActionRunResult
4339
import org.opensearch.commons.alerting.model.AggregationResultBucket
4440
import org.opensearch.commons.alerting.model.Alert
4541
import org.opensearch.commons.alerting.model.BucketLevelTrigger
42+
import org.opensearch.commons.alerting.model.ChainedAlertTriggerRunResult
43+
import org.opensearch.commons.alerting.model.ClusterMetricsTriggerRunResult
4644
import org.opensearch.commons.alerting.model.DataSources
4745
import org.opensearch.commons.alerting.model.Monitor
4846
import org.opensearch.commons.alerting.model.NoOpTrigger
47+
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
4948
import org.opensearch.commons.alerting.model.Trigger
5049
import org.opensearch.commons.alerting.model.Workflow
50+
import org.opensearch.commons.alerting.model.WorkflowRunContext
5151
import org.opensearch.commons.alerting.model.action.AlertCategory
5252
import org.opensearch.core.action.ActionListener
5353
import org.opensearch.core.common.bytes.BytesReference

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+24-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.opensearch.alerting
77

88
import org.opensearch.action.ActionRequest
9-
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
109
import org.opensearch.alerting.action.ExecuteMonitorAction
1110
import org.opensearch.alerting.action.ExecuteWorkflowAction
1211
import org.opensearch.alerting.action.GetDestinationsAction
@@ -25,6 +24,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
2524
import org.opensearch.alerting.core.schedule.JobScheduler
2625
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
2726
import org.opensearch.alerting.core.settings.ScheduledJobSettings
27+
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
2828
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
2929
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
3030
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
@@ -52,6 +52,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
5252
import org.opensearch.alerting.settings.DestinationSettings
5353
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
5454
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
55+
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
5556
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
5657
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
5758
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
@@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting
8586
import org.opensearch.common.settings.Settings
8687
import org.opensearch.common.settings.SettingsFilter
8788
import org.opensearch.commons.alerting.action.AlertingActions
89+
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
8890
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
8991
import org.opensearch.commons.alerting.model.BucketLevelTrigger
9092
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
@@ -96,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
9698
import org.opensearch.commons.alerting.model.ScheduledJob
9799
import org.opensearch.commons.alerting.model.SearchInput
98100
import org.opensearch.commons.alerting.model.Workflow
101+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
99102
import org.opensearch.core.action.ActionResponse
100103
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
101104
import org.opensearch.core.common.io.stream.StreamInput
@@ -110,6 +113,7 @@ import org.opensearch.painless.spi.AllowlistLoader
110113
import org.opensearch.painless.spi.PainlessExtension
111114
import org.opensearch.percolator.PercolatorPluginExt
112115
import org.opensearch.plugins.ActionPlugin
116+
import org.opensearch.plugins.ExtensiblePlugin
113117
import org.opensearch.plugins.ReloadablePlugin
114118
import org.opensearch.plugins.ScriptPlugin
115119
import org.opensearch.plugins.SearchPlugin
@@ -162,6 +166,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
162166
lateinit var alertIndices: AlertIndices
163167
lateinit var clusterService: ClusterService
164168
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
169+
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()
165170

166171
override fun getRestHandlers(
167172
settings: Settings,
@@ -236,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
236241
ClusterMetricsInput.XCONTENT_REGISTRY,
237242
DocumentLevelTrigger.XCONTENT_REGISTRY,
238243
ChainedAlertTrigger.XCONTENT_REGISTRY,
244+
RemoteMonitorTrigger.XCONTENT_REGISTRY,
239245
Workflow.XCONTENT_REGISTRY
240246
)
241247
}
@@ -277,6 +283,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
277283
.registerLockService(lockService)
278284
.registerConsumers()
279285
.registerDestinationSettings()
286+
.registerRemoteMonitors(monitorTypeToMonitorRunners)
280287
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
281288
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
282289
scheduler = JobScheduler(threadPool, runner)
@@ -409,4 +416,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
409416
)
410417
)
411418
}
419+
420+
override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
421+
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
422+
val monitorTypesToMonitorRunners = monitorExtension.getMonitorTypesToMonitorRunners()
423+
424+
for (monitorTypeToMonitorRunner in monitorTypesToMonitorRunners) {
425+
val monitorType = monitorTypeToMonitorRunner.key
426+
val monitorRunner = monitorTypeToMonitorRunner.value
427+
428+
if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
429+
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
430+
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
431+
}
432+
}
433+
}
434+
}
412435
}

alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@ import org.opensearch.action.index.IndexRequest
1212
import org.opensearch.action.search.SearchRequest
1313
import org.opensearch.action.search.SearchResponse
1414
import org.opensearch.action.support.WriteRequest
15-
import org.opensearch.alerting.model.ActionRunResult
1615
import org.opensearch.alerting.model.AlertContext
17-
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
18-
import org.opensearch.alerting.model.InputRunResults
19-
import org.opensearch.alerting.model.MonitorRunResult
2016
import org.opensearch.alerting.opensearchapi.InjectorContextElement
2117
import org.opensearch.alerting.opensearchapi.convertToMap
2218
import org.opensearch.alerting.opensearchapi.retry
@@ -29,16 +25,20 @@ import org.opensearch.alerting.util.getBucketKeysHash
2925
import org.opensearch.alerting.util.getCancelAfterTimeInterval
3026
import org.opensearch.alerting.util.getCombinedTriggerRunResult
3127
import org.opensearch.alerting.util.printsSampleDocData
32-
import org.opensearch.alerting.workflow.WorkflowRunContext
3328
import org.opensearch.client.Client
3429
import org.opensearch.common.unit.TimeValue
3530
import org.opensearch.common.xcontent.LoggingDeprecationHandler
3631
import org.opensearch.common.xcontent.XContentType
32+
import org.opensearch.commons.alerting.model.ActionRunResult
3733
import org.opensearch.commons.alerting.model.Alert
3834
import org.opensearch.commons.alerting.model.BucketLevelTrigger
35+
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
3936
import org.opensearch.commons.alerting.model.Finding
37+
import org.opensearch.commons.alerting.model.InputRunResults
4038
import org.opensearch.commons.alerting.model.Monitor
39+
import org.opensearch.commons.alerting.model.MonitorRunResult
4140
import org.opensearch.commons.alerting.model.SearchInput
41+
import org.opensearch.commons.alerting.model.WorkflowRunContext
4242
import org.opensearch.commons.alerting.model.action.AlertCategory
4343
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
4444
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

+13-13
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,24 @@ import org.opensearch.ExceptionsHelper
1010
import org.opensearch.Version
1111
import org.opensearch.action.ActionListenerResponseHandler
1212
import org.opensearch.action.support.GroupedActionListener
13-
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
14-
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
15-
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
16-
import org.opensearch.alerting.model.ActionRunResult
17-
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
18-
import org.opensearch.alerting.model.IndexExecutionContext
19-
import org.opensearch.alerting.model.InputRunResults
20-
import org.opensearch.alerting.model.MonitorRunResult
21-
import org.opensearch.alerting.util.AlertingException
2213
import org.opensearch.alerting.util.IndexUtils
23-
import org.opensearch.alerting.workflow.WorkflowRunContext
2414
import org.opensearch.cluster.metadata.IndexMetadata
2515
import org.opensearch.cluster.node.DiscoveryNode
2616
import org.opensearch.cluster.routing.ShardRouting
2717
import org.opensearch.cluster.service.ClusterService
18+
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
19+
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest
20+
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
21+
import org.opensearch.commons.alerting.model.ActionRunResult
2822
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
2923
import org.opensearch.commons.alerting.model.DocLevelQuery
24+
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
25+
import org.opensearch.commons.alerting.model.IndexExecutionContext
26+
import org.opensearch.commons.alerting.model.InputRunResults
3027
import org.opensearch.commons.alerting.model.Monitor
28+
import org.opensearch.commons.alerting.model.MonitorRunResult
29+
import org.opensearch.commons.alerting.model.WorkflowRunContext
30+
import org.opensearch.commons.alerting.util.AlertingException
3131
import org.opensearch.core.action.ActionListener
3232
import org.opensearch.core.common.breaker.CircuitBreakingException
3333
import org.opensearch.core.common.io.stream.Writeable
@@ -436,7 +436,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
436436
if (res.exception == null) {
437437
return null
438438
} else {
439-
exceptions.add(res.exception)
439+
exceptions.add(res.exception!!)
440440
}
441441
}
442442
return AlertingException.merge(*exceptions.toTypedArray())
@@ -501,9 +501,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
501501
if (response.exception == null) {
502502
if (response.inputResults.error != null) {
503503
if (response.inputResults.error is AlertingException) {
504-
errors.add(response.inputResults.error)
504+
errors.add(response.inputResults.error as AlertingException)
505505
} else {
506-
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
506+
errors.add(AlertingException.wrap(response.inputResults.error as Exception) as AlertingException)
507507
}
508508
}
509509
val partialResult = response.inputResults.results

alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import kotlinx.coroutines.withContext
1010
import org.apache.logging.log4j.LogManager
1111
import org.opensearch.action.search.SearchRequest
1212
import org.opensearch.action.search.SearchResponse
13-
import org.opensearch.alerting.model.InputRunResults
14-
import org.opensearch.alerting.model.TriggerAfterKey
1513
import org.opensearch.alerting.opensearchapi.convertToMap
1614
import org.opensearch.alerting.opensearchapi.suspendUntil
1715
import org.opensearch.alerting.settings.AlertingSettings
@@ -22,7 +20,6 @@ import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTranspor
2220
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
2321
import org.opensearch.alerting.util.getRoleFilterEnabled
2422
import org.opensearch.alerting.util.use
25-
import org.opensearch.alerting.workflow.WorkflowRunContext
2623
import org.opensearch.client.Client
2724
import org.opensearch.cluster.routing.Preference
2825
import org.opensearch.cluster.service.ClusterService
@@ -31,8 +28,11 @@ import org.opensearch.common.settings.Settings
3128
import org.opensearch.common.xcontent.LoggingDeprecationHandler
3229
import org.opensearch.common.xcontent.XContentType
3330
import org.opensearch.commons.alerting.model.ClusterMetricsInput
31+
import org.opensearch.commons.alerting.model.InputRunResults
3432
import org.opensearch.commons.alerting.model.Monitor
3533
import org.opensearch.commons.alerting.model.SearchInput
34+
import org.opensearch.commons.alerting.model.TriggerAfterKey
35+
import org.opensearch.commons.alerting.model.WorkflowRunContext
3636
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput
3737
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
3838
import org.opensearch.core.xcontent.NamedXContentRegistry

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

+8-12
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ import org.opensearch.action.get.GetResponse
2525
import org.opensearch.action.index.IndexRequest
2626
import org.opensearch.action.index.IndexResponse
2727
import org.opensearch.action.support.WriteRequest
28-
import org.opensearch.alerting.model.MonitorMetadata
2928
import org.opensearch.alerting.opensearchapi.suspendUntil
3029
import org.opensearch.alerting.settings.AlertingSettings
31-
import org.opensearch.alerting.util.AlertingException
3230
import org.opensearch.alerting.util.IndexUtils
3331
import org.opensearch.client.Client
3432
import org.opensearch.cluster.service.ClusterService
@@ -40,17 +38,16 @@ import org.opensearch.common.xcontent.XContentHelper
4038
import org.opensearch.common.xcontent.XContentType
4139
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
4240
import org.opensearch.commons.alerting.model.Monitor
43-
import org.opensearch.commons.alerting.model.Monitor.MonitorType
41+
import org.opensearch.commons.alerting.model.MonitorMetadata
4442
import org.opensearch.commons.alerting.model.ScheduledJob
43+
import org.opensearch.commons.alerting.util.AlertingException
4544
import org.opensearch.core.rest.RestStatus
4645
import org.opensearch.core.xcontent.NamedXContentRegistry
4746
import org.opensearch.core.xcontent.ToXContent
4847
import org.opensearch.core.xcontent.XContentParser
4948
import org.opensearch.core.xcontent.XContentParserUtils
5049
import org.opensearch.index.seqno.SequenceNumbers
5150
import org.opensearch.transport.RemoteTransportException
52-
import java.util.Locale
53-
import kotlin.collections.HashMap
5451

5552
private val log = LogManager.getLogger(MonitorMetadataService::class.java)
5653

@@ -188,10 +185,10 @@ object MonitorMetadataService :
188185

189186
suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
190187
try {
191-
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
188+
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
192189
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
193190
else null
194-
val runContext = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
191+
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
195192
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
196193
else null
197194
return if (runContext != null) {
@@ -211,13 +208,12 @@ object MonitorMetadataService :
211208
createWithRunContext: Boolean,
212209
workflowMetadataId: String? = null,
213210
): MonitorMetadata {
214-
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
211+
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
215212
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
216213
else null
217-
val runContext =
218-
if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
219-
createFullRunContext(monitorIndex)
220-
else emptyMap()
214+
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
215+
createFullRunContext(monitorIndex)
216+
else emptyMap()
221217
return MonitorMetadata(
222218
id = MonitorMetadata.getId(monitor, workflowMetadataId),
223219
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,

0 commit comments

Comments
 (0)