-
Notifications
You must be signed in to change notification settings - Fork 85
Remove blocking calls and change threat intel feed flow to event driven #871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
58f9727
5dce731
b20270c
f222f41
3bfb29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,12 +34,12 @@ | |
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.securityanalytics.model.ThreatIntelFeedData; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; | ||
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; | ||
import org.opensearch.securityanalytics.util.IndexUtils; | ||
import org.opensearch.securityanalytics.util.SecurityAnalyticsException; | ||
|
@@ -56,7 +56,6 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
|
@@ -104,21 +103,13 @@ public void getThreatIntelFeedData( | |
ActionListener<List<ThreatIntelFeedData>> listener | ||
) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we no longer need this top-level try/catch? My observation has been that calls will hang if exceptions are not handled via the ActionListener There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. listener framework is event driven and no catch is required as There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this code throws an exception, we never make a call to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let me know if I am missing something There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. true. |
||
|
||
String tifdIndex = getLatestIndexByCreationDate(); | ||
if (tifdIndex == null) { | ||
createThreatIntelFeedData(listener); | ||
} else { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
} | ||
} catch (InterruptedException e) { | ||
} catch (Exception e) { | ||
log.error("Failed to get threat intel feed data", e); | ||
listener.onFailure(e); | ||
} | ||
|
@@ -307,36 +298,35 @@ public void onFailure(Exception e) { | |
); | ||
} | ||
|
||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException { | ||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) { | ||
client.execute( | ||
PutTIFJobAction.INSTANCE, | ||
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
countDownLatch.countDown(); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
|
||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
countDownLatch.countDown(); | ||
} | ||
} | ||
); | ||
countDownLatch.await(); | ||
} | ||
|
||
private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
} | ||
|
||
private String getIndexMapping() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.OpenSearchStatusException; | ||
import org.opensearch.ResourceAlreadyExistsException; | ||
import org.opensearch.ResourceNotFoundException; | ||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.StepListener; | ||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
|
@@ -84,6 +85,7 @@ public void onFailure(final Exception e) { | |
stepListener.onResponse(null); | ||
return; | ||
} | ||
log.error("Failed to create security analytics job index", e); | ||
stepListener.onFailure(e); | ||
} | ||
})); | ||
|
@@ -104,82 +106,72 @@ private String getIndexMapping() { | |
|
||
/** | ||
* Update jobSchedulerParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param jobSchedulerParameter the jobSchedulerParameter | ||
*/ | ||
public void updateJobSchedulerParameter(final TIFJobParameter jobSchedulerParameter, final ActionListener<ThreatIntelIndicesResponse> listener) { | ||
jobSchedulerParameter.setLastUpdateTime(Instant.now()); | ||
StashedThreadContext.run(client, () -> { | ||
try { | ||
if (listener != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will this listener never be null? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Changed it to non-null in all invocations. |
||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute(new ActionListener<>() { | ||
@Override | ||
public void onResponse(IndexResponse indexResponse) { | ||
if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); | ||
} else { | ||
listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute(new ActionListener<>() { | ||
@Override | ||
public void onResponse(IndexResponse indexResponse) { | ||
if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); | ||
} else { | ||
listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} else { | ||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute().actionGet(); | ||
} | ||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} catch (IOException e) { | ||
throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); | ||
log.error("failed to update job scheduler param for tif job", e); | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Get tif job from an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param name the name of a tif job | ||
* @return tif job | ||
* @throws IOException exception | ||
*/ | ||
public TIFJobParameter getJobParameter(final String name) throws IOException { | ||
public void getJobParameter(final String name, ActionListener<TIFJobParameter> listener) { | ||
GetRequest request = new GetRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME, name); | ||
GetResponse response; | ||
try { | ||
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))); | ||
if (response.isExists() == false) { | ||
log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
} catch (IndexNotFoundException e) { | ||
log.error("Index[{}] is not found", SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
|
||
XContentParser parser = XContentHelper.createParser( | ||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getSourceAsBytesRef() | ||
); | ||
return TIFJobParameter.PARSER.parse(parser, null); | ||
StashedThreadContext.run(client, () -> client.get(request, ActionListener.wrap( | ||
response -> { | ||
if (response.isExists() == false) { | ||
log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
listener.onFailure(new ResourceNotFoundException("name")); | ||
} | ||
XContentParser parser = XContentHelper.createParser( | ||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getSourceAsBytesRef() | ||
); | ||
listener.onResponse(TIFJobParameter.PARSER.parse(parser, null)); | ||
}, e -> { | ||
log.error("Failed to fetch tif job document " + name, e); | ||
listener.onFailure(e); | ||
}))); | ||
} | ||
|
||
/** | ||
* Put tifJobParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param tifJobParameter the tifJobParameter | ||
* @param listener the listener | ||
* @param listener the listener | ||
*/ | ||
public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener listener) { | ||
public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener<IndexResponse> listener) { | ||
tifJobParameter.setLastUpdateTime(Instant.now()); | ||
StashedThreadContext.run(client, () -> { | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know why the latches were initially implemented? Seems fine to remove them based on the testing performed, but I'm puzzled as to why they would have been added in the first place if they are not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bad practice. The right construct to use is a CountDownLatch, but it is safer to do it the event-driven way.