Skip to content

Commit 5b0ffd6

Browse files
committed
optimize doc-level monitor execution workflow for datastreams #1302
Signed-off-by: Megha Goyal <[email protected]>
1 parent 40a1c4b commit 5b0ffd6

File tree

6 files changed

+562
-16
lines changed

6 files changed

+562
-16
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
115115

116116
try {
117117
// Resolve all passed indices to concrete indices
118-
val concreteIndices = IndexUtils.resolveAllIndices(
118+
val allConcreteIndices = IndexUtils.resolveAllIndices(
119119
docLevelMonitorInput.indices,
120120
monitorCtx.clusterService!!,
121121
monitorCtx.indexNameExpressionResolver!!
122122
)
123-
if (concreteIndices.isEmpty()) {
123+
if (allConcreteIndices.isEmpty()) {
124124
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
125125
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
126126
}
@@ -135,17 +135,32 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
135135

136136
// cleanup old indices that are not monitored anymore from the same monitor
137137
for (ind in updatedLastRunContext.keys) {
138-
if (!concreteIndices.contains(ind)) {
138+
if (!allConcreteIndices.contains(ind)) {
139139
updatedLastRunContext.remove(ind)
140140
}
141141
}
142142

143143
docLevelMonitorInput.indices.forEach { indexName ->
144-
val concreteIndices = IndexUtils.resolveAllIndices(
144+
var concreteIndices = IndexUtils.resolveAllIndices(
145145
listOf(indexName),
146146
monitorCtx.clusterService!!,
147147
monitorCtx.indexNameExpressionResolver!!
148148
)
149+
var lastWriteIndex: String? = null
150+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
151+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
152+
) {
153+
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
154+
if (lastWriteIndex != null) {
155+
val lastWriteIndexCreationDate =
156+
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
157+
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
158+
concreteIndices,
159+
monitorCtx.clusterService!!.state(),
160+
lastWriteIndexCreationDate
161+
)
162+
}
163+
}
149164
val updatedIndexName = indexName.replace("*", "_")
150165
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
151166
monitorCtx.clusterService!!.state(),
@@ -170,7 +185,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
170185
monitorCtx,
171186
concreteIndexName
172187
) as MutableMap<String, Any>
173-
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
188+
189+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
190+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
191+
) {
192+
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
193+
updatedLastRunContext.remove(lastWriteIndex)
194+
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
195+
}
196+
} else {
197+
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
198+
}
174199

175200
val count: Int = indexLastRunContext["shards_count"] as Int
176201
for (i: Int in 0 until count) {

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
2828
import org.opensearch.alerting.opensearchapi.suspendUntil
2929
import org.opensearch.alerting.settings.AlertingSettings
3030
import org.opensearch.alerting.util.AlertingException
31+
import org.opensearch.alerting.util.IndexUtils
3132
import org.opensearch.client.Client
3233
import org.opensearch.cluster.service.ClusterService
3334
import org.opensearch.common.settings.Settings
@@ -206,11 +207,18 @@ object MonitorMetadataService :
206207
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
207208
try {
208209
if (index == null) return mutableMapOf()
209-
val getIndexRequest = GetIndexRequest().indices(index)
210-
val getIndexResponse: GetIndexResponse = client.suspendUntil {
211-
client.admin().indices().getIndex(getIndexRequest, it)
210+
val indices = mutableListOf<String>()
211+
if (IndexUtils.isAlias(index, clusterService.state()) ||
212+
IndexUtils.isDataStream(index, clusterService.state())
213+
) {
214+
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
215+
} else {
216+
val getIndexRequest = GetIndexRequest().indices(index)
217+
val getIndexResponse: GetIndexResponse = client.suspendUntil {
218+
client.admin().indices().getIndex(getIndexRequest, it)
219+
}
220+
indices.addAll(getIndexResponse.indices())
212221
}
213-
val indices = getIndexResponse.indices()
214222

215223
indices.forEach { indexName ->
216224
if (!lastRunContext.containsKey(indexName)) {

alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
207207

208208
// Run through each backing index and apply appropriate mappings to query index
209209
indices.forEach { indexName ->
210-
val concreteIndices = IndexUtils.resolveAllIndices(
210+
var concreteIndices = IndexUtils.resolveAllIndices(
211211
listOf(indexName),
212212
monitorCtx.clusterService!!,
213213
monitorCtx.indexNameExpressionResolver!!
214214
)
215+
216+
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
217+
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
218+
) {
219+
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
220+
if (lastWriteIndex != null) {
221+
val lastWriteIndexCreationDate =
222+
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
223+
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
224+
concreteIndices,
225+
monitorCtx.clusterService!!.state(),
226+
lastWriteIndexCreationDate
227+
)
228+
}
229+
}
230+
215231
val updatedIndexName = indexName.replace("*", "_")
216232
val updatedProperties = mutableMapOf<String, Any>()
217233
val allFlattenPaths = mutableSetOf<Pair<String, String>>()

alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.opensearch.alerting.alerts.AlertIndices
1313
import org.opensearch.alerting.core.ScheduledJobIndices
1414
import org.opensearch.client.IndicesAdminClient
1515
import org.opensearch.cluster.ClusterState
16+
import org.opensearch.cluster.metadata.IndexAbstraction
1617
import org.opensearch.cluster.metadata.IndexMetadata
1718
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
1819
import org.opensearch.cluster.service.ClusterService
@@ -151,5 +152,47 @@ class IndexUtils {
151152

152153
return result
153154
}
155+
156+
@JvmStatic
157+
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
158+
return clusterState.metadata().dataStreams().containsKey(name)
159+
}
160+
161+
@JvmStatic
162+
fun isAlias(name: String, clusterState: ClusterState): Boolean {
163+
return clusterState.metadata().hasAlias(name)
164+
}
165+
166+
@JvmStatic
167+
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
168+
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
169+
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
170+
if (metadata != null) {
171+
return metadata.index.name
172+
}
173+
}
174+
return null
175+
}
176+
177+
@JvmStatic
178+
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
179+
val filteredIndices = mutableListOf<String>()
180+
val lookup = clusterState.metadata().indicesLookup
181+
concreteIndices.forEach { indexName ->
182+
val index = lookup[indexName]
183+
val indexMetadata = clusterState.metadata.index(indexName)
184+
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
185+
if (indexMetadata.creationDate >= thresholdDate) {
186+
filteredIndices.add(indexName)
187+
}
188+
}
189+
}
190+
return filteredIndices
191+
}
192+
193+
@JvmStatic
194+
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
195+
return clusterState.metadata.index(index).creationDate
196+
}
154197
}
155198
}

alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt

Lines changed: 152 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ import java.time.format.DateTimeFormatter
7070
import java.time.temporal.ChronoUnit
7171
import java.util.Locale
7272
import java.util.UUID
73+
import java.util.stream.Collectors
7374
import javax.management.MBeanServerInvocationHandler
7475
import javax.management.ObjectName
7576
import javax.management.remote.JMXConnectorFactory
7677
import javax.management.remote.JMXServiceURL
78+
import kotlin.collections.ArrayList
79+
import kotlin.collections.HashMap
7780

7881
/**
7982
* Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
@@ -774,14 +777,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
774777
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
775778
val requestBody = StringEntity(doc, APPLICATION_JSON)
776779
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
777-
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
780+
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
778781
assertTrue(
779782
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
780783
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
781784
)
782785
return response
783786
}
784787

788+
protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
789+
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
790+
return index
791+
}
792+
785793
protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
786794
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
787795
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
@@ -846,7 +854,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
846854
val indicesMap = mutableMapOf<String, Boolean>()
847855
val indicesJson = jsonBuilder().startObject().startArray("actions")
848856
indices.keys.map {
849-
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
857+
val indexName = createTestIndex(index = it, mapping = "")
850858
val isWriteIndex = indices.getOrDefault(indexName, false)
851859
indicesMap[indexName] = isWriteIndex
852860
val indexMap = mapOf(
@@ -863,17 +871,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
863871
return mutableMapOf(alias to indicesMap)
864872
}
865873

874+
protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
875+
val indexPattern = "$datastream*"
876+
var componentTemplateMappings = "\"properties\": {" +
877+
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
878+
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
879+
"}"
880+
if (mappings != null) {
881+
componentTemplateMappings = mappings
882+
}
883+
if (useComponentTemplate) {
884+
// Setup index_template
885+
createComponentTemplateWithMappings(
886+
"my_ds_component_template-$datastream",
887+
componentTemplateMappings
888+
)
889+
}
890+
createComposableIndexTemplate(
891+
"my_index_template_ds-$datastream",
892+
listOf(indexPattern),
893+
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
894+
mappings,
895+
true,
896+
0
897+
)
898+
createDataStream(datastream)
899+
}
900+
901+
protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
902+
client().makeRequest("PUT", "_data_stream/$datastream")
903+
}
904+
905+
protected fun deleteDataStream(datastream: String) {
906+
client().makeRequest("DELETE", "_data_stream/$datastream")
907+
}
908+
909+
protected fun createIndexAlias(alias: String, mappings: String?) {
910+
val indexPattern = "$alias*"
911+
var componentTemplateMappings = "\"properties\": {" +
912+
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
913+
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
914+
"}"
915+
if (mappings != null) {
916+
componentTemplateMappings = mappings
917+
}
918+
createComponentTemplateWithMappings(
919+
"my_alias_component_template-$alias",
920+
componentTemplateMappings
921+
)
922+
createComposableIndexTemplate(
923+
"my_index_template_alias-$alias",
924+
listOf(indexPattern),
925+
"my_alias_component_template-$alias",
926+
mappings,
927+
false,
928+
0
929+
)
930+
createTestIndex(
931+
"$alias-000001",
932+
null,
933+
"""
934+
"$alias": {
935+
"is_write_index": true
936+
}
937+
""".trimIndent()
938+
)
939+
}
940+
941+
protected fun deleteIndexAlias(alias: String) {
942+
client().makeRequest("DELETE", "$alias*/_alias/$alias")
943+
}
944+
945+
protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
946+
val body = """{"template" : { "mappings": {$mappings} }}"""
947+
client().makeRequest(
948+
"PUT",
949+
"_component_template/$componentTemplateName",
950+
emptyMap(),
951+
StringEntity(body, ContentType.APPLICATION_JSON),
952+
BasicHeader("Content-Type", "application/json")
953+
)
954+
}
955+
956+
protected fun createComposableIndexTemplate(
957+
templateName: String,
958+
indexPatterns: List<String>,
959+
componentTemplateName: String?,
960+
mappings: String?,
961+
isDataStream: Boolean,
962+
priority: Int
963+
) {
964+
var body = "{\n"
965+
if (isDataStream) {
966+
body += "\"data_stream\": { },"
967+
}
968+
body += "\"index_patterns\": [" +
969+
indexPatterns.stream().collect(
970+
Collectors.joining(",", "\"", "\"")
971+
) + "],"
972+
if (componentTemplateName == null) {
973+
body += "\"template\": {\"mappings\": {$mappings}},"
974+
}
975+
if (componentTemplateName != null) {
976+
body += "\"composed_of\": [\"$componentTemplateName\"],"
977+
}
978+
body += "\"priority\":$priority}"
979+
client().makeRequest(
980+
"PUT",
981+
"_index_template/$templateName",
982+
emptyMap(),
983+
StringEntity(body, APPLICATION_JSON),
984+
BasicHeader("Content-Type", "application/json")
985+
)
986+
}
987+
988+
protected fun getDatastreamWriteIndex(datastream: String): String {
989+
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
990+
var respAsMap = responseAsMap(response)
991+
if (respAsMap.containsKey("data_streams")) {
992+
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
993+
val indices = respAsMap["indices"] as List<Map<String, Any>>
994+
val index = indices.last()
995+
return index["index_name"] as String
996+
} else {
997+
respAsMap = respAsMap[datastream] as Map<String, Object>
998+
}
999+
val indices = respAsMap["indices"] as Array<String>
1000+
return indices.last()
1001+
}
1002+
1003+
protected fun rolloverDatastream(datastream: String) {
1004+
client().makeRequest(
1005+
"POST",
1006+
datastream + "/_rollover",
1007+
emptyMap(),
1008+
null
1009+
)
1010+
}
1011+
8661012
protected fun randomAliasIndices(
8671013
alias: String,
8681014
num: Int = randomIntBetween(1, 10),
869-
includeWriteIndex: Boolean = true
1015+
includeWriteIndex: Boolean = true,
8701016
): Map<String, Boolean> {
8711017
val indices = mutableMapOf<String, Boolean>()
872-
val writeIndex = randomIntBetween(0, num)
1018+
val writeIndex = randomIntBetween(0, num - 1)
8731019
for (i: Int in 0 until num) {
874-
var indexName = randomAlphaOfLength(10)
1020+
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
8751021
while (indexName.equals(alias) || indices.containsKey(indexName))
876-
indexName = randomAlphaOfLength(10)
1022+
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
8771023
indices[indexName] = includeWriteIndex && i == writeIndex
8781024
}
8791025
return indices

0 commit comments

Comments
 (0)