Skip to content

Commit 461e95f

Browse files
authored
DocLevelMonitor Error Alert - rework (#892)
Specific "Error Alert" is created for a monitor, when error happens during monitor run. State of alert is ERROR and subsequent errors would upsert this "Error Alert". If error message is different then previous one, errorMessage field would be overwritten and previous error would be added to errorHistory rolling array(10 elements). --------- Signed-off-by: Petar Dzepina <[email protected]>
1 parent 8c033b9 commit 461e95f

File tree

6 files changed

+266
-58
lines changed

6 files changed

+266
-58
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkRequest
1313
import org.opensearch.action.bulk.BulkResponse
1414
import org.opensearch.action.delete.DeleteRequest
1515
import org.opensearch.action.index.IndexRequest
16+
import org.opensearch.action.index.IndexResponse
1617
import org.opensearch.action.search.SearchRequest
1718
import org.opensearch.action.search.SearchResponse
1819
import org.opensearch.action.support.WriteRequest
@@ -40,13 +41,15 @@ import org.opensearch.commons.alerting.model.Alert
4041
import org.opensearch.commons.alerting.model.BucketLevelTrigger
4142
import org.opensearch.commons.alerting.model.DataSources
4243
import org.opensearch.commons.alerting.model.Monitor
44+
import org.opensearch.commons.alerting.model.NoOpTrigger
4345
import org.opensearch.commons.alerting.model.Trigger
4446
import org.opensearch.commons.alerting.model.action.AlertCategory
4547
import org.opensearch.core.xcontent.NamedXContentRegistry
4648
import org.opensearch.core.xcontent.XContentParser
4749
import org.opensearch.index.query.QueryBuilders
4850
import org.opensearch.rest.RestStatus
4951
import org.opensearch.search.builder.SearchSourceBuilder
52+
import org.opensearch.search.sort.SortOrder
5053
import java.time.Instant
5154
import java.util.UUID
5255

@@ -187,6 +190,19 @@ class AlertService(
187190
)
188191
}
189192

193+
fun composeMonitorErrorAlert(
194+
id: String,
195+
monitor: Monitor,
196+
alertError: AlertError
197+
): Alert {
198+
val currentTime = Instant.now()
199+
return Alert(
200+
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
201+
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
202+
schemaVersion = IndexUtils.alertIndexSchemaVersion
203+
)
204+
}
205+
190206
fun updateActionResultsForBucketLevelAlert(
191207
currentAlert: Alert,
192208
actionResults: Map<String, ActionRunResult>,
@@ -281,6 +297,60 @@ class AlertService(
281297
} ?: listOf()
282298
}
283299

300+
suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
301+
val errorAlertIdPrefix = "error-alert"
302+
val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}"
303+
304+
val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*")
305+
.source(
306+
SearchSourceBuilder()
307+
.sort(Alert.START_TIME_FIELD, SortOrder.DESC)
308+
.query(
309+
QueryBuilders.boolQuery()
310+
.must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*"))
311+
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
312+
.must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR))
313+
)
314+
)
315+
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
316+
317+
var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))
318+
319+
if (searchResponse.hits.totalHits.value > 0L) {
320+
if (searchResponse.hits.totalHits.value > 1L) {
321+
logger.warn("There are [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}]")
322+
}
323+
// Deserialize first/latest Alert
324+
val hit = searchResponse.hits.hits[0]
325+
val xcp = contentParser(hit.sourceRef)
326+
val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version)
327+
328+
val currentTime = Instant.now()
329+
alert = if (alert.errorMessage != existingErrorAlert.errorMessage) {
330+
var newErrorHistory = existingErrorAlert.errorHistory.update(
331+
AlertError(existingErrorAlert.startTime, existingErrorAlert.errorMessage!!)
332+
)
333+
alert.copy(
334+
id = existingErrorAlert.id,
335+
errorHistory = newErrorHistory,
336+
startTime = currentTime,
337+
lastNotificationTime = currentTime
338+
)
339+
} else {
340+
existingErrorAlert.copy(startTime = Instant.now(), lastNotificationTime = currentTime)
341+
}
342+
}
343+
344+
val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex)
345+
.routing(alert.monitorId)
346+
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
347+
.opType(DocWriteRequest.OpType.INDEX)
348+
.id(alert.id)
349+
350+
val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) }
351+
logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}")
352+
}
353+
284354
suspend fun saveAlerts(
285355
dataSources: DataSources,
286356
alerts: List<Alert>,

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 80 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2020
import org.opensearch.alerting.model.InputRunResults
2121
import org.opensearch.alerting.model.MonitorMetadata
2222
import org.opensearch.alerting.model.MonitorRunResult
23+
import org.opensearch.alerting.model.userErrorMessage
2324
import org.opensearch.alerting.opensearchapi.suspendUntil
2425
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
2526
import org.opensearch.alerting.util.AlertingException
@@ -192,63 +193,88 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
192193
}
193194
}
194195
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
195-
} catch (e: Exception) {
196-
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
197-
val alertingException = AlertingException(
198-
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
199-
RestStatus.INTERNAL_SERVER_ERROR,
200-
e
201-
)
202-
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
203-
}
204196

205-
/*
206-
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
207-
list of matched docId from inputRunResults>
208-
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
209-
*/
210-
queries.forEach {
211-
if (inputRunResults.containsKey(it.id)) {
212-
queryToDocIds[it] = inputRunResults[it.id]!!
197+
/*
198+
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
199+
list of matched docId from inputRunResults>
200+
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
201+
*/
202+
queries.forEach {
203+
if (inputRunResults.containsKey(it.id)) {
204+
queryToDocIds[it] = inputRunResults[it.id]!!
205+
}
213206
}
214-
}
215207

216-
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
208+
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
217209

218-
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
219-
// If there are no triggers defined, we still want to generate findings
220-
if (monitor.triggers.isEmpty()) {
221-
if (dryrun == false && monitor.id != Monitor.NO_ID) {
222-
docsToQueries.forEach {
223-
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
224-
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
210+
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
211+
// If there are no triggers defined, we still want to generate findings
212+
if (monitor.triggers.isEmpty()) {
213+
if (dryrun == false && monitor.id != Monitor.NO_ID) {
214+
docsToQueries.forEach {
215+
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
216+
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
217+
}
218+
}
219+
} else {
220+
monitor.triggers.forEach {
221+
triggerResults[it.id] = runForEachDocTrigger(
222+
monitorCtx,
223+
monitorResult,
224+
it as DocumentLevelTrigger,
225+
monitor,
226+
idQueryMap,
227+
docsToQueries,
228+
queryToDocIds,
229+
dryrun
230+
)
225231
}
226232
}
227-
} else {
228-
monitor.triggers.forEach {
229-
triggerResults[it.id] = runForEachDocTrigger(
230-
monitorCtx,
231-
monitorResult,
232-
it as DocumentLevelTrigger,
233-
monitor,
234-
idQueryMap,
235-
docsToQueries,
236-
queryToDocIds,
237-
dryrun
233+
// Don't update monitor if this is a test monitor
234+
if (!isTempMonitor) {
235+
// If any error happened during trigger execution, upsert monitor error alert
236+
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
237+
if (errorMessage.isNotEmpty()) {
238+
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
239+
}
240+
241+
MonitorMetadataService.upsertMetadata(
242+
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
243+
true
238244
)
239245
}
240-
}
241246

242-
// Don't update monitor if this is a test monitor
243-
if (!isTempMonitor) {
244-
MonitorMetadataService.upsertMetadata(
245-
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
246-
true
247+
// TODO: Update the Document as part of the Trigger and return back the trigger action result
248+
return monitorResult.copy(triggerResults = triggerResults)
249+
} catch (e: Exception) {
250+
val errorMessage = ExceptionsHelper.detailedMessage(e)
251+
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage)
252+
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
253+
val alertingException = AlertingException(
254+
errorMessage,
255+
RestStatus.INTERNAL_SERVER_ERROR,
256+
e
247257
)
258+
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
248259
}
260+
}
249261

250-
// TODO: Update the Document as part of the Trigger and return back the trigger action result
251-
return monitorResult.copy(triggerResults = triggerResults)
262+
private fun constructErrorMessageFromTriggerResults(
263+
triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
264+
): String {
265+
var errorMessage = ""
266+
if (triggerResults != null) {
267+
val triggersErrorBuilder = StringBuilder()
268+
triggerResults.forEach {
269+
if (it.value.error != null) {
270+
triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ")
271+
}
272+
}
273+
if (triggersErrorBuilder.isNotEmpty()) {
274+
errorMessage = "Trigger errors: $triggersErrorBuilder"
275+
}
276+
}
277+
return errorMessage
252278
}
253279

254280
private suspend fun runForEachDocTrigger(
@@ -295,16 +321,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
295321
alerts.add(alert)
296322
}
297323

298-
if (findingDocPairs.isEmpty() && monitorResult.error != null) {
299-
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
300-
listOf(),
301-
listOf(),
302-
triggerCtx,
303-
monitorResult.alertError() ?: triggerResult.alertError()
304-
)
305-
alerts.add(alert)
306-
}
307-
308324
val shouldDefaultToPerExecution = defaultToPerExecutionAction(
309325
monitorCtx.maxActionableAlertCount,
310326
monitorId = monitor.id,
@@ -576,8 +592,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
576592
searchSourceBuilder.query(boolQueryBuilder)
577593
searchRequest.source(searchSourceBuilder)
578594

579-
val response: SearchResponse = monitorCtx.client!!.suspendUntil {
580-
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
595+
var response: SearchResponse
596+
try {
597+
response = monitorCtx.client!!.suspendUntil {
598+
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
599+
}
600+
} catch (e: Exception) {
601+
throw IllegalStateException(
602+
"Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e
603+
)
581604
}
582605

583606
if (response.status() !== RestStatus.OK) {

alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ class AlertIndices(
351351
}
352352
if (existsResponse.isExists) return true
353353

354+
logger.debug("index: [$index] schema mappings: [$schemaMapping]")
354355
val request = CreateIndexRequest(index)
355356
.mapping(schemaMapping)
356357
.settings(Settings.builder().put("index.hidden", true).build())
@@ -474,7 +475,7 @@ class AlertIndices(
474475
clusterStateRequest,
475476
object : ActionListener<ClusterStateResponse> {
476477
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
477-
if (!clusterStateResponse.state.metadata.indices.isEmpty()) {
478+
if (clusterStateResponse.state.metadata.indices.isNotEmpty()) {
478479
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
479480
logger.info("Deleting old $tag indices viz $indicesToDelete")
480481
deleteAllOldHistoryIndices(indicesToDelete)

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
283283
)
284284
indexRequests.add(indexRequest)
285285
}
286+
log.debug("bulk inserting percolate [${queries.size}] queries")
286287
if (indexRequests.isNotEmpty()) {
287288
val bulkResponse: BulkResponse = client.suspendUntil {
288289
client.bulk(

0 commit comments

Comments
 (0)