Skip to content

Commit 6064dad

Browse files
eirseppetardz
andauthored
refactored DeleteMonitor Action to be synchronious (#628) (#630)
* refactored DeleteMonitor Action to be synchronious Signed-off-by: Petar Dzepina <[email protected]> Signed-off-by: Petar Dzepina <[email protected]> Co-authored-by: Petar Dzepina <[email protected]>
1 parent 08335fb commit 6064dad

File tree

1 file changed

+63
-102
lines changed

1 file changed

+63
-102
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 63 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
package org.opensearch.alerting.transport
77

8+
import kotlinx.coroutines.CoroutineName
9+
import kotlinx.coroutines.Dispatchers
10+
import kotlinx.coroutines.GlobalScope
11+
import kotlinx.coroutines.launch
812
import org.apache.logging.log4j.LogManager
913
import org.opensearch.OpenSearchStatusException
1014
import org.opensearch.action.ActionListener
@@ -15,6 +19,7 @@ import org.opensearch.action.get.GetRequest
1519
import org.opensearch.action.get.GetResponse
1620
import org.opensearch.action.support.ActionFilters
1721
import org.opensearch.action.support.HandledTransportAction
22+
import org.opensearch.alerting.opensearchapi.suspendUntil
1823
import org.opensearch.alerting.settings.AlertingSettings
1924
import org.opensearch.alerting.util.AlertingException
2025
import org.opensearch.client.Client
@@ -39,7 +44,9 @@ import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
3944
import org.opensearch.rest.RestStatus
4045
import org.opensearch.tasks.Task
4146
import org.opensearch.transport.TransportService
42-
import java.io.IOException
47+
import kotlin.coroutines.resume
48+
import kotlin.coroutines.resumeWithException
49+
import kotlin.coroutines.suspendCoroutine
4350

4451
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)
4552

@@ -71,7 +78,8 @@ class TransportDeleteMonitorAction @Inject constructor(
7178
if (!validateUserBackendRoles(user, actionListener)) {
7279
return
7380
}
74-
client.threadPool().threadContext.stashContext().use {
81+
82+
GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
7583
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
7684
}
7785
}
@@ -83,120 +91,73 @@ class TransportDeleteMonitorAction @Inject constructor(
8391
private val user: User?,
8492
private val monitorId: String
8593
) {
86-
87-
fun resolveUserAndStart() {
88-
if (user == null) {
89-
// Security is disabled, so we can delete the destination without issues
90-
deleteMonitor()
91-
} else if (!doFilterForUser(user)) {
92-
// security is enabled and filterby is disabled.
93-
deleteMonitor()
94-
} else {
95-
try {
96-
start()
97-
} catch (ex: IOException) {
98-
actionListener.onFailure(AlertingException.wrap(ex))
94+
suspend fun resolveUserAndStart() {
95+
try {
96+
val monitor = getMonitor()
97+
98+
val canDelete = user == null ||
99+
!doFilterForUser(user) ||
100+
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
101+
102+
if (canDelete) {
103+
val deleteResponse = deleteMonitor(monitor)
104+
deleteMetadata(monitor)
105+
deleteDocLevelMonitorQueries(monitor)
106+
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
107+
} else {
108+
actionListener.onFailure(
109+
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
110+
)
99111
}
112+
} catch (t: Exception) {
113+
actionListener.onFailure(AlertingException.wrap(t))
100114
}
101115
}
102116

103-
fun start() {
117+
private suspend fun getMonitor(): Monitor {
104118
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
105-
client.get(
106-
getRequest,
107-
object : ActionListener<GetResponse> {
108-
override fun onResponse(response: GetResponse) {
109-
if (!response.isExists) {
110-
actionListener.onFailure(
111-
AlertingException.wrap(
112-
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
113-
)
114-
)
115-
return
116-
}
117-
val xcp = XContentHelper.createParser(
118-
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
119-
response.sourceAsBytesRef, XContentType.JSON
120-
)
121-
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
122-
onGetResponse(monitor)
123-
}
124-
override fun onFailure(t: Exception) {
125-
actionListener.onFailure(AlertingException.wrap(t))
126-
}
127-
}
128-
)
129-
}
130119

131-
private fun onGetResponse(monitor: Monitor) {
132-
if (!checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)) {
133-
return
134-
} else {
135-
deleteMonitor()
120+
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
121+
if (getResponse.isExists == false) {
122+
actionListener.onFailure(
123+
AlertingException.wrap(
124+
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
125+
)
126+
)
136127
}
137-
}
138-
139-
private fun deleteMonitor() {
140-
client.delete(
141-
deleteRequest,
142-
object : ActionListener<DeleteResponse> {
143-
override fun onResponse(response: DeleteResponse) {
144-
val clusterState = clusterService.state()
145-
if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
146-
deleteDocLevelMonitorQueries()
147-
}
148-
deleteMetadata()
149-
150-
actionListener.onResponse(DeleteMonitorResponse(response.id, response.version))
151-
}
152-
153-
override fun onFailure(t: Exception) {
154-
actionListener.onFailure(AlertingException.wrap(t))
155-
}
156-
}
128+
val xcp = XContentHelper.createParser(
129+
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
130+
getResponse.sourceAsBytesRef, XContentType.JSON
157131
)
132+
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
158133
}
159134

160-
private fun deleteMetadata() {
161-
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
162-
client.get(
163-
getRequest,
164-
object : ActionListener<GetResponse> {
165-
override fun onResponse(response: GetResponse) {
166-
if (response.isExists) {
167-
val deleteMetadataRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "$monitorId")
168-
.setRefreshPolicy(deleteRequest.refreshPolicy)
169-
client.delete(
170-
deleteMetadataRequest,
171-
object : ActionListener<DeleteResponse> {
172-
override fun onResponse(response: DeleteResponse) {
173-
}
174-
175-
override fun onFailure(t: Exception) {
176-
}
177-
}
178-
)
179-
}
180-
}
181-
override fun onFailure(t: Exception) {
182-
}
183-
}
184-
)
135+
private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
136+
return client.suspendUntil { delete(deleteRequest, it) }
185137
}
186138

187-
private fun deleteDocLevelMonitorQueries() {
188-
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
189-
.source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
190-
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
191-
.execute(
192-
object : ActionListener<BulkByScrollResponse> {
193-
override fun onResponse(response: BulkByScrollResponse) {
194-
}
139+
private suspend fun deleteMetadata(monitor: Monitor) {
140+
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
141+
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
142+
}
195143

196-
override fun onFailure(t: Exception) {
144+
private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) {
145+
val clusterState = clusterService.state()
146+
if (!clusterState.routingTable.hasIndex(monitor.dataSources.queryIndex)) {
147+
return
148+
}
149+
val response: BulkByScrollResponse = suspendCoroutine { cont ->
150+
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
151+
.source(monitor.dataSources.queryIndex)
152+
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
153+
.refresh(true)
154+
.execute(
155+
object : ActionListener<BulkByScrollResponse> {
156+
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
157+
override fun onFailure(t: Exception) = cont.resumeWithException(t)
197158
}
198-
}
199-
)
159+
)
160+
}
200161
}
201162
}
202163
}

0 commit comments

Comments
 (0)