Skip to content

Commit 172d58d

Browse files
authored
Remove blocking calls and change threat intel feed flow to event driven (opensearch-project#871)
* remove actionGet() and change threat intel feed flow to event driven Signed-off-by: Surya Sashank Nistala <[email protected]> * fix javadocs Signed-off-by: Surya Sashank Nistala <[email protected]> * revert try catch removals Signed-off-by: Surya Sashank Nistala <[email protected]> * use action listener wrap() in detector threat intel code paths Signed-off-by: Surya Sashank Nistala <[email protected]> * add try catch Signed-off-by: Surya Sashank Nistala <[email protected]> --------- Signed-off-by: Surya Sashank Nistala <[email protected]>
1 parent 8ef0a3f commit 172d58d

File tree

10 files changed

+265
-370
lines changed

10 files changed

+265
-370
lines changed

src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java

+15-28
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import java.util.Map;
3333
import java.util.Set;
3434
import java.util.UUID;
35-
import java.util.concurrent.CountDownLatch;
36-
import java.util.concurrent.TimeUnit;
3735
import java.util.stream.Collectors;
3836

3937
import static org.opensearch.securityanalytics.model.Detector.DETECTORS_INDEX;
@@ -121,35 +119,24 @@ public void createDocLevelQueryFromThreatIntel(List<LogType.IocFields> iocFieldL
121119
listener.onResponse(Collections.emptyList());
122120
return;
123121
}
124-
125-
CountDownLatch latch = new CountDownLatch(1);
126-
threatIntelFeedDataService.getThreatIntelFeedData(new ActionListener<>() {
127-
@Override
128-
public void onResponse(List<ThreatIntelFeedData> threatIntelFeedData) {
129-
if (threatIntelFeedData.isEmpty()) {
130-
listener.onResponse(Collections.emptyList());
131-
} else {
132-
listener.onResponse(
133-
createDocLevelQueriesFromThreatIntelList(iocFieldList, threatIntelFeedData, detector)
134-
);
122+
threatIntelFeedDataService.getThreatIntelFeedData(ActionListener.wrap(
123+
threatIntelFeedData -> {
124+
if (threatIntelFeedData.isEmpty()) {
125+
listener.onResponse(Collections.emptyList());
126+
} else {
127+
listener.onResponse(
128+
createDocLevelQueriesFromThreatIntelList(iocFieldList, threatIntelFeedData, detector)
129+
);
130+
}
131+
}, e -> {
132+
log.error("Failed to get threat intel feeds for doc level query creation", e);
133+
listener.onFailure(e);
135134
}
136-
latch.countDown();
137-
}
138-
139-
@Override
140-
public void onFailure(Exception e) {
141-
log.error("Failed to get threat intel feeds for doc level query creation", e);
142-
listener.onFailure(e);
143-
latch.countDown();
144-
}
145-
});
146-
147-
latch.await(30, TimeUnit.SECONDS);
148-
} catch (InterruptedException e) {
149-
log.error("Failed to create doc level queries from threat intel feeds", e);
135+
));
136+
} catch (Exception e) {
137+
log.error("Failed to create doc level query from threat intel data", e);
150138
listener.onFailure(e);
151139
}
152-
153140
}
154141

155142
private static String constructId(Detector detector, String iocType) {

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

+59-86
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
import org.opensearch.core.xcontent.ToXContent;
3535
import org.opensearch.core.xcontent.XContentBuilder;
3636
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
37+
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
3738
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction;
3839
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest;
3940
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse;
40-
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata;
4141
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
42-
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
42+
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata;
4343
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService;
4444
import org.opensearch.securityanalytics.util.IndexUtils;
4545
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
@@ -56,7 +56,6 @@
5656
import java.util.List;
5757
import java.util.Map;
5858
import java.util.Optional;
59-
import java.util.concurrent.CountDownLatch;
6059
import java.util.regex.Matcher;
6160
import java.util.regex.Pattern;
6261
import java.util.stream.Collectors;
@@ -104,21 +103,13 @@ public void getThreatIntelFeedData(
104103
ActionListener<List<ThreatIntelFeedData>> listener
105104
) {
106105
try {
107-
108106
String tifdIndex = getLatestIndexByCreationDate();
109107
if (tifdIndex == null) {
110108
createThreatIntelFeedData(listener);
111109
} else {
112-
SearchRequest searchRequest = new SearchRequest(tifdIndex);
113-
searchRequest.source().size(9999); //TODO: convert to scroll
114-
String finalTifdIndex = tifdIndex;
115-
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
116-
log.error(String.format(
117-
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
118-
listener.onFailure(e);
119-
}));
110+
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener);
120111
}
121-
} catch (InterruptedException e) {
112+
} catch (Exception e) {
122113
log.error("Failed to get threat intel feed data", e);
123114
listener.onFailure(e);
124115
}
@@ -150,21 +141,16 @@ public void createIndexIfNotExists(final String indexName, final ActionListener<
150141
.mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT));
151142
StashedThreadContext.run(
152143
client,
153-
() -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
154-
@Override
155-
public void onResponse(CreateIndexResponse response) {
156-
if (response.isAcknowledged()) {
157-
listener.onResponse(response);
158-
} else {
159-
onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR));
160-
}
161-
}
162-
163-
@Override
164-
public void onFailure(Exception e) {
165-
listener.onFailure(e);
166-
}
167-
})
144+
() -> client.admin().indices().create(createIndexRequest,
145+
ActionListener.wrap(
146+
response -> {
147+
if (response.isAcknowledged())
148+
listener.onResponse(response);
149+
else
150+
listener.onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR));
151+
152+
}, listener::onFailure
153+
))
168154
);
169155
}
170156

@@ -223,28 +209,20 @@ public void parseAndSaveThreatIntelFeedDataCSV(
223209
}
224210
bulkRequestList.add(bulkRequest);
225211

226-
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() {
227-
@Override
228-
public void onResponse(Collection<BulkResponse> bulkResponses) {
229-
int idx = 0;
230-
for (BulkResponse response: bulkResponses) {
231-
BulkRequest request = bulkRequestList.get(idx);
232-
if (response.hasFailures()) {
233-
throw new OpenSearchException(
234-
"error occurred while ingesting threat intel feed data in {} with an error {}",
235-
StringUtils.join(request.getIndices()),
236-
response.buildFailureMessage()
237-
);
238-
}
212+
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> {
213+
int idx = 0;
214+
for (BulkResponse response : bulkResponses) {
215+
BulkRequest request = bulkRequestList.get(idx);
216+
if (response.hasFailures()) {
217+
throw new OpenSearchException(
218+
"error occurred while ingesting threat intel feed data in {} with an error {}",
219+
StringUtils.join(request.getIndices()),
220+
response.buildFailureMessage()
221+
);
239222
}
240-
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName)));
241223
}
242-
243-
@Override
244-
public void onFailure(Exception e) {
245-
listener.onFailure(e);
246-
}
247-
}, bulkRequestList.size());
224+
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName)));
225+
}, listener::onFailure), bulkRequestList.size());
248226

249227
for (int i = 0; i < bulkRequestList.size(); ++i) {
250228
saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener);
@@ -291,52 +269,47 @@ public void deleteThreatIntelDataIndex(final List<String> indices) {
291269
.prepareDelete(indices.toArray(new String[0]))
292270
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
293271
.setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))
294-
.execute(new ActionListener<>() {
295-
@Override
296-
public void onResponse(AcknowledgedResponse response) {
297-
if (response.isAcknowledged() == false) {
298-
onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices)));
299-
}
300-
}
301-
302-
@Override
303-
public void onFailure(Exception e) {
304-
log.error("unknown exception:", e);
305-
}
306-
})
272+
.execute(ActionListener.wrap(
273+
response -> {
274+
if (response.isAcknowledged() == false) {
275+
log.error(new OpenSearchException("failed to delete threat intel feed index[{}]",
276+
String.join(",", indices)));
277+
}
278+
}, e -> log.error("failed to delete threat intel feed index [{}]", e)
279+
))
307280
);
308281
}
309282

310-
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException {
311-
CountDownLatch countDownLatch = new CountDownLatch(1);
283+
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) {
312284
client.execute(
313285
PutTIFJobAction.INSTANCE,
314286
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)),
315-
new ActionListener<>() {
316-
@Override
317-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
318-
log.debug("Acknowledged threat intel feed updater job created");
319-
countDownLatch.countDown();
320-
String tifdIndex = getLatestIndexByCreationDate();
321-
322-
SearchRequest searchRequest = new SearchRequest(tifdIndex);
323-
searchRequest.source().size(9999); //TODO: convert to scroll
324-
String finalTifdIndex = tifdIndex;
325-
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
326-
log.error(String.format(
327-
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
287+
ActionListener.wrap(
288+
r -> {
289+
if (false == r.isAcknowledged()) {
290+
listener.onFailure(new Exception("Failed to acknowledge Put Tif job action"));
291+
return;
292+
}
293+
log.debug("Acknowledged threat intel feed updater job created");
294+
String tifdIndex = getLatestIndexByCreationDate();
295+
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener);
296+
}, e -> {
297+
log.debug("Failed to create threat intel feed updater job", e);
328298
listener.onFailure(e);
329-
}));
330-
}
331-
332-
@Override
333-
public void onFailure(Exception e) {
334-
log.debug("Failed to create threat intel feed updater job", e);
335-
countDownLatch.countDown();
336-
}
337-
}
299+
}
300+
)
338301
);
339-
countDownLatch.await();
302+
}
303+
304+
private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) {
305+
SearchRequest searchRequest = new SearchRequest(tifdIndex);
306+
searchRequest.source().size(9999); //TODO: convert to scroll
307+
String finalTifdIndex = tifdIndex;
308+
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
309+
log.error(String.format(
310+
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
311+
listener.onFailure(e);
312+
}));
340313
}
341314

342315
private String getIndexMapping() {

0 commit comments

Comments
 (0)