-
Notifications
You must be signed in to change notification settings - Fork 85
Remove blocking calls and change threat intel feed flow to event driven #871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
58f9727
5dce731
b20270c
f222f41
3bfb29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,12 +34,12 @@ | |
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.securityanalytics.model.ThreatIntelFeedData; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; | ||
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; | ||
import org.opensearch.securityanalytics.util.IndexUtils; | ||
import org.opensearch.securityanalytics.util.SecurityAnalyticsException; | ||
|
@@ -56,7 +56,6 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
|
@@ -104,21 +103,13 @@ public void getThreatIntelFeedData( | |
ActionListener<List<ThreatIntelFeedData>> listener | ||
) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we no longer need this top-level try/catch? My observation has been that calls will hang it exceptions are not handled via the ActionListener There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. listener framework is event driven and no catch is required as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this code throws an exception, we never make a call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know if I am missing something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true. |
||
|
||
String tifdIndex = getLatestIndexByCreationDate(); | ||
if (tifdIndex == null) { | ||
createThreatIntelFeedData(listener); | ||
} else { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
} | ||
} catch (InterruptedException e) { | ||
} catch (Exception e) { | ||
log.error("Failed to get threat intel feed data", e); | ||
listener.onFailure(e); | ||
} | ||
|
@@ -150,21 +141,16 @@ public void createIndexIfNotExists(final String indexName, final ActionListener< | |
.mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); | ||
StashedThreadContext.run( | ||
client, | ||
() -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { | ||
@Override | ||
public void onResponse(CreateIndexResponse response) { | ||
if (response.isAcknowledged()) { | ||
listener.onResponse(response); | ||
} else { | ||
onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}) | ||
() -> client.admin().indices().create(createIndexRequest, | ||
ActionListener.wrap( | ||
response -> { | ||
if (response.isAcknowledged()) | ||
listener.onResponse(response); | ||
else | ||
listener.onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
|
||
}, listener::onFailure | ||
)) | ||
); | ||
} | ||
|
||
|
@@ -223,28 +209,20 @@ public void parseAndSaveThreatIntelFeedDataCSV( | |
} | ||
bulkRequestList.add(bulkRequest); | ||
|
||
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() { | ||
@Override | ||
public void onResponse(Collection<BulkResponse> bulkResponses) { | ||
int idx = 0; | ||
for (BulkResponse response: bulkResponses) { | ||
BulkRequest request = bulkRequestList.get(idx); | ||
if (response.hasFailures()) { | ||
throw new OpenSearchException( | ||
"error occurred while ingesting threat intel feed data in {} with an error {}", | ||
StringUtils.join(request.getIndices()), | ||
response.buildFailureMessage() | ||
); | ||
} | ||
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { | ||
int idx = 0; | ||
for (BulkResponse response : bulkResponses) { | ||
BulkRequest request = bulkRequestList.get(idx); | ||
if (response.hasFailures()) { | ||
throw new OpenSearchException( | ||
"error occurred while ingesting threat intel feed data in {} with an error {}", | ||
StringUtils.join(request.getIndices()), | ||
response.buildFailureMessage() | ||
); | ||
} | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}, bulkRequestList.size()); | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); | ||
}, listener::onFailure), bulkRequestList.size()); | ||
|
||
for (int i = 0; i < bulkRequestList.size(); ++i) { | ||
saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener); | ||
|
@@ -291,52 +269,47 @@ public void deleteThreatIntelDataIndex(final List<String> indices) { | |
.prepareDelete(indices.toArray(new String[0])) | ||
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) | ||
.setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) | ||
.execute(new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse response) { | ||
if (response.isAcknowledged() == false) { | ||
onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices))); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.error("unknown exception:", e); | ||
} | ||
}) | ||
.execute(ActionListener.wrap( | ||
response -> { | ||
if (response.isAcknowledged() == false) { | ||
log.error(new OpenSearchException("failed to delete threat intel feed index[{}]", | ||
String.join(",", indices))); | ||
} | ||
}, e -> log.error("failed to delete threat intel feed index [{}]", e) | ||
)) | ||
); | ||
} | ||
|
||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException { | ||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) { | ||
client.execute( | ||
PutTIFJobAction.INSTANCE, | ||
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
countDownLatch.countDown(); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
|
||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
ActionListener.wrap( | ||
r -> { | ||
if (false == r.isAcknowledged()) { | ||
listener.onFailure(new Exception("Failed to acknowledge Put Tif job action")); | ||
return; | ||
} | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
}, e -> { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
listener.onFailure(e); | ||
})); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
countDownLatch.countDown(); | ||
} | ||
} | ||
} | ||
) | ||
); | ||
countDownLatch.await(); | ||
} | ||
|
||
private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
} | ||
|
||
private String getIndexMapping() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know why the latches were initially implemented? Seems fine to remove them based on the testing performed but I'm puzzled as to why they would have been added in the first place if they are not required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad practice. The right construct to use is a
Countdown
but safer to do it the event-driven way