Skip to content

Commit 47effcb

Browse files
authored
add support for RemoteDocLevelMonitorInput (#1564)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 8ef66c7 commit 47effcb

File tree

7 files changed

+130
-31
lines changed

7 files changed

+130
-31
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
4040
import org.opensearch.commons.alerting.model.Monitor
4141
import org.opensearch.commons.alerting.model.MonitorMetadata
4242
import org.opensearch.commons.alerting.model.ScheduledJob
43+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
4344
import org.opensearch.commons.alerting.util.AlertingException
4445
import org.opensearch.core.rest.RestStatus
4546
import org.opensearch.core.xcontent.NamedXContentRegistry
@@ -185,8 +186,10 @@ object MonitorMetadataService :
185186

186187
suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
187188
try {
188-
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
189+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
189190
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
191+
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
192+
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
190193
else null
191194
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
192195
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
@@ -208,8 +211,10 @@ object MonitorMetadataService :
208211
createWithRunContext: Boolean,
209212
workflowMetadataId: String? = null,
210213
): MonitorMetadata {
211-
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
214+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
212215
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
216+
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
217+
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
213218
else null
214219
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
215220
createFullRunContext(monitorIndex)

alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.model.InputRunResults
2323
import org.opensearch.commons.alerting.model.Monitor
2424
import org.opensearch.commons.alerting.model.MonitorRunResult
2525
import org.opensearch.commons.alerting.model.WorkflowRunContext
26+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
2627
import org.opensearch.commons.alerting.util.AlertingException
2728
import org.opensearch.core.index.shard.ShardId
2829
import org.opensearch.core.rest.RestStatus
@@ -64,7 +65,8 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
6465
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
6566
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
6667

67-
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
68+
val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput
69+
val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput
6870
var shards: Set<String> = mutableSetOf()
6971
var concreteIndices = listOf<String>()
7072

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ import org.opensearch.commons.alerting.model.MonitorMetadata
6464
import org.opensearch.commons.alerting.model.ScheduledJob
6565
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
6666
import org.opensearch.commons.alerting.model.SearchInput
67+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
68+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
6769
import org.opensearch.commons.alerting.util.AlertingException
6870
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
6971
import org.opensearch.commons.authuser.User
@@ -185,10 +187,15 @@ class TransportIndexMonitorAction @Inject constructor(
185187
) {
186188
val indices = mutableListOf<String>()
187189
// todo: for doc level alerting: check if index is present before monitor is created.
188-
val searchInputs = request.monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD || it.name() == DOC_LEVEL_INPUT_FIELD }
190+
val searchInputs = request.monitor.inputs.filter {
191+
it.name() == SearchInput.SEARCH_FIELD ||
192+
it.name() == DOC_LEVEL_INPUT_FIELD ||
193+
it.name() == REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD
194+
}
189195
searchInputs.forEach {
190196
val inputIndices = if (it.name() == SearchInput.SEARCH_FIELD) (it as SearchInput).indices
191-
else (it as DocLevelMonitorInput).indices
197+
else if (it.name() == DOC_LEVEL_INPUT_FIELD) (it as DocLevelMonitorInput).indices
198+
else (it as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices
192199
indices.addAll(inputIndices)
193200
}
194201
val updatedIndices = indices.map { index ->

sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.alerting;
77

88
import org.opensearch.action.support.WriteRequest;
9+
import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput;
910
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
1011
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
1112
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
@@ -22,6 +23,7 @@
2223
import org.opensearch.commons.alerting.model.Monitor;
2324
import org.opensearch.commons.alerting.model.action.Action;
2425
import org.opensearch.commons.alerting.model.action.Throttle;
26+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput;
2527
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
2628
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
2729
import org.opensearch.core.action.ActionListener;
@@ -206,6 +208,15 @@ public void onFailure(Exception e) {
206208
);
207209
};
208210
} else {
211+
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput =
212+
new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2);
213+
BytesStreamOutput out2 = new BytesStreamOutput();
214+
sampleRemoteDocLevelMonitorInput.writeTo(out2);
215+
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();
216+
217+
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList());
218+
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);
219+
209220
Monitor remoteDocLevelMonitor = new Monitor(
210221
Monitor.NO_ID,
211222
Monitor.NO_VERSION,
@@ -217,7 +228,7 @@ public void onFailure(Exception e) {
217228
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_DOC_LEVEL_MONITOR,
218229
null,
219230
0,
220-
List.of(new DocLevelMonitorInput("description", List.of("index"), emptyList())),
231+
List.of(remoteDocLevelMonitorInput),
221232
List.of(),
222233
Map.of(),
223234
new DataSources(),

sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java

+41-24
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,21 @@
1010
import org.opensearch.action.support.ActionFilters;
1111
import org.opensearch.action.support.HandledTransportAction;
1212
import org.opensearch.action.support.WriteRequest;
13+
import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput;
1314
import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner;
1415
import org.opensearch.client.Client;
1516
import org.opensearch.cluster.service.ClusterService;
1617
import org.opensearch.common.inject.Inject;
1718
import org.opensearch.common.settings.Settings;
1819
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest;
1920
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse;
21+
import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
2022
import org.opensearch.commons.alerting.model.InputRunResults;
2123
import org.opensearch.commons.alerting.model.Monitor;
24+
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput;
2225
import org.opensearch.core.action.ActionListener;
26+
import org.opensearch.core.common.bytes.BytesReference;
27+
import org.opensearch.core.common.io.stream.StreamInput;
2328
import org.opensearch.core.xcontent.NamedXContentRegistry;
2429
import org.opensearch.tasks.Task;
2530
import org.opensearch.transport.TransportService;
@@ -55,30 +60,42 @@ public TransportRemoteDocLevelMonitorFanOutAction(
5560

5661
@Override
5762
protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, ActionListener<DocLevelMonitorFanOutResponse> actionListener) {
58-
Monitor monitor = request.getMonitor();
59-
Map<String, Object> lastRunContext = request.getMonitorMetadata().getLastRunContext();
60-
((Map<String, Object>) lastRunContext.get("index")).put("0", 0);
61-
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
62-
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
63-
this.client.index(indexRequest, new ActionListener<>() {
64-
@Override
65-
public void onResponse(IndexResponse indexResponse) {
66-
DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse(
67-
clusterService.localNode().getId(),
68-
request.getExecutionId(),
69-
monitor.getId(),
70-
lastRunContext,
71-
new InputRunResults(),
72-
new HashMap<>(),
73-
null
74-
);
75-
actionListener.onResponse(response);
76-
}
63+
try {
64+
Monitor monitor = request.getMonitor();
65+
Map<String, Object> lastRunContext = request.getMonitorMetadata().getLastRunContext();
7766

78-
@Override
79-
public void onFailure(Exception e) {
80-
actionListener.onFailure(e);
81-
}
82-
});
67+
RemoteDocLevelMonitorInput input = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0);
68+
BytesReference customInputSerialized = input.getInput();
69+
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
70+
SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput = new SampleRemoteDocLevelMonitorInput(sin);
71+
DocLevelMonitorInput docLevelMonitorInput = input.getDocLevelMonitorInput();
72+
String index = docLevelMonitorInput.getIndices().get(0);
73+
74+
((Map<String, Object>) lastRunContext.get(index)).put("0", 0);
75+
IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX)
76+
.source(sampleRemoteDocLevelMonitorInput.getB()).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
77+
this.client.index(indexRequest, new ActionListener<>() {
78+
@Override
79+
public void onResponse(IndexResponse indexResponse) {
80+
DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse(
81+
clusterService.localNode().getId(),
82+
request.getExecutionId(),
83+
monitor.getId(),
84+
lastRunContext,
85+
new InputRunResults(),
86+
new HashMap<>(),
87+
null
88+
);
89+
actionListener.onResponse(response);
90+
}
91+
92+
@Override
93+
public void onFailure(Exception e) {
94+
actionListener.onFailure(e);
95+
}
96+
});
97+
} catch (Exception ex) {
98+
actionListener.onFailure(ex);
99+
}
83100
}
84101
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.monitor.inputs;
7+
8+
import org.opensearch.core.common.io.stream.StreamInput;
9+
import org.opensearch.core.common.io.stream.StreamOutput;
10+
import org.opensearch.core.common.io.stream.Writeable;
11+
12+
import java.io.IOException;
13+
import java.util.Map;
14+
15+
public class SampleRemoteDocLevelMonitorInput implements Writeable {
16+
17+
private String a;
18+
19+
private Map<String, Object> b;
20+
21+
private int c;
22+
23+
public SampleRemoteDocLevelMonitorInput(String a, Map<String, Object> b, int c) {
24+
this.a = a;
25+
this.b = b;
26+
this.c = c;
27+
}
28+
29+
public SampleRemoteDocLevelMonitorInput(StreamInput sin) throws IOException {
30+
this(
31+
sin.readString(),
32+
sin.readMap(),
33+
sin.readInt()
34+
);
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
out.writeString(a);
40+
out.writeMap(b);
41+
out.writeInt(c);
42+
}
43+
44+
public int getC() {
45+
return c;
46+
}
47+
48+
public Map<String, Object> getB() {
49+
return b;
50+
}
51+
52+
public String getA() {
53+
return a;
54+
}
55+
}

sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc
167167
LoggingDeprecationHandler.INSTANCE,
168168
searchResponse.getEntity().getContent()
169169
).map();
170-
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1);
170+
found.set(Integer.parseInt((((Map<String, Object>) ((Map<String, Object>) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 &&
171+
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("world") &&
172+
((Map<String, Object>) ((List<Map<String, Object>>) ((Map<String, Object>) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("world").toString().equals("1"));
171173
return found.get();
172174
} catch (IOException ex) {
173175
return false;

0 commit comments

Comments
 (0)