Skip to content

Commit 7ad2209

Browse files
add distributed locking to jobs in alerting (#1403) (#1458)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent cb890f0 commit 7ad2209

File tree

10 files changed

+638
-12
lines changed

10 files changed

+638
-12
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.opensearch.alerting.core.JobSweeper
1919
import org.opensearch.alerting.core.ScheduledJobIndices
2020
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
2121
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
22+
import org.opensearch.alerting.core.lock.LockService
2223
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
2324
import org.opensearch.alerting.core.schedule.JobScheduler
2425
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
@@ -260,6 +261,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
260261
): Collection<Any> {
261262
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
262263
val settings = environment.settings()
264+
val lockService = LockService(client, clusterService)
263265
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
264266
runner = MonitorRunnerService
265267
.registerClusterService(clusterService)
@@ -276,6 +278,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
276278
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
277279
.registerJvmStats(JvmStats.jvmStats())
278280
.registerWorkflowService(WorkflowService(client, xContentRegistry))
281+
.registerLockService(lockService)
279282
.registerConsumers()
280283
.registerDestinationSettings()
281284
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
@@ -300,9 +303,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
300303
settings
301304
)
302305

303-
DeleteMonitorService.initialize(client)
306+
DeleteMonitorService.initialize(client, lockService)
304307

305-
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
308+
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
306309
}
307310

308311
override fun getSettings(): List<Setting<*>> {

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.core.lock.LockService
1011
import org.opensearch.alerting.model.destination.DestinationContextFactory
1112
import org.opensearch.alerting.settings.AlertingSettings
1213
import org.opensearch.alerting.settings.DestinationSettings
@@ -57,4 +58,5 @@ data class MonitorRunnerExecutionContext(
5758
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
5859
@Volatile var docLevelMonitorShardFetchSize: Int =
5960
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
61+
@Volatile var lockService: LockService? = null
6062
)

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

+38-10
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import org.opensearch.alerting.alerts.AlertIndices
1717
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
1818
import org.opensearch.alerting.core.JobRunner
1919
import org.opensearch.alerting.core.ScheduledJobIndices
20+
import org.opensearch.alerting.core.lock.LockModel
21+
import org.opensearch.alerting.core.lock.LockService
2022
import org.opensearch.alerting.model.MonitorRunResult
2123
import org.opensearch.alerting.model.WorkflowRunResult
2224
import org.opensearch.alerting.model.destination.DestinationContextFactory
2325
import org.opensearch.alerting.opensearchapi.retry
26+
import org.opensearch.alerting.opensearchapi.suspendUntil
2427
import org.opensearch.alerting.script.TriggerExecutionContext
2528
import org.opensearch.alerting.settings.AlertingSettings
2629
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
@@ -221,6 +224,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
221224
return this
222225
}
223226

227+
/**
 * Stores the given [lockService] on the shared monitor execution context.
 *
 * @param lockService the lock service to record in `monitorCtx`.
 * @return this [MonitorRunnerService], so registration calls can be chained.
 */
fun registerLockService(lockService: LockService): MonitorRunnerService = apply {
    monitorCtx.lockService = lockService
}
231+
224232
// Updates destination settings when the reload API is called so that new keystore values are visible
225233
fun reloadDestinationSettings(settings: Settings) {
226234
monitorCtx.destinationSettings = loadDestinationSettings(settings)
@@ -292,20 +300,40 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
292300
when (job) {
293301
is Workflow -> {
294302
launch {
295-
logger.debug(
296-
"PERF_DEBUG: executing workflow ${job.id} on node " +
297-
monitorCtx.clusterService!!.state().nodes().localNode.id
298-
)
299-
runJob(job, periodStart, periodEnd, false)
303+
// Lock held for this workflow run; stays null if no lock was acquired.
var lock: LockModel? = null
try {
    // Skip this run entirely when acquireLock yields no lock.
    val acquired = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
        monitorCtx.lockService!!.acquireLock(job, it)
    } ?: return@launch
    lock = acquired
    logger.debug("lock ${acquired.lockId} acquired")
    logger.debug(
        "PERF_DEBUG: executing workflow ${job.id} on node " +
            monitorCtx.clusterService!!.state().nodes().localNode.id
    )
    runJob(job, periodStart, periodEnd, false)
} finally {
    // This block also runs after the early `return@launch` above and when acquisition
    // throws; in both cases `lock` is still null. The log message is built eagerly, so
    // a safe call is used here instead of `!!` to avoid a NullPointerException.
    monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
    logger.debug("lock ${lock?.lockId} released")
}
300318
}
301319
}
302320
is Monitor -> {
303321
launch {
304-
logger.debug(
305-
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
306-
monitorCtx.clusterService!!.state().nodes().localNode.id
307-
)
308-
runJob(job, periodStart, periodEnd, false)
322+
// Lock held for this monitor run; stays null if no lock was acquired.
var lock: LockModel? = null
try {
    // Skip this run entirely when acquireLock yields no lock.
    val acquired = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
        monitorCtx.lockService!!.acquireLock(job, it)
    } ?: return@launch
    lock = acquired
    logger.debug("lock ${acquired.lockId} acquired")
    logger.debug(
        "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
            monitorCtx.clusterService!!.state().nodes().localNode.id
    )
    runJob(job, periodStart, periodEnd, false)
} finally {
    // This block also runs after the early `return@launch` above and when acquisition
    // throws; in both cases `lock` is still null. The log message is built eagerly, so
    // a safe call is used here instead of `!!` to avoid a NullPointerException.
    monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
    logger.debug("lock ${lock?.lockId} released")
}
309337
}
310338
}
311339
else -> {

alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
2222
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2323
import org.opensearch.action.support.master.AcknowledgedResponse
2424
import org.opensearch.alerting.MonitorMetadataService
25+
import org.opensearch.alerting.core.lock.LockModel
26+
import org.opensearch.alerting.core.lock.LockService
2527
import org.opensearch.alerting.opensearchapi.suspendUntil
2628
import org.opensearch.alerting.util.AlertingException
2729
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
@@ -48,11 +50,14 @@ object DeleteMonitorService :
4850
private val log = LogManager.getLogger(this.javaClass)
4951

5052
private lateinit var client: Client
53+
private lateinit var lockService: LockService
5154

5255
/**
 * Supplies the collaborators used by this singleton's delete operations.
 *
 * @param client client stored for later requests issued by this service.
 * @param lockService lock service stored for later lock deletion.
 */
fun initialize(
    client: Client,
    lockService: LockService
) {
    this.client = client
    this.lockService = lockService
}
5762

5863
/**
@@ -64,6 +69,7 @@ object DeleteMonitorService :
6469
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
6570
deleteDocLevelMonitorQueriesAndIndices(monitor)
6671
deleteMetadata(monitor)
72+
deleteLock(monitor)
6773
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
6874
}
6975

@@ -147,6 +153,10 @@ object DeleteMonitorService :
147153
}
148154
}
149155

156+
/**
 * Deletes the lock associated with [monitor].
 *
 * The lock id is derived from the monitor id via `LockModel.generateLockId`, and the
 * deletion is issued through `lockService.deleteLock`. The Boolean result is discarded.
 */
private suspend fun deleteLock(monitor: Monitor) {
    val lockId = LockModel.generateLockId(monitor.id)
    client.suspendUntil<Client, Boolean> { listener -> lockService.deleteLock(lockId, listener) }
}
159+
150160
/**
151161
* Checks if the monitor is part of the workflow
152162
*

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
2323
import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest.RefreshPolicy
26+
import org.opensearch.alerting.core.lock.LockModel
27+
import org.opensearch.alerting.core.lock.LockService
2628
import org.opensearch.alerting.model.MonitorMetadata
2729
import org.opensearch.alerting.model.WorkflowMetadata
2830
import org.opensearch.alerting.opensearchapi.addFilter
@@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
7375
val clusterService: ClusterService,
7476
val settings: Settings,
7577
val xContentRegistry: NamedXContentRegistry,
78+
val lockService: LockService
7679
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
7780
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
7881
),
@@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
180183
} catch (t: Exception) {
181184
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
182185
}
186+
try {
    // Delete the workflow lock
    client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
    // Best-effort cleanup: a failure here must not fail the workflow deletion, but the
    // cause is passed to the logger so the stack trace is not lost.
    log.error("Failed to delete workflow lock for $workflowId", t)
}
183192
actionListener.onResponse(deleteWorkflowResponse)
184193
} else {
185194
actionListener.onFailure(

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

+85
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
1010
import org.opensearch.action.search.SearchResponse
1111
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
1212
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
13+
import org.opensearch.alerting.core.lock.LockService
1314
import org.opensearch.alerting.settings.AlertingSettings
1415
import org.opensearch.client.Response
1516
import org.opensearch.client.ResponseException
@@ -18,16 +19,20 @@ import org.opensearch.commons.alerting.model.Alert
1819
import org.opensearch.commons.alerting.model.DataSources
1920
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
2021
import org.opensearch.commons.alerting.model.DocLevelQuery
22+
import org.opensearch.commons.alerting.model.IntervalSchedule
2123
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
2224
import org.opensearch.commons.alerting.model.action.AlertCategory
2325
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
2426
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
2527
import org.opensearch.core.rest.RestStatus
2628
import org.opensearch.script.Script
29+
import org.opensearch.test.OpenSearchTestCase
2730
import java.time.ZonedDateTime
2831
import java.time.format.DateTimeFormatter
32+
import java.time.temporal.ChronoUnit
2933
import java.time.temporal.ChronoUnit.MILLIS
3034
import java.util.Locale
35+
import java.util.concurrent.TimeUnit
3136

3237
class DocumentMonitorRunnerIT : AlertingRestTestCase() {
3338

@@ -470,6 +475,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
470475
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
471476
}
472477

478+
/**
 * Creates a document-level monitor named "__lag-monitor-test__" on a 1-minute interval
 * schedule, indexes two matching documents, waits, and then asserts that none of the alerts
 * returned for the monitor's name carries an `error_message`.
 */
fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
479+
val testIndex = createTestIndex()
480+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
481+
val testDoc = """{
482+
"message" : "This is an error from IAD region",
483+
"test_strict_date_time" : "$testTime",
484+
"test_field" : "us-west-2"
485+
}"""
486+
487+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
488+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
489+
490+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
491+
val monitor = createMonitor(
492+
randomDocumentLevelMonitor(
493+
name = "__lag-monitor-test__",
494+
inputs = listOf(docLevelInput),
495+
triggers = listOf(trigger),
496+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
497+
)
498+
)
499+
assertNotNull(monitor.id)
500+
501+
indexDoc(testIndex, "1", testDoc)
502+
indexDoc(testIndex, "5", testDoc)
503+
// Blocks the test thread for 240000 ms (4 minutes) before alerts are inspected.
// NOTE(review): presumably long enough for several scheduled runs at the 1-minute interval - confirm.
Thread.sleep(240000)
504+
505+
// Look up alerts by the monitor's name.
val inputMap = HashMap<String, Any>()
506+
inputMap["searchString"] = monitor.name
507+
508+
val responseMap = getAlerts(inputMap).asMap()
509+
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
510+
// Every returned alert must be free of an error message.
alerts.forEach {
511+
assertTrue(it["error_message"] == null)
512+
}
513+
}
514+
515+
/**
 * Creates a document-level monitor on a 1-minute interval schedule, waits for the lock index
 * to exist, asserts that a search of that index returns exactly one hit, then deletes the
 * monitor and asserts that a search returns no hits.
 *
 * Annotated with `@AwaitsFix` and an empty `bugUrl`.
 * NOTE(review): no tracking issue is referenced - add one when known.
 */
@AwaitsFix(bugUrl = "")
516+
fun `test monitor run generate lock and monitor delete removes lock`() {
517+
val testIndex = createTestIndex()
518+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
519+
val testDoc = """{
520+
"message" : "This is an error from IAD region",
521+
"test_strict_date_time" : "$testTime",
522+
"test_field" : "us-west-2"
523+
}"""
524+
525+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
526+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
527+
528+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
529+
val monitor = createMonitor(
530+
randomDocumentLevelMonitor(
531+
inputs = listOf(docLevelInput),
532+
triggers = listOf(trigger),
533+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
534+
)
535+
)
536+
assertNotNull(monitor.id)
537+
538+
indexDoc(testIndex, "1", testDoc)
539+
indexDoc(testIndex, "5", testDoc)
540+
// Poll with a 240-second limit until a HEAD request on the lock index returns HTTP 200.
// The result of waitUntil is not checked here.
OpenSearchTestCase.waitUntil({
541+
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
542+
return@waitUntil (response.restStatus().status == 200)
543+
}, 240, TimeUnit.SECONDS)
544+
545+
// Count the hits returned by a search of the lock index; exactly one is expected.
var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
546+
var responseMap = entityAsMap(response)
547+
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
548+
assertEquals(1, noOfLocks)
549+
550+
// After the monitor is deleted and the lock index refreshed, no hits are expected.
deleteMonitor(monitor)
551+
refreshIndex(LockService.LOCK_INDEX_NAME)
552+
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
553+
responseMap = entityAsMap(response)
554+
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
555+
assertEquals(0, noOfLocks)
556+
}
557+
473558
fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
474559
val testIndex = createTestIndex()
475560
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt

+43
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
3737
import org.opensearch.search.builder.SearchSourceBuilder
3838
import org.opensearch.test.junit.annotations.TestLogging
3939
import java.time.Instant
40+
import java.time.ZonedDateTime
41+
import java.time.format.DateTimeFormatter
4042
import java.time.temporal.ChronoUnit
4143
import java.util.Collections
4244
import java.util.Locale
@@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
11851187
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
11861188
assertEquals("Findings saved for test monitor", 1, findings.size)
11871189
}
1190+
1191+
/**
 * Creates a disabled document-level monitor, wraps it in an enabled workflow on a 1-minute
 * interval schedule, indexes two matching documents, waits, and then asserts that none of the
 * alerts found for the monitor has an `errorMessage`.
 */
fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
1192+
val testIndex = createTestIndex()
1193+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
1194+
val testDoc = """{
1195+
"message" : "This is an error from IAD region",
1196+
"test_strict_date_time" : "$testTime",
1197+
"test_field" : "us-west-2"
1198+
}"""
1199+
1200+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
1201+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
1202+
1203+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
1204+
// The monitor itself is created disabled; only the workflow below is enabled.
val monitor = createMonitor(
1205+
randomDocumentLevelMonitor(
1206+
name = "__lag-monitor-test__",
1207+
inputs = listOf(docLevelInput),
1208+
triggers = listOf(trigger),
1209+
enabled = false,
1210+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
1211+
)
1212+
)
1213+
assertNotNull(monitor.id)
1214+
createWorkflow(
1215+
randomWorkflow(
1216+
monitorIds = listOf(monitor.id),
1217+
enabled = true,
1218+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
1219+
)
1220+
)
1221+
1222+
indexDoc(testIndex, "1", testDoc)
1223+
indexDoc(testIndex, "5", testDoc)
1224+
// Blocks the test thread for 240000 ms (4 minutes) before alerts are inspected.
// NOTE(review): presumably long enough for several scheduled workflow runs - confirm.
Thread.sleep(240000)
1225+
1226+
val alerts = searchAlerts(monitor)
1227+
// Every alert found for the monitor must be free of an error message.
alerts.forEach {
1228+
assertTrue(it.errorMessage == null)
1229+
}
1230+
}
11881231
}

0 commit comments

Comments
 (0)