Skip to content

Commit e772a64

Browse files
committed
use transport service timeout instead of custom impl
Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 97c83ce commit e772a64

File tree

6 files changed

+125
-206
lines changed

6 files changed

+125
-206
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import org.opensearch.ExceptionsHelper
1010
import org.opensearch.Version
1111
import org.opensearch.action.ActionListenerResponseHandler
1212
import org.opensearch.action.support.GroupedActionListener
13-
import org.opensearch.alerting.listener.TimeOutWrappedListener
1413
import org.opensearch.alerting.util.IndexUtils
1514
import org.opensearch.cluster.metadata.IndexMetadata
1615
import org.opensearch.cluster.node.DiscoveryNode
@@ -37,9 +36,9 @@ import org.opensearch.core.rest.RestStatus
3736
import org.opensearch.index.IndexNotFoundException
3837
import org.opensearch.index.seqno.SequenceNumbers
3938
import org.opensearch.node.NodeClosedException
40-
import org.opensearch.threadpool.ThreadPool
4139
import org.opensearch.transport.ActionNotFoundTransportException
4240
import org.opensearch.transport.ConnectTransportException
41+
import org.opensearch.transport.ReceiveTimeoutTransportException
4342
import org.opensearch.transport.RemoteTransportException
4443
import org.opensearch.transport.TransportException
4544
import org.opensearch.transport.TransportRequestOptions
@@ -292,38 +291,37 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
292291
concreteIndicesSeenSoFar,
293292
workflowRunContext
294293
)
295-
val timeout = monitorCtx.docLevelMonitorExecutionMaxDuration
296-
val timeoutWrappedListener = TimeOutWrappedListener.wrapScheduledTimeout(
297-
threadPool = monitorCtx.threadPool!!,
298-
timeout = timeout,
299-
executor = ThreadPool.Names.SAME,
300-
actionListener = listener,
301-
timeoutConsumer = { timeoutListener ->
302-
logger.error(
303-
"Node ${node.key} timed out for doc level monitor id" +
304-
" ${monitor.id} named ${monitor.name} in fanout after {${timeout.minutes}} minutes"
305-
)
306-
listener.onResponse(
307-
DocLevelMonitorFanOutResponse(
308-
nodeId = node.key,
309-
executionId = executionId,
310-
monitorId = monitor.id,
311-
mutableMapOf()
312-
)
313-
)
314-
}
315-
)
316294

317295
transportService.sendRequest(
318296
node.value,
319297
DocLevelMonitorFanOutAction.NAME,
320298
docLevelMonitorFanOutRequest,
321-
TransportRequestOptions.EMPTY,
299+
TransportRequestOptions
300+
.builder()
301+
.withTimeout(monitorCtx.docLevelMonitorExecutionMaxDuration)
302+
.build(),
322303
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
323-
timeoutWrappedListener,
304+
listener,
324305
responseReader
325306
) {
326307
override fun handleException(e: TransportException) {
308+
if (
309+
e is ReceiveTimeoutTransportException
310+
) {
311+
logger.warn(
312+
"fan_out timed out in node ${localNode.id} for doc level monitor ${monitor.id}," +
313+
" attempting to collect partial results from other nodes. ExecutionId: $executionId"
314+
)
315+
listener.onResponse(
316+
DocLevelMonitorFanOutResponse(
317+
localNode.id,
318+
executionId,
319+
monitor.id,
320+
mutableMapOf()
321+
)
322+
)
323+
return
324+
}
327325
val cause = e.unwrapCause()
328326
if (cause is ConnectTransportException ||
329327
(
@@ -341,19 +339,18 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
341339
localNode,
342340
DocLevelMonitorFanOutAction.NAME,
343341
docLevelMonitorFanOutRequest,
344-
TransportRequestOptions.EMPTY,
342+
TransportRequestOptions
343+
.builder()
344+
.withTimeout(monitorCtx.docLevelMonitorExecutionMaxDuration)
345+
.build(),
345346
object :
346347
ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
347-
timeoutWrappedListener,
348+
listener,
348349
responseReader
349350
) {
350351
override fun handleException(e: TransportException) {
351-
logger.error(
352-
"Fan out retry failed in node ${localNode.id} " +
353-
"for doc level monitor $monitor.id",
354-
e
355-
)
356-
timeoutWrappedListener.onResponse(
352+
logger.error("Fan out retry failed in node ${localNode.id}", e)
353+
listener.onResponse(
357354
DocLevelMonitorFanOutResponse(
358355
"",
359356
"",
@@ -369,13 +366,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
369366
}
370367

371368
override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
372-
timeoutWrappedListener.onResponse(response)
369+
listener.onResponse(response)
373370
}
374371
}
375372
)
376373
} else {
377374
logger.error("Fan out failed in node ${node.key}", e)
378-
timeoutWrappedListener.onResponse(
375+
listener.onResponse(
379376
DocLevelMonitorFanOutResponse(
380377
"",
381378
"",
@@ -392,7 +389,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
392389
}
393390

394391
override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
395-
timeoutWrappedListener.onResponse(response)
392+
listener.onResponse(response)
396393
}
397394
}
398395
)

alerting/src/main/kotlin/org/opensearch/alerting/listener/PrioritizedActionListener.kt

Lines changed: 0 additions & 7 deletions
This file was deleted.

alerting/src/main/kotlin/org/opensearch/alerting/listener/TimeOutWrappedListener.kt

Lines changed: 0 additions & 93 deletions
This file was deleted.

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ class TransportDocLevelMonitorFanOutAction
212212
listener: ActionListener<DocLevelMonitorFanOutResponse>
213213
) {
214214
try {
215+
log.info("Starting fan_out for doc level monitor ${request.monitor.id}. ExecutionId: ${request.executionId}")
216+
val startTime = System.currentTimeMillis()
215217
val endTime = Instant.now().plusMillis(docLevelMonitorFanoutMaxDuration.millis())
216218
val monitor = request.monitor
217219
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, Instant.now(), Instant.now())
@@ -364,6 +366,11 @@ class TransportDocLevelMonitorFanOutAction
364366
triggerResults
365367
)
366368
)
369+
val completedTime = System.currentTimeMillis()
370+
val fanoutDuration = completedTime - startTime
371+
log.info(
372+
"Completed fan_out for doc level monitor ${request.monitor.id} in $fanoutDuration ms. ExecutionId: ${request.executionId}"
373+
)
367374
} catch (e: Exception) {
368375
log.error(
369376
"${request.monitor.id} Failed to run fan_out on node ${clusterService.localNode().id}." +
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.opensearch.alerting
2+
3+
import org.junit.Assert
4+
import org.opensearch.alerting.settings.AlertingSettings
5+
import org.opensearch.common.unit.TimeValue
6+
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
7+
import org.opensearch.commons.alerting.model.DocLevelQuery
8+
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
9+
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
10+
import java.time.ZonedDateTime
11+
import java.time.format.DateTimeFormatter
12+
import java.time.temporal.ChronoUnit.MILLIS
13+
14+
/**
 * REST integration test for the doc level monitor execution max-duration setting.
 *
 * NOTE(review): the class name looks like a typo of "DocLevelFanOutIT"; it is kept as-is here so that
 * anything referring to the class by name keeps working.
 */
class DocLeveFanOutIT : AlertingRestTestCase() {

    /**
     * Executes the same doc level monitor twice over two matching documents:
     *
     * 1. with [AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION] set to 1 nanosecond, where the
     *    first entry of the input results is expected to be empty;
     * 2. with the setting raised to 4 minutes, where both indexed documents are expected to be reported
     *    for the query.
     */
    fun `test execution reaches endtime before completing execution`() {
        val shortDurationResponse = adminClient().updateSettings(
            AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key,
            TimeValue.timeValueNanos(1)
        )
        val findingHistoryResponse = adminClient().updateSettings(AlertingSettings.FINDING_HISTORY_ENABLED.key, false)
        logger.info(findingHistoryResponse)
        logger.info(shortDurationResponse)

        val testIndex = createTestIndex()
        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
        val testDoc = """{
            "message" : "This is an error from IAD region",
            "test_strict_date_time" : "$testTime",
            "test_field" : "us-west-2"
        }"""

        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
        val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

        val actionExecutionPolicy = ActionExecutionPolicy(PerExecutionActionScope())
        val actions = (0..randomInt(10)).map {
            randomActionWithPolicy(
                template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
                destinationId = createDestination().id,
                actionExecutionPolicy = actionExecutionPolicy
            )
        }

        // Seven always-firing triggers that all share the same action list.
        val triggers = List(7) { randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) }

        val monitor = createMonitor(
            randomDocumentLevelMonitor(
                inputs = listOf(docLevelInput),
                triggers = triggers
            )
        )
        assertNotNull(monitor.id)

        indexDoc(testIndex, "1", testDoc)
        indexDoc(testIndex, "5", testDoc)

        // First run: the 1 ns max duration is still in effect.
        val timedOutOutput = entityAsMap(executeMonitor(monitor.id))
        assertEquals(monitor.name, timedOutOutput["monitor_name"])
        @Suppress("UNCHECKED_CAST")
        val timedOutInputResults =
            (timedOutOutput.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
        Assert.assertTrue(
            "Expected empty input results when the max duration is exceeded",
            timedOutInputResults.isEmpty()
        )

        val longDurationResponse = adminClient().updateSettings(
            AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key,
            TimeValue.timeValueMinutes(4)
        )
        logger.info(longDurationResponse)

        // Second run: with a 4 minute budget both documents must be matched.
        val completedOutput = entityAsMap(executeMonitor(monitor.id))
        assertEquals(monitor.name, completedOutput["monitor_name"])
        // Each unchecked cast gets its own suppression; the original left this one uncovered.
        @Suppress("UNCHECKED_CAST")
        val completedInputResults =
            (completedOutput.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
        @Suppress("UNCHECKED_CAST")
        val matchingDocsToQuery = completedInputResults[docQuery.id] as List<String>
        assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
        assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
        assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))
    }
}

0 commit comments

Comments
 (0)