Skip to content

Commit eb9951f

Browse files
petardzlezzago
authored andcommitted
[Backport 2.x] QueryIndex rollover when field mapping limit is reached (opensearch-project#729)
Signed-off-by: Petar Dzepina <[email protected]>
1 parent ea34a55 commit eb9951f

File tree

16 files changed

+674
-211
lines changed

16 files changed

+674
-211
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
238238
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
239239
this.threadPool = threadPool
240240
this.clusterService = clusterService
241+
242+
MonitorMetadataService.initialize(
243+
client,
244+
clusterService,
245+
xContentRegistry,
246+
settings
247+
)
248+
241249
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
242250
}
243251

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.alerting
77

88
import org.apache.logging.log4j.LogManager
99
import org.opensearch.ExceptionsHelper
10+
import org.opensearch.OpenSearchStatusException
1011
import org.opensearch.action.admin.indices.get.GetIndexRequest
1112
import org.opensearch.action.admin.indices.get.GetIndexResponse
1213
import org.opensearch.action.index.IndexRequest
@@ -21,13 +22,13 @@ import org.opensearch.alerting.core.model.DocLevelQuery
2122
import org.opensearch.alerting.core.model.ScheduledJob
2223
import org.opensearch.alerting.model.ActionExecutionResult
2324
import org.opensearch.alerting.model.Alert
24-
import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata
2525
import org.opensearch.alerting.model.DocumentExecutionContext
2626
import org.opensearch.alerting.model.DocumentLevelTrigger
2727
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2828
import org.opensearch.alerting.model.Finding
2929
import org.opensearch.alerting.model.InputRunResults
3030
import org.opensearch.alerting.model.Monitor
31+
import org.opensearch.alerting.model.MonitorMetadata
3132
import org.opensearch.alerting.model.MonitorRunResult
3233
import org.opensearch.alerting.model.action.PerAlertActionScope
3334
import org.opensearch.alerting.opensearchapi.string
@@ -36,7 +37,6 @@ import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
3637
import org.opensearch.alerting.util.AlertingException
3738
import org.opensearch.alerting.util.defaultToPerExecutionAction
3839
import org.opensearch.alerting.util.getActionExecutionPolicy
39-
import org.opensearch.alerting.util.updateMonitorMetadata
4040
import org.opensearch.client.Client
4141
import org.opensearch.cluster.routing.ShardRouting
4242
import org.opensearch.cluster.service.ClusterService
@@ -55,7 +55,6 @@ import org.opensearch.search.sort.SortOrder
5555
import java.io.IOException
5656
import java.time.Instant
5757
import java.util.UUID
58-
import kotlin.collections.HashMap
5958
import kotlin.math.max
6059

6160
object DocumentLevelMonitorRunner : MonitorRunner() {
@@ -88,22 +87,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
8887
return monitorResult.copy(error = AlertingException.wrap(e))
8988
}
9089

90+
var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false)
91+
9192
monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
9293
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
9394
monitor = monitor,
9495
monitorId = monitor.id,
96+
monitorMetadata,
9597
indexTimeout = monitorCtx.indexTimeout!!
9698
)
9799

98100
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
99101
val index = docLevelMonitorInput.indices[0]
100102
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
101103

102-
var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata")
103-
if (monitorMetadata == null) {
104-
monitorMetadata = createMonitorMetadata(monitor.id)
105-
}
106-
107104
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
108105
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
109106
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
@@ -132,7 +129,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
132129
// Prepare lastRunContext for each index
133130
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
134131
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
135-
createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName, indexCreatedRecently)
132+
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
136133
}
137134

138135
// Prepare updatedLastRunContext for each index
@@ -160,7 +157,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
160157
val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
161158

162159
if (matchingDocs.isNotEmpty()) {
163-
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)
160+
val matchedQueriesForDocs = getMatchedQueries(
161+
monitorCtx,
162+
matchingDocs.map { it.second },
163+
monitor,
164+
monitorMetadata,
165+
indexName
166+
)
164167

165168
matchedQueriesForDocs.forEach { hit ->
166169
val id = hit.id.replace("_${indexName}_${monitor.id}", "")
@@ -214,7 +217,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
214217

215218
// Don't update monitor if this is a test monitor
216219
if (!isTempMonitor) {
217-
updateMonitorMetadata(monitorCtx.client!!, monitorCtx.settings!!, monitorMetadata.copy(lastRunContext = updatedLastRunContext))
220+
MonitorMetadataService.upsertMetadata(
221+
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
222+
true
223+
)
218224
}
219225

220226
// TODO: Update the Document as part of the Trigger and return back the trigger action result
@@ -506,6 +512,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
506512
monitorCtx: MonitorRunnerExecutionContext,
507513
docs: List<BytesReference>,
508514
monitor: Monitor,
515+
monitorMetadata: MonitorMetadata,
509516
index: String
510517
): SearchHits {
511518
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index))
@@ -516,7 +523,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
516523
}
517524
boolQueryBuilder.filter(percolateQueryBuilder)
518525

519-
val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
526+
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
527+
if (queryIndex == null) {
528+
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
529+
" sourceIndex:$index queryIndex:${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}"
530+
logger.error(message)
531+
throw AlertingException.wrap(
532+
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
533+
)
534+
}
535+
val searchRequest = SearchRequest(queryIndex)
520536
val searchSourceBuilder = SearchSourceBuilder()
521537
searchSourceBuilder.query(boolQueryBuilder)
522538
searchRequest.source(searchSourceBuilder)
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.alerting
7+
8+
import kotlinx.coroutines.CoroutineName
9+
import kotlinx.coroutines.CoroutineScope
10+
import kotlinx.coroutines.Dispatchers
11+
import kotlinx.coroutines.SupervisorJob
12+
import org.apache.logging.log4j.LogManager
13+
import org.opensearch.ExceptionsHelper
14+
import org.opensearch.OpenSearchSecurityException
15+
import org.opensearch.action.DocWriteRequest
16+
import org.opensearch.action.DocWriteResponse
17+
import org.opensearch.action.admin.indices.get.GetIndexRequest
18+
import org.opensearch.action.admin.indices.get.GetIndexResponse
19+
import org.opensearch.action.admin.indices.stats.IndicesStatsAction
20+
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
21+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
22+
import org.opensearch.action.get.GetRequest
23+
import org.opensearch.action.get.GetResponse
24+
import org.opensearch.action.index.IndexRequest
25+
import org.opensearch.action.index.IndexResponse
26+
import org.opensearch.action.support.WriteRequest
27+
import org.opensearch.alerting.core.model.DocLevelMonitorInput
28+
import org.opensearch.alerting.core.model.ScheduledJob
29+
import org.opensearch.alerting.model.Monitor
30+
import org.opensearch.alerting.model.MonitorMetadata
31+
import org.opensearch.alerting.opensearchapi.suspendUntil
32+
import org.opensearch.alerting.settings.AlertingSettings
33+
import org.opensearch.alerting.util.AlertingException
34+
import org.opensearch.client.Client
35+
import org.opensearch.cluster.service.ClusterService
36+
import org.opensearch.common.settings.Settings
37+
import org.opensearch.common.unit.TimeValue
38+
import org.opensearch.common.xcontent.LoggingDeprecationHandler
39+
import org.opensearch.common.xcontent.NamedXContentRegistry
40+
import org.opensearch.common.xcontent.ToXContent
41+
import org.opensearch.common.xcontent.XContentFactory
42+
import org.opensearch.common.xcontent.XContentHelper
43+
import org.opensearch.common.xcontent.XContentParser
44+
import org.opensearch.common.xcontent.XContentParserUtils
45+
import org.opensearch.common.xcontent.XContentType
46+
import org.opensearch.index.seqno.SequenceNumbers
47+
import org.opensearch.rest.RestStatus
48+
import org.opensearch.transport.RemoteTransportException
49+
50+
private val log = LogManager.getLogger(MonitorMetadataService::class.java)
51+
52+
object MonitorMetadataService :
53+
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("MonitorMetadataService")) {
54+
55+
private lateinit var client: Client
56+
private lateinit var xContentRegistry: NamedXContentRegistry
57+
private lateinit var clusterService: ClusterService
58+
private lateinit var settings: Settings
59+
60+
@Volatile private lateinit var indexTimeout: TimeValue
61+
62+
fun initialize(
63+
client: Client,
64+
clusterService: ClusterService,
65+
xContentRegistry: NamedXContentRegistry,
66+
settings: Settings
67+
) {
68+
this.clusterService = clusterService
69+
this.client = client
70+
this.xContentRegistry = xContentRegistry
71+
this.settings = settings
72+
this.indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings)
73+
this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it }
74+
}
75+
76+
@Suppress("ComplexMethod", "ReturnCount")
77+
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
78+
try {
79+
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
80+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
81+
.source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
82+
.id(metadata.id)
83+
.routing(metadata.monitorId)
84+
.setIfSeqNo(metadata.seqNo)
85+
.setIfPrimaryTerm(metadata.primaryTerm)
86+
.timeout(indexTimeout)
87+
88+
if (updating) {
89+
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
90+
} else {
91+
indexRequest.opType(DocWriteRequest.OpType.CREATE)
92+
}
93+
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
94+
when (response.result) {
95+
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
96+
val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result"
97+
log.error(failureReason)
98+
throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason))
99+
}
100+
DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
101+
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
102+
}
103+
}
104+
return metadata.copy(
105+
seqNo = response.seqNo,
106+
primaryTerm = response.primaryTerm
107+
)
108+
} catch (e: Exception) {
109+
throw AlertingException.wrap(e)
110+
}
111+
}
112+
113+
suspend fun getOrCreateMetadata(monitor: Monitor, createWithRunContext: Boolean = true): Pair<MonitorMetadata, Boolean> {
114+
try {
115+
val created = true
116+
val metadata = getMetadata(monitor)
117+
return if (metadata != null) {
118+
metadata to !created
119+
} else {
120+
val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext)
121+
upsertMetadata(newMetadata, updating = false) to created
122+
}
123+
} catch (e: Exception) {
124+
throw AlertingException.wrap(e)
125+
}
126+
}
127+
128+
suspend fun getMetadata(monitor: Monitor): MonitorMetadata? {
129+
try {
130+
val metadataId = MonitorMetadata.getId(monitor)
131+
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(monitor.id)
132+
133+
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
134+
return if (getResponse.isExists) {
135+
val xcp = XContentHelper.createParser(
136+
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
137+
getResponse.sourceAsBytesRef, XContentType.JSON
138+
)
139+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
140+
MonitorMetadata.parse(xcp)
141+
} else {
142+
null
143+
}
144+
} catch (e: Exception) {
145+
if (e.message?.contains("no such index") == true) {
146+
return null
147+
} else {
148+
throw AlertingException.wrap(e)
149+
}
150+
}
151+
}
152+
153+
suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
154+
try {
155+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
156+
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
157+
else null
158+
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
159+
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
160+
else null
161+
if (runContext != null) {
162+
return metadata.copy(
163+
lastRunContext = runContext
164+
)
165+
} else {
166+
return metadata
167+
}
168+
} catch (e: Exception) {
169+
throw AlertingException.wrap(e)
170+
}
171+
}
172+
173+
private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean): MonitorMetadata {
174+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
175+
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
176+
else null
177+
val runContext =
178+
if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
179+
createFullRunContext(monitorIndex)
180+
else emptyMap()
181+
return MonitorMetadata(
182+
id = "${monitor.id}-metadata",
183+
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
184+
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
185+
monitorId = monitor.id,
186+
lastActionExecutionTimes = emptyList(),
187+
lastRunContext = runContext,
188+
sourceToQueryIndexMapping = mutableMapOf()
189+
)
190+
}
191+
192+
private suspend fun createFullRunContext(
193+
index: String?,
194+
existingRunContext: MutableMap<String, MutableMap<String, Any>>? = null
195+
): MutableMap<String, MutableMap<String, Any>> {
196+
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
197+
try {
198+
if (index == null) return mutableMapOf()
199+
val getIndexRequest = GetIndexRequest().indices(index)
200+
val getIndexResponse: GetIndexResponse = client.suspendUntil {
201+
client.admin().indices().getIndex(getIndexRequest, it)
202+
}
203+
val indices = getIndexResponse.indices()
204+
205+
indices.forEach { indexName ->
206+
if (!lastRunContext.containsKey(indexName)) {
207+
lastRunContext[indexName] = createRunContextForIndex(index)
208+
}
209+
}
210+
} catch (e: RemoteTransportException) {
211+
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
212+
throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, unwrappedException)
213+
} catch (e: OpenSearchSecurityException) {
214+
throw AlertingException(
215+
"Failed fetching index stats - missing required index permissions: ${e.localizedMessage}",
216+
RestStatus.INTERNAL_SERVER_ERROR,
217+
e
218+
)
219+
} catch (e: Exception) {
220+
throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, e)
221+
}
222+
return lastRunContext
223+
}
224+
225+
suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false): MutableMap<String, Any> {
226+
val request = IndicesStatsRequest().indices(index).clear()
227+
val response: IndicesStatsResponse = client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
228+
if (response.status != RestStatus.OK) {
229+
val errorMessage = "Failed fetching index stats for index:$index"
230+
throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(errorMessage))
231+
}
232+
val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
233+
val lastRunContext = HashMap<String, Any>()
234+
lastRunContext["index"] = index
235+
val count = shards.size
236+
lastRunContext["shards_count"] = count
237+
238+
for (shard in shards) {
239+
lastRunContext[shard.shardRouting.id.toString()] =
240+
if (createdRecently) -1L
241+
else shard.seqNoStats?.globalCheckpoint ?: SequenceNumbers.UNASSIGNED_SEQ_NO
242+
}
243+
return lastRunContext
244+
}
245+
}

0 commit comments

Comments
 (0)