15
15
16
16
package com.amazon.opendistroforelasticsearch.alerting.alerts
17
17
18
- import com.amazon.opendistroforelasticsearch.alerting.MonitorRunner
19
18
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
20
19
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX
21
20
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
22
21
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
23
- import org.apache.logging.log4j.Logger
24
- import org.elasticsearch.action.ActionListener
22
+ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
25
23
import org.elasticsearch.action.bulk.BulkRequest
26
24
import org.elasticsearch.action.bulk.BulkResponse
27
25
import org.elasticsearch.action.delete.DeleteRequest
@@ -30,7 +28,6 @@ import org.elasticsearch.action.search.SearchRequest
30
28
import org.elasticsearch.action.search.SearchResponse
31
29
import org.elasticsearch.client.Client
32
30
import org.elasticsearch.common.bytes.BytesReference
33
- import org.elasticsearch.common.unit.TimeValue
34
31
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
35
32
import org.elasticsearch.common.xcontent.NamedXContentRegistry
36
33
import org.elasticsearch.common.xcontent.ToXContent
@@ -39,12 +36,13 @@ import org.elasticsearch.common.xcontent.XContentHelper
39
36
import org.elasticsearch.common.xcontent.XContentParser
40
37
import org.elasticsearch.common.xcontent.XContentParserUtils
41
38
import org.elasticsearch.common.xcontent.XContentType
39
+ import org.elasticsearch.index.VersionType
42
40
import org.elasticsearch.index.query.QueryBuilders
41
+ import org.elasticsearch.rest.RestStatus
43
42
import org.elasticsearch.search.builder.SearchSourceBuilder
44
- import org.elasticsearch.threadpool.ThreadPool
45
43
46
44
/**
 * Moves defunct active alerts to the alert history index when the corresponding monitor or trigger is deleted.
 *
 * The logic for moving alerts consists of:
 * 1. Find active alerts in [ALERT_INDEX] that match [monitorId]; when [monitor] is provided, alerts whose
 *    trigger id is one of the monitor's current triggers are excluded.
 * 2. Copy the alerts found to [HISTORY_WRITE_INDEX] with their state set to DELETED.
 * 3. Delete from [ALERT_INDEX] every alert whose copy succeeded.
 * 4. Throw if any copy or delete item failed, so the caller can decide whether to retry.
 *
 * @param client client used to execute the search and bulk requests
 * @param monitorId id of the monitor whose alerts are moved; also used as the routing value of every request
 * @param monitor current monitor definition, or null to select alerts by [monitorId] only
 * @throws RuntimeException if any item of the copy or delete bulk request failed. Copy failures are reported
 *   in preference to delete failures. The cause is the failure cause of the first item rejected with
 *   [RestStatus.TOO_MANY_REQUESTS], or null when no item was rejected with that status.
 */
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) {
    val boolQuery = QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

    if (monitor != null) {
        boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, monitor.triggers.map { it.id }))
    }

    // NOTE(review): no explicit size is set on the search source, so presumably only the default page of hits
    // is returned even when more alerts match - confirm whether callers rely on repeated invocation.
    val activeAlertsQuery = SearchSourceBuilder.searchSource()
        .query(boolQuery)
        .version(true)

    val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX)
        .routing(monitorId)
        .source(activeAlertsQuery)
    val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }

    // If no alerts are found, simply return
    if (response.hits.totalHits.value == 0L) return

    // Re-index each alert as DELETED under the same id and version it had in the alert index.
    val indexRequests = response.hits.map { hit ->
        IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
            .routing(monitorId)
            .source(Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
                .copy(state = Alert.State.DELETED)
                .toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
            .version(hit.version)
            .versionType(VersionType.EXTERNAL_GTE)
            .id(hit.id)
    }
    val copyRequest = BulkRequest().add(indexRequests)
    val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) }

    // Only alerts that were copied successfully may be removed from the alert index.
    val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
        DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
            .routing(monitorId)
            .version(it.version)
            .versionType(VersionType.EXTERNAL_GTE)
    }

    // A bulk request without any action is rejected by request validation; when every copy failed there is
    // nothing to delete, and issuing the request anyway would hide the copy failure reported below.
    val deleteResponse: BulkResponse? = if (deleteRequests.isEmpty()) {
        null
    } else {
        val deleted: BulkResponse = client.suspendUntil { bulk(BulkRequest().add(deleteRequests), it) }
        deleted
    }

    if (copyResponse.hasFailures()) {
        throw bulkFailure("copy", copyResponse, monitorId, monitor)
    }
    if (deleteResponse != null && deleteResponse.hasFailures()) {
        throw bulkFailure("delete", deleteResponse, monitorId, monitor)
    }
}

/**
 * Builds the exception describing a bulk [response] that contains failed items.
 *
 * The cause is taken from the first failed item whose status is [RestStatus.TOO_MANY_REQUESTS]; it is null
 * when no failed item has that status.
 */
private fun bulkFailure(action: String, response: BulkResponse, monitorId: String, monitor: Monitor?): RuntimeException {
    val retryCause = response.items
        .firstOrNull { it.isFailed && it.status() == RestStatus.TOO_MANY_REQUESTS }
        ?.failure?.cause
    return RuntimeException("Failed to $action alerts for [$monitorId, ${monitor?.triggers?.map { it.id }}]: " +
        response.buildFailureMessage(), retryCause)
}

/**
 * Creates a JSON parser over [bytesReference] and advances it to the first token, which must be the start of
 * an object.
 */
private fun alertContentParser(bytesReference: BytesReference): XContentParser {
    val xcp = XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
        bytesReference, XContentType.JSON)
    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
    return xcp
}
0 commit comments