Skip to content

Commit b54b5e7

Browse files
committed
add distributed locking to jobs in alerting
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent afa4f5d commit b54b5e7

File tree

10 files changed

+638
-12
lines changed

10 files changed

+638
-12
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.opensearch.alerting.core.JobSweeper
1919
import org.opensearch.alerting.core.ScheduledJobIndices
2020
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
2121
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
22+
import org.opensearch.alerting.core.lock.LockService
2223
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
2324
import org.opensearch.alerting.core.schedule.JobScheduler
2425
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
@@ -251,6 +252,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
251252
): Collection<Any> {
252253
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
253254
val settings = environment.settings()
255+
val lockService = LockService(client, clusterService)
254256
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
255257
runner = MonitorRunnerService
256258
.registerClusterService(clusterService)
@@ -267,6 +269,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
267269
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
268270
.registerJvmStats(JvmStats.jvmStats())
269271
.registerWorkflowService(WorkflowService(client, xContentRegistry))
272+
.registerLockService(lockService)
270273
.registerConsumers()
271274
.registerDestinationSettings()
272275
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
@@ -291,9 +294,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
291294
settings
292295
)
293296

294-
DeleteMonitorService.initialize(client)
297+
DeleteMonitorService.initialize(client, lockService)
295298

296-
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
299+
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
297300
}
298301

299302
override fun getSettings(): List<Setting<*>> {

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.core.lock.LockService
1011
import org.opensearch.alerting.model.destination.DestinationContextFactory
1112
import org.opensearch.alerting.settings.AlertingSettings
1213
import org.opensearch.alerting.settings.DestinationSettings
@@ -57,4 +58,5 @@ data class MonitorRunnerExecutionContext(
5758
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
5859
@Volatile var docLevelMonitorShardFetchSize: Int =
5960
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
61+
@Volatile var lockService: LockService? = null
6062
)

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

+38-10
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import org.opensearch.alerting.alerts.AlertIndices
1717
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
1818
import org.opensearch.alerting.core.JobRunner
1919
import org.opensearch.alerting.core.ScheduledJobIndices
20+
import org.opensearch.alerting.core.lock.LockModel
21+
import org.opensearch.alerting.core.lock.LockService
2022
import org.opensearch.alerting.model.MonitorRunResult
2123
import org.opensearch.alerting.model.WorkflowRunResult
2224
import org.opensearch.alerting.model.destination.DestinationContextFactory
2325
import org.opensearch.alerting.opensearchapi.retry
26+
import org.opensearch.alerting.opensearchapi.suspendUntil
2427
import org.opensearch.alerting.script.TriggerExecutionContext
2528
import org.opensearch.alerting.settings.AlertingSettings
2629
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
@@ -221,6 +224,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
221224
return this
222225
}
223226

227+
/**
 * Stores [lockService] on `monitorCtx` so it is available when jobs are executed.
 *
 * @param lockService the lock service instance to register.
 * @return this [MonitorRunnerService], allowing registration calls to be chained.
 */
fun registerLockService(lockService: LockService): MonitorRunnerService = apply {
    monitorCtx.lockService = lockService
}
231+
224232
// Updates destination settings when the reload API is called so that new keystore values are visible
225233
fun reloadDestinationSettings(settings: Settings) {
226234
monitorCtx.destinationSettings = loadDestinationSettings(settings)
@@ -292,20 +300,40 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
292300
when (job) {
293301
is Workflow -> {
294302
launch {
    // Remains null if acquisition throws or if acquireLock yields no lock
    // (presumably because another node already holds it — confirm against LockService).
    var lock: LockModel? = null
    try {
        lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
            monitorCtx.lockService!!.acquireLock(job, it)
        } ?: return@launch // no lock obtained: skip this execution entirely
        logger.debug("lock ${lock?.lockId} acquired")
        logger.debug(
            "PERF_DEBUG: executing workflow ${job.id} on node " +
                monitorCtx.clusterService!!.state().nodes().localNode.id
        )
        runJob(job, periodStart, periodEnd, false)
    } finally {
        // This block also runs on the early `return@launch` and on acquisition failure, where
        // `lock` is still null. release() is still invoked exactly as before; only the log line
        // is made null-safe so it can no longer throw an NPE that hides the original outcome.
        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
        logger.debug("lock ${lock?.lockId} released")
    }
}
301319
}
302320
is Monitor -> {
303321
launch {
    // Remains null if acquisition throws or if acquireLock yields no lock
    // (presumably because another node already holds it — confirm against LockService).
    var lock: LockModel? = null
    try {
        lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
            monitorCtx.lockService!!.acquireLock(job, it)
        } ?: return@launch // no lock obtained: skip this execution entirely
        logger.debug("lock ${lock?.lockId} acquired")
        logger.debug(
            "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
                monitorCtx.clusterService!!.state().nodes().localNode.id
        )
        runJob(job, periodStart, periodEnd, false)
    } finally {
        // This block also runs on the early `return@launch` and on acquisition failure, where
        // `lock` is still null. release() is still invoked exactly as before; only the log line
        // is made null-safe so it can no longer throw an NPE that hides the original outcome.
        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
        logger.debug("lock ${lock?.lockId} released")
    }
}
310338
}
311339
else -> {

alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
2222
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2323
import org.opensearch.action.support.master.AcknowledgedResponse
2424
import org.opensearch.alerting.MonitorMetadataService
25+
import org.opensearch.alerting.core.lock.LockModel
26+
import org.opensearch.alerting.core.lock.LockService
2527
import org.opensearch.alerting.opensearchapi.suspendUntil
2628
import org.opensearch.alerting.util.AlertingException
2729
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
@@ -49,11 +51,14 @@ object DeleteMonitorService :
4951
private val log = LogManager.getLogger(this.javaClass)
5052

5153
private lateinit var client: Client
54+
private lateinit var lockService: LockService
5255

5356
/**
 * Supplies the collaborators held in this object's `lateinit` properties.
 *
 * @param client client stored for later requests issued by this service.
 * @param lockService lock service stored for later lock clean-up.
 */
fun initialize(client: Client, lockService: LockService) {
    this.client = client
    this.lockService = lockService
}
5863

5964
/**
@@ -65,6 +70,7 @@ object DeleteMonitorService :
6570
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
6671
deleteDocLevelMonitorQueriesAndIndices(monitor)
6772
deleteMetadata(monitor)
73+
deleteLock(monitor)
6874
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
6975
}
7076

@@ -148,6 +154,10 @@ object DeleteMonitorService :
148154
}
149155
}
150156

157+
/**
 * Asks the lock service to delete the lock whose id is derived from [monitor]'s id, suspending
 * until the listener handed to the lock service is completed.
 */
private suspend fun deleteLock(monitor: Monitor) {
    client.suspendUntil<Client, Boolean> { listener ->
        val lockId = LockModel.generateLockId(monitor.id)
        lockService.deleteLock(lockId, listener)
    }
}
160+
151161
/**
152162
* Checks if the monitor is part of the workflow
153163
*

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
2323
import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest.RefreshPolicy
26+
import org.opensearch.alerting.core.lock.LockModel
27+
import org.opensearch.alerting.core.lock.LockService
2628
import org.opensearch.alerting.model.MonitorMetadata
2729
import org.opensearch.alerting.model.WorkflowMetadata
2830
import org.opensearch.alerting.opensearchapi.addFilter
@@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
7375
val clusterService: ClusterService,
7476
val settings: Settings,
7577
val xContentRegistry: NamedXContentRegistry,
78+
val lockService: LockService
7679
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
7780
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
7881
),
@@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
180183
} catch (t: Exception) {
181184
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
182185
}
186+
try {
    // Delete the workflow lock
    client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
    // Best effort: a failed lock clean-up must not fail the workflow deletion, but the cause is
    // passed to the logger so the stack trace is not lost.
    log.error("Failed to delete workflow lock for $workflowId", t)
}
183192
actionListener.onResponse(deleteWorkflowResponse)
184193
} else {
185194
actionListener.onFailure(

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

+85
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.hc.core5.http.io.entity.StringEntity
1010
import org.opensearch.action.search.SearchResponse
1111
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
1212
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
13+
import org.opensearch.alerting.core.lock.LockService
1314
import org.opensearch.alerting.settings.AlertingSettings
1415
import org.opensearch.client.Response
1516
import org.opensearch.client.ResponseException
@@ -18,16 +19,20 @@ import org.opensearch.commons.alerting.model.Alert
1819
import org.opensearch.commons.alerting.model.DataSources
1920
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
2021
import org.opensearch.commons.alerting.model.DocLevelQuery
22+
import org.opensearch.commons.alerting.model.IntervalSchedule
2123
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
2224
import org.opensearch.commons.alerting.model.action.AlertCategory
2325
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
2426
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
2527
import org.opensearch.core.rest.RestStatus
2628
import org.opensearch.script.Script
29+
import org.opensearch.test.OpenSearchTestCase
2730
import java.time.ZonedDateTime
2831
import java.time.format.DateTimeFormatter
32+
import java.time.temporal.ChronoUnit
2933
import java.time.temporal.ChronoUnit.MILLIS
3034
import java.util.Locale
35+
import java.util.concurrent.TimeUnit
3136

3237
class DocumentMonitorRunnerIT : AlertingRestTestCase() {
3338

@@ -470,6 +475,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
470475
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
471476
}
472477

478+
/**
 * Creates a document-level monitor scheduled every minute, indexes two matching documents,
 * sleeps for four minutes, and then asserts that no alert returned for the monitor's name
 * carries an `error_message`.
 */
fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
    val index = createTestIndex()
    val timestamp = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val doc = """{
        "message" : "This is an error from IAD region",
        "test_strict_date_time" : "$timestamp",
        "test_field" : "us-west-2"
    }"""

    val query = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val input = DocLevelMonitorInput("description", listOf(index), listOf(query))

    val alwaysRunTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(input),
            triggers = listOf(alwaysRunTrigger),
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
        )
    )
    assertNotNull(monitor.id)

    for (docId in listOf("1", "5")) {
        indexDoc(index, docId, doc)
    }
    // Fixed wait (240 s) before alerts are inspected; the monitor is on a one-minute interval.
    Thread.sleep(240000)

    val alertsResponse = getAlerts(hashMapOf<String, Any>("searchString" to monitor.name)).asMap()
    val alerts = alertsResponse["alerts"] as ArrayList<Map<String, Any>>
    for (alert in alerts) {
        assertTrue(alert["error_message"] == null)
    }
}
514+
515+
/**
 * Creates a document-level monitor scheduled every minute, waits (up to 240 s) for the lock index
 * to exist, asserts that it holds exactly one lock document, then deletes the monitor and asserts
 * that the lock index search returns no documents.
 */
@AwaitsFix(bugUrl = "")
fun `test monitor run generate lock and monitor delete removes lock`() {
    // Number of hits returned by a search over the lock index.
    fun lockCount(): Int {
        val searchResponse = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
        val outerHits = entityAsMap(searchResponse)["hits"] as Map<String, Any>
        return (outerHits["hits"] as List<Any>).size
    }

    val index = createTestIndex()
    val timestamp = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val doc = """{
        "message" : "This is an error from IAD region",
        "test_strict_date_time" : "$timestamp",
        "test_field" : "us-west-2"
    }"""

    val query = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val input = DocLevelMonitorInput("description", listOf(index), listOf(query))

    val alwaysRunTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            inputs = listOf(input),
            triggers = listOf(alwaysRunTrigger),
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
        )
    )
    assertNotNull(monitor.id)

    for (docId in listOf("1", "5")) {
        indexDoc(index, docId, doc)
    }
    // Poll until a HEAD request on the lock index answers 200.
    OpenSearchTestCase.waitUntil(
        { client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME).restStatus().status == 200 },
        240,
        TimeUnit.SECONDS
    )

    assertEquals(1, lockCount())

    deleteMonitor(monitor)
    refreshIndex(LockService.LOCK_INDEX_NAME)
    assertEquals(0, lockCount())
}
557+
473558
fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
474559
val testIndex = createTestIndex()
475560
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt

+43
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import org.opensearch.search.builder.SearchSourceBuilder
3838
import org.opensearch.test.OpenSearchTestCase
3939
import org.opensearch.test.junit.annotations.TestLogging
4040
import java.time.Instant
41+
import java.time.ZonedDateTime
42+
import java.time.format.DateTimeFormatter
4143
import java.time.temporal.ChronoUnit
4244
import java.util.Collections
4345
import java.util.Locale
@@ -1190,4 +1192,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
11901192
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
11911193
assertEquals("Findings saved for test monitor", 1, findings.size)
11921194
}
1195+
1196+
/**
 * Creates a disabled document-level monitor, wraps it in an enabled workflow scheduled every
 * minute, indexes two matching documents, sleeps for four minutes, and then asserts that no alert
 * found for the monitor has an error message.
 */
fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
    val index = createTestIndex()
    val timestamp = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
    val doc = """{
        "message" : "This is an error from IAD region",
        "test_strict_date_time" : "$timestamp",
        "test_field" : "us-west-2"
    }"""

    val query = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val input = DocLevelMonitorInput("description", listOf(index), listOf(query))

    val alwaysRunTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(input),
            triggers = listOf(alwaysRunTrigger),
            enabled = false,
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
        )
    )
    assertNotNull(monitor.id)
    // Only the workflow is enabled; the delegate monitor itself was created disabled above.
    createWorkflow(
        randomWorkflow(
            monitorIds = listOf(monitor.id),
            enabled = true,
            schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
        )
    )

    for (docId in listOf("1", "5")) {
        indexDoc(index, docId, doc)
    }
    // Fixed wait (240 s) before alerts are inspected; the workflow is on a one-minute interval.
    Thread.sleep(240000)

    for (alert in searchAlerts(monitor)) {
        assertTrue(alert.errorMessage == null)
    }
}
11931236
}

0 commit comments

Comments
 (0)