Skip to content

Commit 96fc035

Browse files
[Backport 2.11] optimize doc-level monitor execution workflow for datastreams (#1323)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 30f7360 commit 96fc035

File tree

10 files changed

+567
-24
lines changed

10 files changed

+567
-24
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
120120

121121
try {
122122
// Resolve all passed indices to concrete indices
123-
val concreteIndices = IndexUtils.resolveAllIndices(
123+
val allConcreteIndices = IndexUtils.resolveAllIndices(
124124
docLevelMonitorInput.indices,
125125
monitorCtx.clusterService!!,
126126
monitorCtx.indexNameExpressionResolver!!
127127
)
128-
if (concreteIndices.isEmpty()) {
128+
if (allConcreteIndices.isEmpty()) {
129129
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
130130
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
131131
}
@@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
141141
// cleanup old indices that are not monitored anymore from the same monitor
142142
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
143143
for (ind in runContextKeys) {
144-
if (!concreteIndices.contains(ind)) {
144+
if (!allConcreteIndices.contains(ind)) {
145145
updatedLastRunContext.remove(ind)
146146
}
147147
}
@@ -150,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
150150
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
151151

152152
docLevelMonitorInput.indices.forEach { indexName ->
153-
val concreteIndices = IndexUtils.resolveAllIndices(
153+
var concreteIndices = IndexUtils.resolveAllIndices(
154154
listOf(indexName),
155155
monitorCtx.clusterService!!,
156156
monitorCtx.indexNameExpressionResolver!!
157157
)
158+
var lastWriteIndex: String? = null
159+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
160+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
161+
) {
162+
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
163+
if (lastWriteIndex != null) {
164+
val lastWriteIndexCreationDate =
165+
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
166+
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
167+
concreteIndices,
168+
monitorCtx.clusterService!!.state(),
169+
lastWriteIndexCreationDate
170+
)
171+
}
172+
}
158173
val updatedIndexName = indexName.replace("*", "_")
159174
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
160175
monitorCtx.clusterService!!.state(),
@@ -179,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
179194
monitorCtx,
180195
concreteIndexName
181196
) as MutableMap<String, Any>
182-
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
197+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
198+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
199+
) {
200+
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
201+
updatedLastRunContext.remove(lastWriteIndex)
202+
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
203+
}
204+
} else {
205+
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
206+
}
183207

184208
val count: Int = indexLastRunContext["shards_count"] as Int
185209
for (i: Int in 0 until count) {

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
2828
import org.opensearch.alerting.opensearchapi.suspendUntil
2929
import org.opensearch.alerting.settings.AlertingSettings
3030
import org.opensearch.alerting.util.AlertingException
31+
import org.opensearch.alerting.util.IndexUtils
3132
import org.opensearch.client.Client
3233
import org.opensearch.cluster.service.ClusterService
3334
import org.opensearch.common.settings.Settings
@@ -216,11 +217,19 @@ object MonitorMetadataService :
216217
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
217218
try {
218219
if (index == null) return mutableMapOf()
219-
val getIndexRequest = GetIndexRequest().indices(index)
220-
val getIndexResponse: GetIndexResponse = client.suspendUntil {
221-
client.admin().indices().getIndex(getIndexRequest, it)
220+
221+
val indices = mutableListOf<String>()
222+
if (IndexUtils.isAlias(index, clusterService.state()) ||
223+
IndexUtils.isDataStream(index, clusterService.state())
224+
) {
225+
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
226+
} else {
227+
val getIndexRequest = GetIndexRequest().indices(index)
228+
val getIndexResponse: GetIndexResponse = client.suspendUntil {
229+
client.admin().indices().getIndex(getIndexRequest, it)
230+
}
231+
indices.addAll(getIndexResponse.indices())
222232
}
223-
val indices = getIndexResponse.indices()
224233

225234
indices.forEach { indexName ->
226235
if (!lastRunContext.containsKey(indexName)) {

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
207207

208208
// Run through each backing index and apply appropriate mappings to query index
209209
indices.forEach { indexName ->
210-
val concreteIndices = IndexUtils.resolveAllIndices(
210+
var concreteIndices = IndexUtils.resolveAllIndices(
211211
listOf(indexName),
212212
monitorCtx.clusterService!!,
213213
monitorCtx.indexNameExpressionResolver!!
214214
)
215+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
216+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
217+
) {
218+
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
219+
if (lastWriteIndex != null) {
220+
val lastWriteIndexCreationDate =
221+
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
222+
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
223+
concreteIndices,
224+
monitorCtx.clusterService!!.state(),
225+
lastWriteIndexCreationDate
226+
)
227+
}
228+
}
215229
val updatedIndexName = indexName.replace("*", "_")
216230
val updatedProperties = mutableMapOf<String, Any>()
217231
val allFlattenPaths = mutableSetOf<Pair<String, String>>()

alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices
1212
import org.opensearch.alerting.core.ScheduledJobIndices
1313
import org.opensearch.client.IndicesAdminClient
1414
import org.opensearch.cluster.ClusterState
15+
import org.opensearch.cluster.metadata.IndexAbstraction
1516
import org.opensearch.cluster.metadata.IndexMetadata
1617
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1718
import org.opensearch.cluster.service.ClusterService
@@ -153,5 +154,47 @@ class IndexUtils {
153154

154155
return result
155156
}
157+
158+
@JvmStatic
159+
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
160+
return clusterState.metadata().dataStreams().containsKey(name)
161+
}
162+
163+
@JvmStatic
164+
fun isAlias(name: String, clusterState: ClusterState): Boolean {
165+
return clusterState.metadata().hasAlias(name)
166+
}
167+
168+
@JvmStatic
169+
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
170+
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
171+
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
172+
if (metadata != null) {
173+
return metadata.index.name
174+
}
175+
}
176+
return null
177+
}
178+
179+
@JvmStatic
180+
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
181+
val filteredIndices = mutableListOf<String>()
182+
val lookup = clusterState.metadata().indicesLookup
183+
concreteIndices.forEach { indexName ->
184+
val index = lookup[indexName]
185+
val indexMetadata = clusterState.metadata.index(indexName)
186+
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
187+
if (indexMetadata.creationDate >= thresholdDate) {
188+
filteredIndices.add(indexName)
189+
}
190+
}
191+
}
192+
return filteredIndices
193+
}
194+
195+
@JvmStatic
196+
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
197+
return clusterState.metadata.index(index).creationDate
198+
}
156199
}
157200
}

alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt

Lines changed: 150 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler
7676
import javax.management.ObjectName
7777
import javax.management.remote.JMXConnectorFactory
7878
import javax.management.remote.JMXServiceURL
79+
import kotlin.collections.ArrayList
80+
import kotlin.collections.HashMap
7981

8082
/**
8183
* Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
@@ -909,7 +911,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
909911
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
910912
val requestBody = StringEntity(doc, APPLICATION_JSON)
911913
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
912-
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
914+
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
913915
assertTrue(
914916
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
915917
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
@@ -945,6 +947,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
945947
return index
946948
}
947949

950+
protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
951+
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
952+
return index
953+
}
954+
948955
protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String {
949956
try {
950957
createIndex(
@@ -981,7 +988,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
981988
val indicesMap = mutableMapOf<String, Boolean>()
982989
val indicesJson = jsonBuilder().startObject().startArray("actions")
983990
indices.keys.map {
984-
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
991+
val indexName = createTestIndex(index = it, mapping = "")
985992
val isWriteIndex = indices.getOrDefault(indexName, false)
986993
indicesMap[indexName] = isWriteIndex
987994
val indexMap = mapOf(
@@ -998,17 +1005,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
9981005
return mutableMapOf(alias to indicesMap)
9991006
}
10001007

1008+
protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
1009+
val indexPattern = "$datastream*"
1010+
var componentTemplateMappings = "\"properties\": {" +
1011+
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
1012+
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
1013+
"}"
1014+
if (mappings != null) {
1015+
componentTemplateMappings = mappings
1016+
}
1017+
if (useComponentTemplate) {
1018+
// Setup index_template
1019+
createComponentTemplateWithMappings(
1020+
"my_ds_component_template-$datastream",
1021+
componentTemplateMappings
1022+
)
1023+
}
1024+
createComposableIndexTemplate(
1025+
"my_index_template_ds-$datastream",
1026+
listOf(indexPattern),
1027+
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
1028+
mappings,
1029+
true,
1030+
0
1031+
)
1032+
createDataStream(datastream)
1033+
}
1034+
1035+
protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
1036+
client().makeRequest("PUT", "_data_stream/$datastream")
1037+
}
1038+
1039+
protected fun deleteDataStream(datastream: String) {
1040+
client().makeRequest("DELETE", "_data_stream/$datastream")
1041+
}
1042+
1043+
protected fun createIndexAlias(alias: String, mappings: String?) {
1044+
val indexPattern = "$alias*"
1045+
var componentTemplateMappings = "\"properties\": {" +
1046+
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
1047+
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
1048+
"}"
1049+
if (mappings != null) {
1050+
componentTemplateMappings = mappings
1051+
}
1052+
createComponentTemplateWithMappings(
1053+
"my_alias_component_template-$alias",
1054+
componentTemplateMappings
1055+
)
1056+
createComposableIndexTemplate(
1057+
"my_index_template_alias-$alias",
1058+
listOf(indexPattern),
1059+
"my_alias_component_template-$alias",
1060+
mappings,
1061+
false,
1062+
0
1063+
)
1064+
createTestIndex(
1065+
"$alias-000001",
1066+
null,
1067+
"""
1068+
"$alias": {
1069+
"is_write_index": true
1070+
}
1071+
""".trimIndent()
1072+
)
1073+
}
1074+
1075+
protected fun deleteIndexAlias(alias: String) {
1076+
client().makeRequest("DELETE", "$alias*/_alias/$alias")
1077+
}
1078+
1079+
protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
1080+
val body = """{"template" : { "mappings": {$mappings} }}"""
1081+
client().makeRequest(
1082+
"PUT",
1083+
"_component_template/$componentTemplateName",
1084+
emptyMap(),
1085+
StringEntity(body, ContentType.APPLICATION_JSON),
1086+
BasicHeader("Content-Type", "application/json")
1087+
)
1088+
}
1089+
1090+
protected fun createComposableIndexTemplate(
1091+
templateName: String,
1092+
indexPatterns: List<String>,
1093+
componentTemplateName: String?,
1094+
mappings: String?,
1095+
isDataStream: Boolean,
1096+
priority: Int
1097+
) {
1098+
var body = "{\n"
1099+
if (isDataStream) {
1100+
body += "\"data_stream\": { },"
1101+
}
1102+
body += "\"index_patterns\": [" +
1103+
indexPatterns.stream().collect(
1104+
Collectors.joining(",", "\"", "\"")
1105+
) + "],"
1106+
if (componentTemplateName == null) {
1107+
body += "\"template\": {\"mappings\": {$mappings}},"
1108+
}
1109+
if (componentTemplateName != null) {
1110+
body += "\"composed_of\": [\"$componentTemplateName\"],"
1111+
}
1112+
body += "\"priority\":$priority}"
1113+
client().makeRequest(
1114+
"PUT",
1115+
"_index_template/$templateName",
1116+
emptyMap(),
1117+
StringEntity(body, APPLICATION_JSON),
1118+
BasicHeader("Content-Type", "application/json")
1119+
)
1120+
}
1121+
1122+
protected fun getDatastreamWriteIndex(datastream: String): String {
1123+
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
1124+
var respAsMap = responseAsMap(response)
1125+
if (respAsMap.containsKey("data_streams")) {
1126+
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
1127+
val indices = respAsMap["indices"] as List<Map<String, Any>>
1128+
val index = indices.last()
1129+
return index["index_name"] as String
1130+
} else {
1131+
respAsMap = respAsMap[datastream] as Map<String, Object>
1132+
}
1133+
val indices = respAsMap["indices"] as Array<String>
1134+
return indices.last()
1135+
}
1136+
1137+
protected fun rolloverDatastream(datastream: String) {
1138+
client().makeRequest(
1139+
"POST",
1140+
datastream + "/_rollover",
1141+
emptyMap(),
1142+
null
1143+
)
1144+
}
1145+
10011146
protected fun randomAliasIndices(
10021147
alias: String,
10031148
num: Int = randomIntBetween(1, 10),
10041149
includeWriteIndex: Boolean = true,
10051150
): Map<String, Boolean> {
10061151
val indices = mutableMapOf<String, Boolean>()
1007-
val writeIndex = randomIntBetween(0, num)
1152+
val writeIndex = randomIntBetween(0, num - 1)
10081153
for (i: Int in 0 until num) {
1009-
var indexName = randomAlphaOfLength(10)
1154+
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
10101155
while (indexName.equals(alias) || indices.containsKey(indexName))
1011-
indexName = randomAlphaOfLength(10)
1156+
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
10121157
indices[indexName] = includeWriteIndex && i == writeIndex
10131158
}
10141159
return indices

0 commit comments

Comments
 (0)