Skip to content

Commit 78955a6

Browse files
sbcd90eirsep
authored andcommitted
add distributed locking to jobs in alerting (opensearch-project#1403)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 48eb050 commit 78955a6

File tree

10 files changed

+638
-12
lines changed

10 files changed

+638
-12
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.opensearch.alerting.core.JobSweeper
2121
import org.opensearch.alerting.core.ScheduledJobIndices
2222
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
2323
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
24+
import org.opensearch.alerting.core.lock.LockService
2425
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
2526
import org.opensearch.alerting.core.schedule.JobScheduler
2627
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
@@ -256,6 +257,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
256257
): Collection<Any> {
257258
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
258259
val settings = environment.settings()
260+
val lockService = LockService(client, clusterService)
259261
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
260262
runner = MonitorRunnerService
261263
.registerClusterService(clusterService)
@@ -272,6 +274,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
272274
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
273275
.registerJvmStats(JvmStats.jvmStats())
274276
.registerWorkflowService(WorkflowService(client, xContentRegistry))
277+
.registerLockService(lockService)
275278
.registerConsumers()
276279
.registerDestinationSettings()
277280
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
@@ -296,9 +299,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
296299
settings
297300
)
298301

299-
DeleteMonitorService.initialize(client)
302+
DeleteMonitorService.initialize(client, lockService)
300303

301-
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
304+
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
302305
}
303306

304307
override fun getSettings(): List<Setting<*>> {

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.opensearch.action.bulk.BackoffPolicy
99
import org.opensearch.alerting.alerts.AlertIndices
10+
import org.opensearch.alerting.core.lock.LockService
1011
import org.opensearch.alerting.model.destination.DestinationContextFactory
1112
import org.opensearch.alerting.settings.AlertingSettings
1213
import org.opensearch.alerting.settings.DestinationSettings
@@ -57,4 +58,5 @@ data class MonitorRunnerExecutionContext(
5758
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
5859
@Volatile var docLevelMonitorShardFetchSize: Int =
5960
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
61+
@Volatile var lockService: LockService? = null
6062
)

alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt

+38-10
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ import org.opensearch.alerting.alerts.AlertIndices
1818
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
1919
import org.opensearch.alerting.core.JobRunner
2020
import org.opensearch.alerting.core.ScheduledJobIndices
21+
import org.opensearch.alerting.core.lock.LockModel
22+
import org.opensearch.alerting.core.lock.LockService
2123
import org.opensearch.alerting.model.MonitorRunResult
2224
import org.opensearch.alerting.model.WorkflowRunResult
2325
import org.opensearch.alerting.model.destination.DestinationContextFactory
2426
import org.opensearch.alerting.opensearchapi.retry
27+
import org.opensearch.alerting.opensearchapi.suspendUntil
2528
import org.opensearch.alerting.script.TriggerExecutionContext
2629
import org.opensearch.alerting.settings.AlertingSettings
2730
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
@@ -221,6 +224,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
221224
return this
222225
}
223226

227+
fun registerLockService(lockService: LockService): MonitorRunnerService {
228+
monitorCtx.lockService = lockService
229+
return this
230+
}
231+
224232
// Updates destination settings when the reload API is called so that new keystore values are visible
225233
fun reloadDestinationSettings(settings: Settings) {
226234
monitorCtx.destinationSettings = loadDestinationSettings(settings)
@@ -292,20 +300,40 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
292300
when (job) {
293301
is Workflow -> {
294302
launch {
295-
logger.debug(
296-
"PERF_DEBUG: executing workflow ${job.id} on node " +
297-
monitorCtx.clusterService!!.state().nodes().localNode.id
298-
)
299-
runJob(job, periodStart, periodEnd, false)
303+
var lock: LockModel? = null
304+
try {
305+
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
306+
monitorCtx.lockService!!.acquireLock(job, it)
307+
} ?: return@launch
308+
logger.debug("lock ${lock!!.lockId} acquired")
309+
logger.debug(
310+
"PERF_DEBUG: executing workflow ${job.id} on node " +
311+
monitorCtx.clusterService!!.state().nodes().localNode.id
312+
)
313+
runJob(job, periodStart, periodEnd, false)
314+
} finally {
315+
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
316+
logger.debug("lock ${lock!!.lockId} released")
317+
}
300318
}
301319
}
302320
is Monitor -> {
303321
launch {
304-
logger.debug(
305-
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
306-
monitorCtx.clusterService!!.state().nodes().localNode.id
307-
)
308-
runJob(job, periodStart, periodEnd, false)
322+
var lock: LockModel? = null
323+
try {
324+
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
325+
monitorCtx.lockService!!.acquireLock(job, it)
326+
} ?: return@launch
327+
logger.debug("lock ${lock!!.lockId} acquired")
328+
logger.debug(
329+
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
330+
monitorCtx.clusterService!!.state().nodes().localNode.id
331+
)
332+
runJob(job, periodStart, periodEnd, false)
333+
} finally {
334+
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
335+
logger.debug("lock ${lock!!.lockId} released")
336+
}
309337
}
310338
}
311339
else -> {

alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt

+10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import org.opensearch.action.support.IndicesOptions
2323
import org.opensearch.action.support.WriteRequest.RefreshPolicy
2424
import org.opensearch.action.support.master.AcknowledgedResponse
2525
import org.opensearch.alerting.MonitorMetadataService
26+
import org.opensearch.alerting.core.lock.LockModel
27+
import org.opensearch.alerting.core.lock.LockService
2628
import org.opensearch.alerting.opensearchapi.suspendUntil
2729
import org.opensearch.alerting.util.AlertingException
2830
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
@@ -48,11 +50,14 @@ object DeleteMonitorService :
4850
private val log = LogManager.getLogger(this.javaClass)
4951

5052
private lateinit var client: Client
53+
private lateinit var lockService: LockService
5154

5255
fun initialize(
5356
client: Client,
57+
lockService: LockService
5458
) {
5559
DeleteMonitorService.client = client
60+
DeleteMonitorService.lockService = lockService
5661
}
5762

5863
/**
@@ -64,6 +69,7 @@ object DeleteMonitorService :
6469
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
6570
deleteDocLevelMonitorQueriesAndIndices(monitor)
6671
deleteMetadata(monitor)
72+
deleteLock(monitor)
6773
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
6874
}
6975

@@ -147,6 +153,10 @@ object DeleteMonitorService :
147153
}
148154
}
149155

156+
private suspend fun deleteLock(monitor: Monitor) {
157+
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
158+
}
159+
150160
/**
151161
* Checks if the monitor is part of the workflow
152162
*

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

+9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import org.opensearch.action.search.SearchResponse
2424
import org.opensearch.action.support.ActionFilters
2525
import org.opensearch.action.support.HandledTransportAction
2626
import org.opensearch.action.support.WriteRequest.RefreshPolicy
27+
import org.opensearch.alerting.core.lock.LockModel
28+
import org.opensearch.alerting.core.lock.LockService
2729
import org.opensearch.alerting.model.MonitorMetadata
2830
import org.opensearch.alerting.model.WorkflowMetadata
2931
import org.opensearch.alerting.opensearchapi.addFilter
@@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
7375
val clusterService: ClusterService,
7476
val settings: Settings,
7577
val xContentRegistry: NamedXContentRegistry,
78+
val lockService: LockService
7679
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
7780
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
7881
),
@@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
180183
} catch (t: Exception) {
181184
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
182185
}
186+
try {
187+
// Delete the workflow lock
188+
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
189+
} catch (t: Exception) {
190+
log.error("Failed to delete workflow lock for $workflowId")
191+
}
183192
actionListener.onResponse(deleteWorkflowResponse)
184193
} else {
185194
actionListener.onFailure(

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

+85
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
1010
import org.opensearch.action.search.SearchResponse
1111
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
1212
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
13+
import org.opensearch.alerting.core.lock.LockService
1314
import org.opensearch.alerting.settings.AlertingSettings
1415
import org.opensearch.client.Response
1516
import org.opensearch.client.ResponseException
@@ -18,16 +19,20 @@ import org.opensearch.commons.alerting.model.Alert
1819
import org.opensearch.commons.alerting.model.DataSources
1920
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
2021
import org.opensearch.commons.alerting.model.DocLevelQuery
22+
import org.opensearch.commons.alerting.model.IntervalSchedule
2123
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
2224
import org.opensearch.commons.alerting.model.action.AlertCategory
2325
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
2426
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
2527
import org.opensearch.rest.RestStatus
2628
import org.opensearch.script.Script
29+
import org.opensearch.test.OpenSearchTestCase
2730
import java.time.ZonedDateTime
2831
import java.time.format.DateTimeFormatter
32+
import java.time.temporal.ChronoUnit
2933
import java.time.temporal.ChronoUnit.MILLIS
3034
import java.util.Locale
35+
import java.util.concurrent.TimeUnit
3136

3237
class DocumentMonitorRunnerIT : AlertingRestTestCase() {
3338

@@ -616,6 +621,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
616621
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
617622
}
618623

624+
fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
625+
val testIndex = createTestIndex()
626+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
627+
val testDoc = """{
628+
"message" : "This is an error from IAD region",
629+
"test_strict_date_time" : "$testTime",
630+
"test_field" : "us-west-2"
631+
}"""
632+
633+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
634+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
635+
636+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
637+
val monitor = createMonitor(
638+
randomDocumentLevelMonitor(
639+
name = "__lag-monitor-test__",
640+
inputs = listOf(docLevelInput),
641+
triggers = listOf(trigger),
642+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
643+
)
644+
)
645+
assertNotNull(monitor.id)
646+
647+
indexDoc(testIndex, "1", testDoc)
648+
indexDoc(testIndex, "5", testDoc)
649+
Thread.sleep(240000)
650+
651+
val inputMap = HashMap<String, Any>()
652+
inputMap["searchString"] = monitor.name
653+
654+
val responseMap = getAlerts(inputMap).asMap()
655+
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
656+
alerts.forEach {
657+
assertTrue(it["error_message"] == null)
658+
}
659+
}
660+
661+
@AwaitsFix(bugUrl = "")
662+
fun `test monitor run generate lock and monitor delete removes lock`() {
663+
val testIndex = createTestIndex()
664+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
665+
val testDoc = """{
666+
"message" : "This is an error from IAD region",
667+
"test_strict_date_time" : "$testTime",
668+
"test_field" : "us-west-2"
669+
}"""
670+
671+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
672+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
673+
674+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
675+
val monitor = createMonitor(
676+
randomDocumentLevelMonitor(
677+
inputs = listOf(docLevelInput),
678+
triggers = listOf(trigger),
679+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
680+
)
681+
)
682+
assertNotNull(monitor.id)
683+
684+
indexDoc(testIndex, "1", testDoc)
685+
indexDoc(testIndex, "5", testDoc)
686+
OpenSearchTestCase.waitUntil({
687+
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
688+
return@waitUntil (response.restStatus().status == 200)
689+
}, 240, TimeUnit.SECONDS)
690+
691+
var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
692+
var responseMap = entityAsMap(response)
693+
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
694+
assertEquals(1, noOfLocks)
695+
696+
deleteMonitor(monitor)
697+
refreshIndex(LockService.LOCK_INDEX_NAME)
698+
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
699+
responseMap = entityAsMap(response)
700+
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
701+
assertEquals(0, noOfLocks)
702+
}
703+
619704
fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
620705
val testIndex = createTestIndex()
621706
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))

alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt

+43
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
3737
import org.opensearch.search.builder.SearchSourceBuilder
3838
import org.opensearch.test.junit.annotations.TestLogging
3939
import java.time.Instant
40+
import java.time.ZonedDateTime
41+
import java.time.format.DateTimeFormatter
4042
import java.time.temporal.ChronoUnit
4143
import java.util.Collections
4244
import java.util.Locale
@@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
11851187
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
11861188
assertEquals("Findings saved for test monitor", 1, findings.size)
11871189
}
1190+
1191+
fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
1192+
val testIndex = createTestIndex()
1193+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
1194+
val testDoc = """{
1195+
"message" : "This is an error from IAD region",
1196+
"test_strict_date_time" : "$testTime",
1197+
"test_field" : "us-west-2"
1198+
}"""
1199+
1200+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
1201+
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))
1202+
1203+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
1204+
val monitor = createMonitor(
1205+
randomDocumentLevelMonitor(
1206+
name = "__lag-monitor-test__",
1207+
inputs = listOf(docLevelInput),
1208+
triggers = listOf(trigger),
1209+
enabled = false,
1210+
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
1211+
)
1212+
)
1213+
assertNotNull(monitor.id)
1214+
createWorkflow(
1215+
randomWorkflow(
1216+
monitorIds = listOf(monitor.id),
1217+
enabled = true,
1218+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
1219+
)
1220+
)
1221+
1222+
indexDoc(testIndex, "1", testDoc)
1223+
indexDoc(testIndex, "5", testDoc)
1224+
Thread.sleep(240000)
1225+
1226+
val alerts = searchAlerts(monitor)
1227+
alerts.forEach {
1228+
assertTrue(it.errorMessage == null)
1229+
}
1230+
}
11881231
}

0 commit comments

Comments
 (0)