Skip to content

Commit d75ba57

Browse files
authored
DocLevelMonitor Error Alert - rework (opensearch-project#892) (opensearch-project#907)
Specific "Error Alert" is created for a monitor, when error happens during monitor run. State of alert is ERROR and subsequent errors would upsert this "Error Alert". If error message is different then previous one, errorMessage field would be overwritten and previous error would be added to errorHistory rolling array(10 elements). --------- Signed-off-by: Petar Dzepina <[email protected]> (cherry picked from commit 461e95f38bd55268e3ac17026e283ac9f18d911b)
1 parent a7bd280 commit d75ba57

File tree

6 files changed

+275
-71
lines changed

6 files changed

+275
-71
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkRequest
1313
import org.opensearch.action.bulk.BulkResponse
1414
import org.opensearch.action.delete.DeleteRequest
1515
import org.opensearch.action.index.IndexRequest
16+
import org.opensearch.action.index.IndexResponse
1617
import org.opensearch.action.search.SearchRequest
1718
import org.opensearch.action.search.SearchResponse
1819
import org.opensearch.action.support.WriteRequest
@@ -40,13 +41,15 @@ import org.opensearch.commons.alerting.model.Alert
4041
import org.opensearch.commons.alerting.model.BucketLevelTrigger
4142
import org.opensearch.commons.alerting.model.DataSources
4243
import org.opensearch.commons.alerting.model.Monitor
44+
import org.opensearch.commons.alerting.model.NoOpTrigger
4345
import org.opensearch.commons.alerting.model.Trigger
4446
import org.opensearch.commons.alerting.model.action.AlertCategory
4547
import org.opensearch.core.xcontent.NamedXContentRegistry
4648
import org.opensearch.core.xcontent.XContentParser
4749
import org.opensearch.index.query.QueryBuilders
4850
import org.opensearch.rest.RestStatus
4951
import org.opensearch.search.builder.SearchSourceBuilder
52+
import org.opensearch.search.sort.SortOrder
5053
import java.time.Instant
5154
import java.util.UUID
5255

@@ -193,6 +196,19 @@ class AlertService(
193196
)
194197
}
195198

199+
fun composeMonitorErrorAlert(
200+
id: String,
201+
monitor: Monitor,
202+
alertError: AlertError
203+
): Alert {
204+
val currentTime = Instant.now()
205+
return Alert(
206+
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
207+
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
208+
schemaVersion = IndexUtils.alertIndexSchemaVersion
209+
)
210+
}
211+
196212
fun updateActionResultsForBucketLevelAlert(
197213
currentAlert: Alert,
198214
actionResults: Map<String, ActionRunResult>,
@@ -289,6 +305,60 @@ class AlertService(
289305
} ?: listOf()
290306
}
291307

308+
suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
309+
val errorAlertIdPrefix = "error-alert"
310+
val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}"
311+
312+
val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*")
313+
.source(
314+
SearchSourceBuilder()
315+
.sort(Alert.START_TIME_FIELD, SortOrder.DESC)
316+
.query(
317+
QueryBuilders.boolQuery()
318+
.must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*"))
319+
.must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
320+
.must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR))
321+
)
322+
)
323+
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
324+
325+
var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))
326+
327+
if (searchResponse.hits.totalHits.value > 0L) {
328+
if (searchResponse.hits.totalHits.value > 1L) {
329+
logger.warn("There are [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}]")
330+
}
331+
// Deserialize first/latest Alert
332+
val hit = searchResponse.hits.hits[0]
333+
val xcp = contentParser(hit.sourceRef)
334+
val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version)
335+
336+
val currentTime = Instant.now()
337+
alert = if (alert.errorMessage != existingErrorAlert.errorMessage) {
338+
var newErrorHistory = existingErrorAlert.errorHistory.update(
339+
AlertError(existingErrorAlert.startTime, existingErrorAlert.errorMessage!!)
340+
)
341+
alert.copy(
342+
id = existingErrorAlert.id,
343+
errorHistory = newErrorHistory,
344+
startTime = currentTime,
345+
lastNotificationTime = currentTime
346+
)
347+
} else {
348+
existingErrorAlert.copy(startTime = Instant.now(), lastNotificationTime = currentTime)
349+
}
350+
}
351+
352+
val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex)
353+
.routing(alert.monitorId)
354+
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
355+
.opType(DocWriteRequest.OpType.INDEX)
356+
.id(alert.id)
357+
358+
val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) }
359+
logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}")
360+
}
361+
292362
suspend fun saveAlerts(
293363
dataSources: DataSources,
294364
alerts: List<Alert>,

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 83 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2020
import org.opensearch.alerting.model.InputRunResults
2121
import org.opensearch.alerting.model.MonitorMetadata
2222
import org.opensearch.alerting.model.MonitorRunResult
23+
import org.opensearch.alerting.model.userErrorMessage
2324
import org.opensearch.alerting.opensearchapi.suspendUntil
2425
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
2526
import org.opensearch.alerting.util.AlertingException
@@ -192,53 +193,88 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
192193
}
193194
}
194195
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
196+
197+
/*
198+
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
199+
list of matched docId from inputRunResults>
200+
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
201+
*/
202+
queries.forEach {
203+
if (inputRunResults.containsKey(it.id)) {
204+
queryToDocIds[it] = inputRunResults[it.id]!!
205+
}
206+
}
207+
208+
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
209+
210+
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
211+
// If there are no triggers defined, we still want to generate findings
212+
if (monitor.triggers.isEmpty()) {
213+
if (dryrun == false && monitor.id != Monitor.NO_ID) {
214+
docsToQueries.forEach {
215+
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
216+
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
217+
}
218+
}
219+
} else {
220+
monitor.triggers.forEach {
221+
triggerResults[it.id] = runForEachDocTrigger(
222+
monitorCtx,
223+
monitorResult,
224+
it as DocumentLevelTrigger,
225+
monitor,
226+
idQueryMap,
227+
docsToQueries,
228+
queryToDocIds,
229+
dryrun
230+
)
231+
}
232+
}
233+
// Don't update monitor if this is a test monitor
234+
if (!isTempMonitor) {
235+
// If any error happened during trigger execution, upsert monitor error alert
236+
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
237+
if (errorMessage.isNotEmpty()) {
238+
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
239+
}
240+
241+
MonitorMetadataService.upsertMetadata(
242+
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
243+
true
244+
)
245+
}
246+
247+
// TODO: Update the Document as part of the Trigger and return back the trigger action result
248+
return monitorResult.copy(triggerResults = triggerResults)
195249
} catch (e: Exception) {
196-
logger.error("Failed to start Document-level-monitor ${monitor.name}", e)
250+
val errorMessage = ExceptionsHelper.detailedMessage(e)
251+
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage)
252+
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
197253
val alertingException = AlertingException(
198-
ExceptionsHelper.unwrapCause(e).cause?.message.toString(),
254+
errorMessage,
199255
RestStatus.INTERNAL_SERVER_ERROR,
200256
e
201257
)
202-
monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
258+
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
203259
}
260+
}
204261

205-
/*
206-
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
207-
list of matched docId from inputRunResults>
208-
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
209-
*/
210-
queries.forEach {
211-
if (inputRunResults.containsKey(it.id)) {
212-
queryToDocIds[it] = inputRunResults[it.id]!!
262+
private fun constructErrorMessageFromTriggerResults(
263+
triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null
264+
): String {
265+
var errorMessage = ""
266+
if (triggerResults != null) {
267+
val triggersErrorBuilder = StringBuilder()
268+
triggerResults.forEach {
269+
if (it.value.error != null) {
270+
triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ")
271+
}
272+
}
273+
if (triggersErrorBuilder.isNotEmpty()) {
274+
errorMessage = "Trigger errors: $triggersErrorBuilder"
213275
}
214276
}
215-
216-
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
217-
218-
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
219-
monitor.triggers.forEach {
220-
triggerResults[it.id] = runForEachDocTrigger(
221-
monitorCtx,
222-
monitorResult,
223-
it as DocumentLevelTrigger,
224-
monitor,
225-
idQueryMap,
226-
docsToQueries,
227-
queryToDocIds,
228-
dryrun
229-
)
230-
}
231-
232-
// Don't update monitor if this is a test monitor
233-
if (!isTempMonitor) {
234-
MonitorMetadataService.upsertMetadata(
235-
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
236-
true
237-
)
238-
}
239-
240-
// TODO: Update the Document as part of the Trigger and return back the trigger action result
241-
return monitorResult.copy(triggerResults = triggerResults)
277+
return errorMessage
242278
}
243279

244280
private suspend fun runForEachDocTrigger(
@@ -285,16 +321,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
285321
alerts.add(alert)
286322
}
287323

288-
if (findingDocPairs.isEmpty() && monitorResult.error != null) {
289-
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
290-
listOf(),
291-
listOf(),
292-
triggerCtx,
293-
monitorResult.alertError() ?: triggerResult.alertError()
294-
)
295-
alerts.add(alert)
296-
}
297-
298324
val shouldDefaultToPerExecution = defaultToPerExecutionAction(
299325
monitorCtx.maxActionableAlertCount,
300326
monitorId = monitor.id,
@@ -567,8 +593,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
567593
searchSourceBuilder.query(boolQueryBuilder)
568594
searchRequest.source(searchSourceBuilder)
569595

570-
val response: SearchResponse = monitorCtx.client!!.suspendUntil {
571-
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
596+
var response: SearchResponse
597+
try {
598+
response = monitorCtx.client!!.suspendUntil {
599+
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
600+
}
601+
} catch (e: Exception) {
602+
throw IllegalStateException(
603+
"Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e
604+
)
572605
}
573606

574607
if (response.status() !== RestStatus.OK) {

alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -137,23 +137,18 @@ class AlertIndices(
137137
}
138138

139139
@Volatile private var alertHistoryEnabled = AlertingSettings.ALERT_HISTORY_ENABLED.get(settings)
140-
141140
@Volatile private var findingHistoryEnabled = AlertingSettings.FINDING_HISTORY_ENABLED.get(settings)
142141

143142
@Volatile private var alertHistoryMaxDocs = AlertingSettings.ALERT_HISTORY_MAX_DOCS.get(settings)
144-
145143
@Volatile private var findingHistoryMaxDocs = AlertingSettings.FINDING_HISTORY_MAX_DOCS.get(settings)
146144

147145
@Volatile private var alertHistoryMaxAge = AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.get(settings)
148-
149146
@Volatile private var findingHistoryMaxAge = AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE.get(settings)
150147

151148
@Volatile private var alertHistoryRolloverPeriod = AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.get(settings)
152-
153149
@Volatile private var findingHistoryRolloverPeriod = AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD.get(settings)
154150

155151
@Volatile private var alertHistoryRetentionPeriod = AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.get(settings)
156-
157152
@Volatile private var findingHistoryRetentionPeriod = AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD.get(settings)
158153

159154
@Volatile private var requestTimeout = AlertingSettings.REQUEST_TIMEOUT.get(settings)
@@ -454,25 +449,17 @@ class AlertIndices(
454449

455450
private fun rolloverAlertHistoryIndex() {
456451
rolloverIndex(
457-
alertHistoryIndexInitialized,
458-
ALERT_HISTORY_WRITE_INDEX,
459-
ALERT_HISTORY_INDEX_PATTERN,
460-
alertMapping(),
461-
alertHistoryMaxDocs,
462-
alertHistoryMaxAge,
463-
ALERT_HISTORY_WRITE_INDEX
452+
alertHistoryIndexInitialized, ALERT_HISTORY_WRITE_INDEX,
453+
ALERT_HISTORY_INDEX_PATTERN, alertMapping(),
454+
alertHistoryMaxDocs, alertHistoryMaxAge, ALERT_HISTORY_WRITE_INDEX
464455
)
465456
}
466457

467458
private fun rolloverFindingHistoryIndex() {
468459
rolloverIndex(
469-
findingHistoryIndexInitialized,
470-
FINDING_HISTORY_WRITE_INDEX,
471-
FINDING_HISTORY_INDEX_PATTERN,
472-
findingMapping(),
473-
findingHistoryMaxDocs,
474-
findingHistoryMaxAge,
475-
FINDING_HISTORY_WRITE_INDEX
460+
findingHistoryIndexInitialized, FINDING_HISTORY_WRITE_INDEX,
461+
FINDING_HISTORY_INDEX_PATTERN, findingMapping(),
462+
findingHistoryMaxDocs, findingHistoryMaxAge, FINDING_HISTORY_WRITE_INDEX
476463
)
477464
}
478465

@@ -488,7 +475,7 @@ class AlertIndices(
488475
clusterStateRequest,
489476
object : ActionListener<ClusterStateResponse> {
490477
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
491-
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
478+
if (clusterStateResponse.state.metadata.indices.isNotEmpty()) {
492479
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
493480
logger.info("Deleting old $tag indices viz $indicesToDelete")
494481
deleteAllOldHistoryIndices(indicesToDelete)
@@ -523,7 +510,7 @@ class AlertIndices(
523510
): String? {
524511
val creationTime = indexMetadata.creationDate
525512
if ((Instant.now().toEpochMilli() - creationTime) > retentionPeriodMillis) {
526-
val alias = indexMetadata.aliases.firstOrNull { writeIndex == it.value.alias }
513+
val alias = indexMetadata.aliases.entries.firstOrNull { writeIndex == it.value.alias }
527514
if (alias != null) {
528515
if (historyEnabled) {
529516
// If the index has the write alias and history is enabled, don't delete the index

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
283283
)
284284
indexRequests.add(indexRequest)
285285
}
286+
log.debug("bulk inserting percolate [${queries.size}] queries")
286287
if (indexRequests.isNotEmpty()) {
287288
val bulkResponse: BulkResponse = client.suspendUntil {
288289
client.bulk(

0 commit comments

Comments
 (0)