@@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
27
27
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.firstFailureOrNull
28
28
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.retry
29
29
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
30
+ import com.amazon.opendistroforelasticsearch.alerting.model.ActionExecutionResult
30
31
import com.amazon.opendistroforelasticsearch.alerting.model.ActionRunResult
31
32
import com.amazon.opendistroforelasticsearch.alerting.model.Alert
32
33
import com.amazon.opendistroforelasticsearch.alerting.model.Alert.State.ACKNOWLEDGED
@@ -48,13 +49,12 @@ import com.amazon.opendistroforelasticsearch.alerting.script.TriggerExecutionCon
48
49
import com.amazon.opendistroforelasticsearch.alerting.script.TriggerScript
49
50
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
50
51
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
51
- import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.BULK_TIMEOUT
52
- import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INPUT_TIMEOUT
53
52
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
54
53
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
55
54
import org.apache.logging.log4j.LogManager
56
55
import kotlinx.coroutines.CoroutineScope
57
56
import kotlinx.coroutines.Dispatchers
57
+ import kotlinx.coroutines.Job
58
58
import kotlinx.coroutines.SupervisorJob
59
59
import kotlinx.coroutines.launch
60
60
import kotlinx.coroutines.withContext
@@ -73,6 +73,7 @@ import org.elasticsearch.client.Client
73
73
import org.elasticsearch.cluster.service.ClusterService
74
74
import org.elasticsearch.common.Strings
75
75
import org.elasticsearch.common.bytes.BytesReference
76
+ import org.elasticsearch.common.component.AbstractLifecycleComponent
76
77
import org.elasticsearch.common.settings.Settings
77
78
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
78
79
import org.elasticsearch.common.xcontent.NamedXContentRegistry
@@ -91,6 +92,7 @@ import org.elasticsearch.script.TemplateScript
91
92
import org.elasticsearch.search.builder.SearchSourceBuilder
92
93
import org.elasticsearch.threadpool.ThreadPool
93
94
import java.time.Instant
95
+ import kotlin.coroutines.CoroutineContext
94
96
95
97
class MonitorRunner (
96
98
settings : Settings ,
@@ -100,46 +102,44 @@ class MonitorRunner(
100
102
private val xContentRegistry : NamedXContentRegistry ,
101
103
private val alertIndices : AlertIndices ,
102
104
clusterService : ClusterService
103
- ) : JobRunner, CoroutineScope {
105
+ ) : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
104
106
105
107
private val logger = LogManager .getLogger(MonitorRunner ::class .java)
106
108
107
- private val job = SupervisorJob ()
108
- override val coroutineContext = Dispatchers .Default + job
109
+ private lateinit var runnerSupervisor: Job
110
+ override val coroutineContext: CoroutineContext
111
+ get() = Dispatchers .Default + runnerSupervisor
109
112
110
- @Volatile private var searchTimeout = INPUT_TIMEOUT .get(settings)
111
- @Volatile private var bulkTimeout = BULK_TIMEOUT .get(settings)
112
- @Volatile private var alertBackoffMillis = ALERT_BACKOFF_MILLIS .get(settings)
113
- @Volatile private var alertBackoffCount = ALERT_BACKOFF_COUNT .get(settings)
114
- @Volatile private var moveAlertsBackoffMillis = MOVE_ALERTS_BACKOFF_MILLIS .get(settings)
115
- @Volatile private var moveAlertsBackoffCount = MOVE_ALERTS_BACKOFF_COUNT .get(settings)
116
- @Volatile private var retryPolicy = BackoffPolicy .constantBackoff(alertBackoffMillis, alertBackoffCount)
117
- @Volatile private var moveAlertsRetryPolicy = BackoffPolicy .exponentialBackoff(moveAlertsBackoffMillis, moveAlertsBackoffCount)
113
+ @Volatile private var retryPolicy =
114
+ BackoffPolicy .constantBackoff(ALERT_BACKOFF_MILLIS .get(settings), ALERT_BACKOFF_COUNT .get(settings))
115
+ @Volatile private var moveAlertsRetryPolicy =
116
+ BackoffPolicy .exponentialBackoff(MOVE_ALERTS_BACKOFF_MILLIS .get(settings), MOVE_ALERTS_BACKOFF_COUNT .get(settings))
118
117
119
118
init {
120
- clusterService.clusterSettings.addSettingsUpdateConsumer(INPUT_TIMEOUT ) { searchTimeout = it }
121
- clusterService.clusterSettings.addSettingsUpdateConsumer(BULK_TIMEOUT ) { bulkTimeout = it }
122
- clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS ) {
123
- retryPolicy = BackoffPolicy .constantBackoff(it, alertBackoffCount)
119
+ clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS , ALERT_BACKOFF_COUNT ) {
120
+ millis, count -> retryPolicy = BackoffPolicy .constantBackoff(millis, count)
124
121
}
125
- clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_COUNT ) {
126
- retryPolicy = BackoffPolicy .constantBackoff(alertBackoffMillis, it)
127
- }
128
- clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS ) {
129
- moveAlertsRetryPolicy = BackoffPolicy .exponentialBackoff(it, moveAlertsBackoffCount)
130
- }
131
- clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_COUNT ) {
132
- moveAlertsRetryPolicy = BackoffPolicy .exponentialBackoff(alertBackoffMillis, it)
122
+ clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS , MOVE_ALERTS_BACKOFF_COUNT ) {
123
+ millis, count -> moveAlertsRetryPolicy = BackoffPolicy .exponentialBackoff(millis, count)
133
124
}
134
125
}
135
126
127
+ override fun doStart () {
128
+ runnerSupervisor = SupervisorJob ()
129
+ }
130
+
131
+ override fun doStop () {
132
+ runnerSupervisor.cancel()
133
+ }
134
+
135
+ override fun doClose () { }
136
+
136
137
override fun postIndex (job : ScheduledJob ) {
137
138
if (job !is Monitor ) {
138
139
throw IllegalArgumentException (" Invalid job type" )
139
140
}
140
141
141
- // Using Dispatchers.Unconfined as moveAlerts isn't CPU intensive so we can just run it on the calling thread.
142
- GlobalScope .launch(Dispatchers .Unconfined ) {
142
+ launch {
143
143
try {
144
144
moveAlertsRetryPolicy.retry(logger) {
145
145
if (alertIndices.isInitialized()) {
@@ -153,11 +153,7 @@ class MonitorRunner(
153
153
}
154
154
155
155
override fun postDelete (jobId : String ) {
156
- // Using Dispatchers.Unconfined as moveAlerts isn't CPU intensive so we can just run it on the calling thread.
157
- GlobalScope .launch(Dispatchers .Unconfined ) {
158
- // Using Unconfined dispatcher here as moveAlerts doesn't do much CPU intensive work, so we can just
159
- // run it on the ES built-in thread pools.
160
- launch(Dispatchers .Unconfined ) {
156
+ launch {
161
157
try {
162
158
moveAlertsRetryPolicy.retry(logger) {
163
159
if (alertIndices.isInitialized()) {
@@ -451,8 +447,9 @@ class MonitorRunner(
451
447
}
452
448
453
449
val jobSource = getResponse.sourceAsBytesRef
454
- val xcp = XContentHelper .createParser(xContentRegistry, LoggingDeprecationHandler .INSTANCE , jobSource, XContentType .JSON )
455
450
return withContext(Dispatchers .IO ) {
451
+ val xcp = XContentHelper .createParser(xContentRegistry, LoggingDeprecationHandler .INSTANCE ,
452
+ jobSource, XContentType .JSON )
456
453
ensureExpectedToken(XContentParser .Token .START_OBJECT , xcp.nextToken(), xcp::getTokenLocation)
457
454
ensureExpectedToken(XContentParser .Token .FIELD_NAME , xcp.nextToken(), xcp::getTokenLocation)
458
455
ensureExpectedToken(XContentParser .Token .START_OBJECT , xcp.nextToken(), xcp::getTokenLocation)
0 commit comments