Skip to content

Commit 75c4429

Browse files
authored
Add search request timeouts for correlations workflows (opensearch-project#893)
* Reinstating more leaks plugged-in for correlations workflows Signed-off-by: Megha Goyal <[email protected]> * Add search timeouts to all correlation searches Signed-off-by: Megha Goyal <[email protected]> * Fix logging and exception messages Signed-off-by: Megha Goyal <[email protected]> * Change search timeout to 30 seconds Signed-off-by: Megha Goyal <[email protected]> --------- Signed-off-by: Megha Goyal <[email protected]>
1 parent 656a5fe commit 75c4429

File tree

3 files changed

+69
-48
lines changed

3 files changed

+69
-48
lines changed

src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java

+7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.lucene.search.join.ScoreMode;
1111
import org.opensearch.OpenSearchStatusException;
1212
import org.opensearch.cluster.routing.Preference;
13+
import org.opensearch.common.unit.TimeValue;
1314
import org.opensearch.commons.alerting.model.DocLevelQuery;
1415
import org.opensearch.core.action.ActionListener;
1516
import org.opensearch.action.search.MultiSearchRequest;
@@ -132,6 +133,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws
132133
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
133134
searchRequest.source(sourceBuilder);
134135
searchRequest.preference(Preference.PRIMARY_FIRST.type());
136+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
135137
mSearchRequest.add(searchRequest);
136138
}
137139

@@ -214,6 +216,7 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map<String,
214216
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
215217
searchRequest.source(searchSourceBuilder);
216218
searchRequest.preference(Preference.PRIMARY_FIRST.type());
219+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
217220

218221
client.search(searchRequest, ActionListener.wrap(response -> {
219222
if (response.isTimedOut()) {
@@ -277,6 +280,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
277280
searchRequest.indices(indices.toArray(new String[]{}));
278281
searchRequest.source(searchSourceBuilder);
279282
searchRequest.preference(Preference.PRIMARY_FIRST.type());
283+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
280284

281285
validCorrelationRules.add(rule);
282286
validFields.add(query.get().getField());
@@ -377,6 +381,7 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
377381
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
378382
searchRequest.source(searchSourceBuilder);
379383
searchRequest.preference(Preference.PRIMARY_FIRST.type());
384+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
380385
mSearchRequest.add(searchRequest);
381386
categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue()));
382387
}
@@ -441,6 +446,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
441446
searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
442447
searchRequest.source(searchSourceBuilder);
443448
searchRequest.preference(Preference.PRIMARY_FIRST.type());
449+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
444450

445451
categories.add(docSearchCriteria.getKey());
446452
mSearchRequest.add(searchRequest);
@@ -502,6 +508,7 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
502508
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
503509
searchRequest.source(searchSourceBuilder);
504510
searchRequest.preference(Preference.PRIMARY_FIRST.type());
511+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
505512

506513
categories.add(relatedDocIds.getKey());
507514
mSearchRequest.add(searchRequest);

src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
3333
import org.opensearch.securityanalytics.util.CorrelationIndices;
3434

35+
import java.util.Arrays;
3536
import java.util.List;
3637
import java.util.Locale;
3738
import java.util.Map;
@@ -94,6 +95,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
9495
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
9596
request.source(searchSourceBuilder);
9697
request.preference(Preference.PRIMARY_FIRST.type());
98+
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
9799

98100
mSearchRequest.add(request);
99101
}
@@ -195,6 +197,12 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
195197
}
196198

197199
public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
200+
if (logTypes.get(detectorType) == null ) {
201+
log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}",
202+
detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray()));
203+
onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
204+
}
205+
198206
SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes);
199207
Map<String, Object> tags = logTypes.get(detectorType).getTags();
200208
String correlationId = tags.get("correlation_id").toString();
@@ -251,7 +259,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
251259
onFailure(ex);
252260
}
253261
} else {
254-
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
262+
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
263+
indexResponse.status(), indexResponse.toString()));
255264
}
256265
}, this::onFailure));
257266
} else {
@@ -297,7 +306,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
297306
onFailure(ex);
298307
}
299308
} else {
300-
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
309+
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
310+
indexResponse.status(), indexResponse.toString()));
301311
}
302312
}, this::onFailure));
303313
} else {
@@ -323,6 +333,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
323333
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
324334
request.source(searchSourceBuilder);
325335
request.preference(Preference.PRIMARY_FIRST.type());
336+
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
326337

327338
client.search(request, ActionListener.wrap(searchResponse -> {
328339
if (searchResponse.isTimedOut()) {
@@ -407,6 +418,9 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
407418
} catch (Exception ex) {
408419
onFailure(ex);
409420
}
421+
} else {
422+
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
423+
indexResponse.status(), indexResponse.toString()));
410424
}
411425
}, this::onFailure));
412426
} catch (Exception ex) {
@@ -432,7 +446,7 @@ private void indexCorrelatedFindings(XContentBuilder builder) {
432446
if (response.status().equals(RestStatus.CREATED)) {
433447
correlateFindingAction.onOperation();
434448
} else {
435-
onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
449+
onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString()));
436450
}
437451
}, this::onFailure));
438452
}
@@ -454,6 +468,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding
454468
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
455469
searchRequest.source(searchSourceBuilder);
456470
searchRequest.preference(Preference.PRIMARY_FIRST.type());
471+
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
457472
return searchRequest;
458473
}
459474

0 commit comments

Comments
 (0)