Skip to content

Commit 4f9e27b

Browse files
author
Nishtha Mehrotra
committed
Force create last run context in monitor worflow metadata when workflow is re-enabled
Signed-off-by: Nishtha Mehrotra <[email protected]>
1 parent 03595f8 commit 4f9e27b

File tree

3 files changed

+131
-3
lines changed

3 files changed

+131
-3
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,15 @@ object MonitorMetadataService :
139139
monitor: Monitor,
140140
createWithRunContext: Boolean = true,
141141
skipIndex: Boolean = false,
142-
workflowMetadataId: String? = null
142+
workflowMetadataId: String? = null,
143+
forceCreateLastRunContext: Boolean = false
143144
): Pair<MonitorMetadata, Boolean> {
144145
try {
145146
val created = true
146-
val metadata = getMetadata(monitor, workflowMetadataId)
147+
var metadata = getMetadata(monitor, workflowMetadataId)
148+
if (forceCreateLastRunContext) {
149+
metadata = metadata?.copy(lastRunContext = createUpdatedRunContext(monitor))
150+
}
147151
return if (metadata != null) {
148152
metadata to !created
149153
} else {
@@ -159,6 +163,20 @@ object MonitorMetadataService :
159163
}
160164
}
161165

166+
private suspend fun createUpdatedRunContext(
167+
monitor: Monitor
168+
): Map<String, MutableMap<String, Any>> {
169+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
170+
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
171+
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
172+
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
173+
else null
174+
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
175+
createFullRunContext(monitorIndex)
176+
else emptyMap()
177+
return runContext
178+
}
179+
162180
suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? {
163181
try {
164182
val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,10 +552,17 @@ class TransportIndexWorkflowAction @Inject constructor(
552552
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)
553553

554554
for (monitor in monitors) {
555+
var isWorkflowRestarted = false
556+
557+
if (request.workflow.enabled && !currentWorkflow.enabled) {
558+
isWorkflowRestarted = true
559+
}
560+
555561
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
556562
monitor = monitor,
557563
createWithRunContext = true,
558-
workflowMetadataId = workflowMetadata.id
564+
workflowMetadataId = workflowMetadata.id,
565+
forceCreateLastRunContext = isWorkflowRestarted
559566
)
560567

561568
if (!created &&

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import org.opensearch.commons.alerting.model.Delegate
5555
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
5656
import org.opensearch.commons.alerting.model.DocLevelQuery
5757
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
58+
import org.opensearch.commons.alerting.model.Finding
5859
import org.opensearch.commons.alerting.model.IntervalSchedule
5960
import org.opensearch.commons.alerting.model.Monitor
6061
import org.opensearch.commons.alerting.model.ScheduledJob
@@ -6380,4 +6381,106 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
63806381
}
63816382
}
63826383
}
6384+
6385+
fun `test execute workflow when monitor is disabled and re-enabled`() {
6386+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
6387+
6388+
val index1 = "index_123"
6389+
createIndex(index1, Settings.EMPTY)
6390+
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1", fields = listOf())
6391+
6392+
val docLevelInput = DocLevelMonitorInput(
6393+
"description",
6394+
listOf(index1),
6395+
listOf(q1)
6396+
)
6397+
6398+
val customQueryIndex = "custom_alerts_index"
6399+
6400+
val monitor = randomDocumentLevelMonitor(
6401+
inputs = listOf(docLevelInput),
6402+
triggers = listOf(trigger),
6403+
dataSources = DataSources(
6404+
queryIndex = customQueryIndex
6405+
)
6406+
)
6407+
6408+
val monitorResponse = createMonitor(monitor)!!
6409+
6410+
val workflowRequest = randomWorkflow(
6411+
monitorIds = listOf(monitorResponse.id)
6412+
)
6413+
val workflowResponse = upsertWorkflow(workflowRequest)!!
6414+
val workflowId = workflowResponse.id
6415+
val getWorkflowResponse = getWorkflowById(id = workflowResponse.id)
6416+
6417+
assertNotNull(getWorkflowResponse)
6418+
assertEquals(workflowId, getWorkflowResponse.id)
6419+
6420+
// Verify that monitor workflow metadata exists
6421+
assertNotNull(searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata"))
6422+
6423+
val testDoc1 = """{
6424+
"properties": "abcd"
6425+
}"""
6426+
indexDoc(index1, "1", testDoc1)
6427+
indexDoc(index1, "2", testDoc1)
6428+
indexDoc(index1, "3", testDoc1)
6429+
6430+
// Run workflow
6431+
executeWorkflow(workflowRequest, workflowId, false)
6432+
var findings: List<Finding>
6433+
OpenSearchTestCase.waitUntil({
6434+
findings = searchFindings(monitorResponse.id)
6435+
if (findings.size >= 1) {
6436+
return@waitUntil true
6437+
}
6438+
return@waitUntil false
6439+
}, 30, TimeUnit.SECONDS)
6440+
6441+
// Verify that monitor workflow metadata is updated with lastRunContext
6442+
var monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
6443+
val lastRunContextBeforeDisable = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
6444+
assertEquals(2, lastRunContextBeforeDisable?.get("0"))
6445+
6446+
// Disable workflow
6447+
val disabledWorkflowRequest = randomWorkflow(
6448+
monitorIds = listOf(monitorResponse.id),
6449+
id = workflowId,
6450+
enabled = false
6451+
)
6452+
upsertWorkflow(disabledWorkflowRequest)
6453+
OpenSearchTestCase.waitUntil({
6454+
val workflowResponse = getWorkflowById(workflowId)
6455+
if (workflowResponse.workflow?.enabled == false) {
6456+
return@waitUntil true
6457+
}
6458+
return@waitUntil false
6459+
}, 30, TimeUnit.SECONDS)
6460+
6461+
// Index doc, since workflow is disabled, monitor workflow metadata shouldn't be updated
6462+
indexDoc(index1, "4", testDoc1)
6463+
6464+
// re-enable workflow
6465+
val enabledWorkflowRequest = randomWorkflow(
6466+
monitorIds = listOf(monitorResponse.id),
6467+
id = workflowId,
6468+
enabled = true
6469+
)
6470+
upsertWorkflow(enabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)
6471+
OpenSearchTestCase.waitUntil({
6472+
val workflowResponse = getWorkflowById(workflowId)
6473+
if (workflowResponse.workflow?.enabled == true) {
6474+
return@waitUntil true
6475+
}
6476+
return@waitUntil false
6477+
}, 30, TimeUnit.SECONDS)
6478+
6479+
// Verify that monitor workflow metadata exists
6480+
// Since workflow is re-enabled, last run context should be updated with latest sequence number
6481+
monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
6482+
assertNotNull(monitorWokflowMetadata)
6483+
val lastRunContext = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
6484+
assertEquals(3, lastRunContext?.get("0"))
6485+
}
63836486
}

0 commit comments

Comments
 (0)