Skip to content

Commit 96365c6

Browse files
eirsepstevanbz
andauthored
Added rest layer for the workflow. (opensearch-project#963) (opensearch-project#965)
* Added rest layer for the workflow. Added secure tests (opensearch-project#886) * Added rest layer for the workflow. Added secure tests * add execution_id field in alert mapping --------- Signed-off-by: Stevan Buzejic <[email protected]> Signed-off-by: Surya Sashank Nistala <[email protected]> Co-authored-by: Stevan Buzejic <[email protected]>
1 parent 0df3e33 commit 96365c6

File tree

11 files changed

+2896
-9
lines changed

11 files changed

+2896
-9
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
2727
import org.opensearch.alerting.core.settings.ScheduledJobSettings
2828
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
2929
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
30+
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
3031
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
3132
import org.opensearch.alerting.resthandler.RestGetAlertsAction
3233
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
3334
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
3435
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
3536
import org.opensearch.alerting.resthandler.RestGetFindingsAction
3637
import org.opensearch.alerting.resthandler.RestGetMonitorAction
38+
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
3739
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
40+
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
3841
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
3942
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
4043
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
@@ -127,7 +130,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
127130
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
128131

129132
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
130-
133+
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
131134
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
132135

133136
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@@ -171,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
171174
RestGetMonitorAction(),
172175
RestDeleteMonitorAction(),
173176
RestIndexMonitorAction(),
177+
RestIndexWorkflowAction(),
174178
RestSearchMonitorAction(settings, clusterService),
175179
RestExecuteMonitorAction(),
176180
RestAcknowledgeAlertAction(),
@@ -181,7 +185,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
181185
RestGetEmailGroupAction(),
182186
RestGetDestinationsAction(),
183187
RestGetAlertsAction(),
184-
RestGetFindingsAction()
188+
RestGetFindingsAction(),
189+
RestGetWorkflowAction(),
190+
RestDeleteWorkflowAction()
185191
)
186192
}
187193

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.resthandler
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.action.support.WriteRequest
10+
import org.opensearch.alerting.AlertingPlugin
11+
import org.opensearch.alerting.util.REFRESH
12+
import org.opensearch.client.node.NodeClient
13+
import org.opensearch.commons.alerting.action.AlertingActions
14+
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
15+
import org.opensearch.rest.BaseRestHandler
16+
import org.opensearch.rest.RestHandler
17+
import org.opensearch.rest.RestRequest
18+
import org.opensearch.rest.action.RestToXContentListener
19+
import java.io.IOException
20+
21+
/**
22+
* This class consists of the REST handler to delete workflows.
23+
*/
24+
class RestDeleteWorkflowAction : BaseRestHandler() {
25+
26+
private val log = LogManager.getLogger(javaClass)
27+
28+
override fun getName(): String {
29+
return "delete_workflow_action"
30+
}
31+
32+
override fun routes(): List<RestHandler.Route> {
33+
return listOf(
34+
RestHandler.Route(
35+
RestRequest.Method.DELETE,
36+
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
37+
)
38+
)
39+
}
40+
41+
@Throws(IOException::class)
42+
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
43+
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")
44+
45+
val workflowId = request.param("workflowID")
46+
val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false)
47+
log.debug("${request.method()} ${request.uri()}")
48+
49+
val refreshPolicy =
50+
WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value))
51+
val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors)
52+
53+
return RestChannelConsumer { channel ->
54+
client.execute(
55+
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest,
56+
RestToXContentListener(channel)
57+
)
58+
}
59+
}
60+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting.resthandler
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.alerting.AlertingPlugin
10+
import org.opensearch.alerting.util.context
11+
import org.opensearch.client.node.NodeClient
12+
import org.opensearch.commons.alerting.action.AlertingActions
13+
import org.opensearch.commons.alerting.action.GetWorkflowRequest
14+
import org.opensearch.rest.BaseRestHandler
15+
import org.opensearch.rest.RestHandler
16+
import org.opensearch.rest.RestRequest
17+
import org.opensearch.rest.action.RestToXContentListener
18+
import org.opensearch.search.fetch.subphase.FetchSourceContext
19+
20+
/**
21+
* This class consists of the REST handler to retrieve a workflow .
22+
*/
23+
class RestGetWorkflowAction : BaseRestHandler() {
24+
25+
private val log = LogManager.getLogger(javaClass)
26+
27+
override fun getName(): String {
28+
return "get_workflow_action"
29+
}
30+
31+
override fun routes(): List<RestHandler.Route> {
32+
return listOf(
33+
RestHandler.Route(
34+
RestRequest.Method.GET,
35+
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
36+
)
37+
)
38+
}
39+
40+
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
41+
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")
42+
43+
val workflowId = request.param("workflowID")
44+
if (workflowId == null || workflowId.isEmpty()) {
45+
throw IllegalArgumentException("missing id")
46+
}
47+
48+
var srcContext = context(request)
49+
if (request.method() == RestRequest.Method.HEAD) {
50+
srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
51+
}
52+
val getWorkflowRequest =
53+
GetWorkflowRequest(workflowId, request.method())
54+
return RestChannelConsumer {
55+
channel ->
56+
client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel))
57+
}
58+
}
59+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.alerting.resthandler
6+
7+
import org.opensearch.action.support.WriteRequest
8+
import org.opensearch.alerting.AlertingPlugin
9+
import org.opensearch.alerting.util.AlertingException
10+
import org.opensearch.alerting.util.IF_PRIMARY_TERM
11+
import org.opensearch.alerting.util.IF_SEQ_NO
12+
import org.opensearch.alerting.util.REFRESH
13+
import org.opensearch.client.node.NodeClient
14+
import org.opensearch.common.xcontent.XContentParserUtils
15+
import org.opensearch.commons.alerting.action.AlertingActions
16+
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
17+
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
18+
import org.opensearch.commons.alerting.model.Workflow
19+
import org.opensearch.core.xcontent.ToXContent
20+
import org.opensearch.core.xcontent.XContentParser
21+
import org.opensearch.index.seqno.SequenceNumbers
22+
import org.opensearch.rest.BaseRestHandler
23+
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
24+
import org.opensearch.rest.BytesRestResponse
25+
import org.opensearch.rest.RestChannel
26+
import org.opensearch.rest.RestHandler
27+
import org.opensearch.rest.RestRequest
28+
import org.opensearch.rest.RestResponse
29+
import org.opensearch.rest.RestStatus
30+
import org.opensearch.rest.action.RestResponseListener
31+
import java.io.IOException
32+
import java.time.Instant
33+
34+
/**
35+
* Rest handlers to create and update workflows.
36+
*/
37+
class RestIndexWorkflowAction : BaseRestHandler() {
38+
39+
override fun getName(): String {
40+
return "index_workflow_action"
41+
}
42+
43+
override fun routes(): List<RestHandler.Route> {
44+
return listOf(
45+
RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI),
46+
RestHandler.Route(
47+
RestRequest.Method.PUT,
48+
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
49+
)
50+
)
51+
}
52+
53+
@Throws(IOException::class)
54+
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
55+
val id = request.param("workflowID", Workflow.NO_ID)
56+
if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) {
57+
throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID"))
58+
}
59+
60+
// Validate request by parsing JSON to Monitor
61+
val xcp = request.contentParser()
62+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
63+
val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now())
64+
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?
65+
66+
val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
67+
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
68+
val refreshPolicy = if (request.hasParam(REFRESH)) {
69+
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
70+
} else {
71+
WriteRequest.RefreshPolicy.IMMEDIATE
72+
}
73+
val workflowRequest =
74+
IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles)
75+
76+
return RestChannelConsumer { channel ->
77+
client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method()))
78+
}
79+
}
80+
81+
private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener<IndexWorkflowResponse> {
82+
return object : RestResponseListener<IndexWorkflowResponse>(channel) {
83+
@Throws(Exception::class)
84+
override fun buildResponse(response: IndexWorkflowResponse): RestResponse {
85+
var returnStatus = RestStatus.CREATED
86+
if (restMethod == RestRequest.Method.PUT)
87+
returnStatus = RestStatus.OK
88+
89+
val restResponse =
90+
BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
91+
if (returnStatus == RestStatus.CREATED) {
92+
val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}"
93+
restResponse.addHeader("Location", location)
94+
}
95+
return restResponse
96+
}
97+
}
98+
}
99+
}

alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"required": true
55
},
66
"_meta" : {
7-
"schema_version": 4
7+
"schema_version": 5
88
},
99
"properties": {
1010
"schema_version": {
@@ -25,6 +25,9 @@
2525
"severity": {
2626
"type": "keyword"
2727
},
28+
"execution_id": {
29+
"type": "keyword"
30+
},
2831
"monitor_name": {
2932
"type": "text",
3033
"fields": {

alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
package org.opensearch.alerting
77

8+
import org.opensearch.alerting.action.ExecuteWorkflowAction
9+
import org.opensearch.commons.alerting.action.AlertingActions
10+
811
val ALL_ACCESS_ROLE = "all_access"
912
val READALL_AND_MONITOR_ROLE = "readall_and_monitor"
1013
val ALERTING_FULL_ACCESS_ROLE = "alerting_full_access"
@@ -16,11 +19,15 @@ val ALERTING_GET_EMAIL_GROUP_ACCESS = "alerting_get_email_group_access"
1619
val ALERTING_SEARCH_EMAIL_GROUP_ACCESS = "alerting_search_email_group_access"
1720
val ALERTING_INDEX_MONITOR_ACCESS = "alerting_index_monitor_access"
1821
val ALERTING_GET_MONITOR_ACCESS = "alerting_get_monitor_access"
22+
val ALERTING_GET_WORKFLOW_ACCESS = "alerting_get_workflow_access"
23+
val ALERTING_DELETE_WORKFLOW_ACCESS = "alerting_delete_workflow_access"
1924
val ALERTING_SEARCH_MONITOR_ONLY_ACCESS = "alerting_search_monitor_access"
2025
val ALERTING_EXECUTE_MONITOR_ACCESS = "alerting_execute_monitor_access"
26+
val ALERTING_EXECUTE_WORKFLOW_ACCESS = "alerting_execute_workflow_access"
2127
val ALERTING_DELETE_MONITOR_ACCESS = "alerting_delete_monitor_access"
2228
val ALERTING_GET_DESTINATION_ACCESS = "alerting_get_destination_access"
2329
val ALERTING_GET_ALERTS_ACCESS = "alerting_get_alerts_access"
30+
val ALERTING_INDEX_WORKFLOW_ACCESS = "alerting_index_workflow_access"
2431

2532
val ROLE_TO_PERMISSION_MAPPING = mapOf(
2633
ALERTING_NO_ACCESS_ROLE to "",
@@ -30,9 +37,13 @@ val ROLE_TO_PERMISSION_MAPPING = mapOf(
3037
ALERTING_SEARCH_EMAIL_GROUP_ACCESS to "cluster:admin/opendistro/alerting/destination/email_group/search",
3138
ALERTING_INDEX_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/write",
3239
ALERTING_GET_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/get",
40+
ALERTING_GET_WORKFLOW_ACCESS to AlertingActions.GET_WORKFLOW_ACTION_NAME,
3341
ALERTING_SEARCH_MONITOR_ONLY_ACCESS to "cluster:admin/opendistro/alerting/monitor/search",
3442
ALERTING_EXECUTE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/execute",
43+
ALERTING_EXECUTE_WORKFLOW_ACCESS to ExecuteWorkflowAction.NAME,
3544
ALERTING_DELETE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/delete",
3645
ALERTING_GET_DESTINATION_ACCESS to "cluster:admin/opendistro/alerting/destination/get",
37-
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get"
46+
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get",
47+
ALERTING_INDEX_WORKFLOW_ACCESS to AlertingActions.INDEX_WORKFLOW_ACTION_NAME,
48+
ALERTING_DELETE_WORKFLOW_ACCESS to AlertingActions.DELETE_WORKFLOW_ACTION_NAME
3849
)

0 commit comments

Comments
 (0)