@@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
12
12
import org.apache.logging.log4j.LogManager
13
13
import org.opensearch.ExceptionsHelper
14
14
import org.opensearch.OpenSearchSecurityException
15
+ import org.opensearch.OpenSearchStatusException
15
16
import org.opensearch.action.DocWriteRequest
16
17
import org.opensearch.action.DocWriteResponse
17
18
import org.opensearch.action.admin.indices.get.GetIndexRequest
@@ -78,35 +79,51 @@ object MonitorMetadataService :
78
79
@Suppress(" ComplexMethod" , " ReturnCount" )
79
80
suspend fun upsertMetadata (metadata : MonitorMetadata , updating : Boolean ): MonitorMetadata {
80
81
try {
81
- val indexRequest = IndexRequest (ScheduledJob .SCHEDULED_JOBS_INDEX )
82
- .setRefreshPolicy(WriteRequest .RefreshPolicy .IMMEDIATE )
83
- .source(metadata.toXContent(XContentFactory .jsonBuilder(), ToXContent .MapParams (mapOf (" with_type" to " true" ))))
84
- .id(metadata.id)
85
- .routing(metadata.monitorId)
86
- .setIfSeqNo(metadata.seqNo)
87
- .setIfPrimaryTerm(metadata.primaryTerm)
88
- .timeout(indexTimeout)
82
+ if (clusterService.state().routingTable.hasIndex(ScheduledJob .SCHEDULED_JOBS_INDEX )) {
83
+ val indexRequest = IndexRequest (ScheduledJob .SCHEDULED_JOBS_INDEX )
84
+ .setRefreshPolicy(WriteRequest .RefreshPolicy .IMMEDIATE )
85
+ .source(
86
+ metadata.toXContent(
87
+ XContentFactory .jsonBuilder(),
88
+ ToXContent .MapParams (mapOf (" with_type" to " true" ))
89
+ )
90
+ )
91
+ .id(metadata.id)
92
+ .routing(metadata.monitorId)
93
+ .setIfSeqNo(metadata.seqNo)
94
+ .setIfPrimaryTerm(metadata.primaryTerm)
95
+ .timeout(indexTimeout)
89
96
90
- if (updating) {
91
- indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
92
- } else {
93
- indexRequest.opType(DocWriteRequest .OpType .CREATE )
94
- }
95
- val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
96
- when (response.result) {
97
- DocWriteResponse .Result .DELETED , DocWriteResponse .Result .NOOP , DocWriteResponse .Result .NOT_FOUND , null -> {
98
- val failureReason = " The upsert metadata call failed with a ${response.result?.lowercase} result"
99
- log.error(failureReason)
100
- throw AlertingException (failureReason, RestStatus .INTERNAL_SERVER_ERROR , IllegalStateException (failureReason))
97
+ if (updating) {
98
+ indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
99
+ } else {
100
+ indexRequest.opType(DocWriteRequest .OpType .CREATE )
101
101
}
102
- DocWriteResponse .Result .CREATED , DocWriteResponse .Result .UPDATED -> {
103
- log.debug(" Successfully upserted MonitorMetadata:${metadata.id} " )
102
+ val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
103
+ when (response.result) {
104
+ DocWriteResponse .Result .DELETED , DocWriteResponse .Result .NOOP , DocWriteResponse .Result .NOT_FOUND , null -> {
105
+ val failureReason =
106
+ " The upsert metadata call failed with a ${response.result?.lowercase} result"
107
+ log.error(failureReason)
108
+ throw AlertingException (
109
+ failureReason,
110
+ RestStatus .INTERNAL_SERVER_ERROR ,
111
+ IllegalStateException (failureReason)
112
+ )
113
+ }
114
+
115
+ DocWriteResponse .Result .CREATED , DocWriteResponse .Result .UPDATED -> {
116
+ log.debug(" Successfully upserted MonitorMetadata:${metadata.id} " )
117
+ }
104
118
}
119
+ return metadata.copy(
120
+ seqNo = response.seqNo,
121
+ primaryTerm = response.primaryTerm
122
+ )
123
+ } else {
124
+ val failureReason = " Job index ${ScheduledJob .SCHEDULED_JOBS_INDEX } does not exist to update monitor metadata"
125
+ throw OpenSearchStatusException (failureReason, RestStatus .INTERNAL_SERVER_ERROR )
105
126
}
106
- return metadata.copy(
107
- seqNo = response.seqNo,
108
- primaryTerm = response.primaryTerm
109
- )
110
127
} catch (e: Exception ) {
111
128
throw AlertingException .wrap(e)
112
129
}
0 commit comments