diff --git a/src/main/java/org/opensearch/securityanalytics/mapper/MapperService.java b/src/main/java/org/opensearch/securityanalytics/mapper/MapperService.java index 5616fdbe0..7760a4ac1 100644 --- a/src/main/java/org/opensearch/securityanalytics/mapper/MapperService.java +++ b/src/main/java/org/opensearch/securityanalytics/mapper/MapperService.java @@ -78,9 +78,11 @@ public void createMappingAction(String indexName, String logType, String aliasMa // since you can't update documents in non-write indices String index = indexName; boolean shouldUpsertIndexTemplate = IndexUtils.isConcreteIndex(indexName, this.clusterService.state()) == false; - if (IndexUtils.isDataStream(indexName, this.clusterService.state())) { + if (IndexUtils.isDataStream(indexName, this.clusterService.state()) || IndexUtils.isAlias(indexName, this.clusterService.state())) { + log.debug("{} is an alias or datastream. Fetching write index for create mapping action.", indexName); String writeIndex = IndexUtils.getWriteIndex(indexName, this.clusterService.state()); if (writeIndex != null) { + log.debug("Write index for {} is {}", indexName, writeIndex); index = writeIndex; } } @@ -92,6 +94,7 @@ public void onResponse(GetMappingsResponse getMappingsResponse) { applyAliasMappings(getMappingsResponse.getMappings(), logType, aliasMappings, partial, new ActionListener<>() { @Override public void onResponse(Collection createMappingResponse) { + log.debug("Completed create mappings for {}", indexName); // We will return ack==false if one of the requests returned that // else return ack==true Optional notAckd = createMappingResponse.stream() @@ -110,6 +113,7 @@ public void onResponse(Collection createMappingResponse) { @Override public void onFailure(Exception e) { + log.debug("Failed to create mappings for {}", indexName ); actionListener.onFailure(e); } }); diff --git a/src/main/java/org/opensearch/securityanalytics/mapper/MapperUtils.java 
b/src/main/java/org/opensearch/securityanalytics/mapper/MapperUtils.java index 72dd36d11..8c8bf353f 100644 --- a/src/main/java/org/opensearch/securityanalytics/mapper/MapperUtils.java +++ b/src/main/java/org/opensearch/securityanalytics/mapper/MapperUtils.java @@ -5,6 +5,10 @@ package org.opensearch.securityanalytics.mapper; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -12,9 +16,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.securityanalytics.util.SecurityAnalyticsException; public class MapperUtils { @@ -246,7 +247,6 @@ public void onError(String error) { } }); mappingsTraverser.traverse(); - return presentPathsMappings; } } diff --git a/src/main/java/org/opensearch/securityanalytics/rules/backend/OSQueryBackend.java b/src/main/java/org/opensearch/securityanalytics/rules/backend/OSQueryBackend.java index 2d1763a43..ec7b09505 100644 --- a/src/main/java/org/opensearch/securityanalytics/rules/backend/OSQueryBackend.java +++ b/src/main/java/org/opensearch/securityanalytics/rules/backend/OSQueryBackend.java @@ -331,9 +331,12 @@ public Object convertConditionFieldEqValQueryExpr(ConditionFieldEqualsValueExpre @Override public Object convertConditionValStr(ConditionValueExpression condition) throws SigmaValueError { + String field = getFinalValueField(); + ruleQueryFields.put(field, Map.of("type", "text", "analyzer", "rule_analyzer")); SigmaString value = (SigmaString) condition.getValue(); boolean containsWildcard = value.containsWildcard(); - return String.format(Locale.getDefault(), (containsWildcard? 
this.unboundWildcardExpression: this.unboundValueStrExpression), this.convertValueStr((SigmaString) condition.getValue())); + return String.format(Locale.getDefault(), (containsWildcard? this.unboundWildcardExpression: this.unboundValueStrExpression), + this.convertValueStr((SigmaString) condition.getValue())); } @Override diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index 883bf8ee7..ad90b795f 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -110,15 +110,17 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -220,19 +222,22 @@ private void checkIndicesAndExecute( ActionListener listener, User user ) { + log.debug("check indices and execute began"); String [] detectorIndices = request.getDetector().getInputs().stream().flatMap(detectorInput -> detectorInput.getIndices().stream()).toArray(String[]::new); SearchRequest searchRequest = new SearchRequest(detectorIndices) - .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) - .preference(Preference.PRIMARY_FIRST.type()); + .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())); + 
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30)); client.search(searchRequest, new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { + log.debug("check indices and execute completed. Took {} millis", searchResponse.getTook().millis()); AsyncIndexDetectorsAction asyncAction = new AsyncIndexDetectorsAction(user, task, request, listener); asyncAction.start(); } @Override public void onFailure(Exception e) { + log.debug("check indices and execute failed", e); if (e instanceof OpenSearchStatusException) { listener.onFailure(SecurityAnalyticsException.wrap( new OpenSearchStatusException(String.format(Locale.getDefault(), "User doesn't have read permissions for one or more configured index %s", detectorIndices), RestStatus.FORBIDDEN) @@ -249,7 +254,8 @@ public void onFailure(Exception e) { }); } - private void createMonitorFromQueries(List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy) { + private void createMonitorFromQueries(List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy, + List queryFieldNames) { List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( Collectors.toList()); List> bucketLevelRules = rulesById.stream().filter(it -> it.getRight().isAggregationRule()).collect( @@ -262,13 +268,14 @@ public void onResponse(List dlqs) { List monitorRequests = new ArrayList<>(); if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - monitorRequests.add(createDocLevelMonitorRequest(docLevelRules, dlqs != null ? dlqs : List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); + monitorRequests.add(createDocLevelMonitorRequest(docLevelRules, dlqs != null ? 
dlqs : List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST, queryFieldNames)); } if (!bucketLevelRules.isEmpty()) { StepListener> bucketLevelMonitorRequests = new StepListener<>(); buildBucketLevelMonitorRequests(bucketLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST, bucketLevelMonitorRequests); bucketLevelMonitorRequests.whenComplete(indexMonitorRequests -> { + log.debug("bucket level monitor request built"); monitorRequests.addAll(indexMonitorRequests); // Do nothing if detector doesn't have any monitor if (monitorRequests.isEmpty()) { @@ -283,6 +290,7 @@ public void onResponse(List dlqs) { // https://github.com/opensearch-project/alerting/issues/646 AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, addFirstMonitorStep); addFirstMonitorStep.whenComplete(addedFirstMonitorResponse -> { + log.debug("first monitor created id {} of type {}", addedFirstMonitorResponse.getId(), addedFirstMonitorResponse.getMonitor().getMonitorType()); monitorResponses.add(addedFirstMonitorResponse); StepListener> indexMonitorsStep = new StepListener<>(); @@ -416,7 +424,12 @@ public void onFailure(Exception e) { } } - private void updateMonitorFromQueries(String index, List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { + private void updateMonitorFromQueries(String index, + List> rulesById, + Detector detector, + ActionListener> listener, + WriteRequest.RefreshPolicy refreshPolicy, + List queryFieldNames) { List monitorsToBeUpdated = new ArrayList<>(); List> bucketLevelRules = rulesById.stream().filter(it -> it.getRight().isAggregationRule()).collect( @@ -442,47 +455,78 @@ public void onResponse(Map> ruleFieldMappings) { // Pair of RuleId - MonitorId for existing monitors of the detector Map monitorPerRule = detector.getRuleIdMonitorIdMap(); + GroupedActionListener groupedActionListener = new GroupedActionListener<>( + new 
ActionListener<>() { + @Override + public void onResponse(Collection indexMonitorRequests) { + onIndexMonitorRequestCreation( + monitorsToBeUpdated, + monitorsToBeAdded, + rulesById, + detector, + refreshPolicy, + docLevelQueries, + queryFieldNames, + listener + ); + } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, bucketLevelRules.size() + ); for (Pair query : bucketLevelRules) { Rule rule = query.getRight(); if (rule.getAggregationQueries() != null) { // Detect if the monitor should be added or updated if (monitorPerRule.containsKey(rule.getId())) { String monitorId = monitorPerRule.get(rule.getId()); - monitorsToBeUpdated.add(createBucketLevelMonitorRequest(query.getRight(), + createBucketLevelMonitorRequest(query.getRight(), detector, refreshPolicy, monitorId, Method.PUT, - queryBackendMap.get(rule.getCategory()))); + queryBackendMap.get(rule.getCategory()), + new ActionListener<>() { + @Override + public void onResponse(IndexMonitorRequest indexMonitorRequest) { + monitorsToBeUpdated.add(indexMonitorRequest); + groupedActionListener.onResponse(indexMonitorRequest); + } + + @Override + public void onFailure(Exception e) { + log.error("Failed to create bucket level monitor request", e); + listener.onFailure(e); + } + }); } else { - monitorsToBeAdded.add(createBucketLevelMonitorRequest(query.getRight(), + createBucketLevelMonitorRequest(query.getRight(), detector, refreshPolicy, Monitor.NO_ID, Method.POST, - queryBackendMap.get(rule.getCategory()))); + queryBackendMap.get(rule.getCategory()), + new ActionListener<>() { + @Override + public void onResponse(IndexMonitorRequest indexMonitorRequest) { + monitorsToBeAdded.add(indexMonitorRequest); + groupedActionListener.onResponse(indexMonitorRequest); + + } + + @Override + public void onFailure(Exception e) { + log.error("Failed to create bucket level monitor request", e); + listener.onFailure(e); + } + }); } } } - List> docLevelRules = rulesById.stream().filter(it -> 
!it.getRight().isAggregationRule()).collect( - Collectors.toList()); - - // Process doc level monitors - if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - if (detector.getDocLevelMonitorId() == null) { - monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); - } else { - monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); - } - } - - List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); - monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( - Collectors.toList())); - - updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); } catch (Exception ex) { listener.onFailure(ex); } @@ -494,23 +538,16 @@ public void onFailure(Exception e) { } }); } else { - List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( - Collectors.toList()); - - // Process doc level monitors - if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - if (detector.getDocLevelMonitorId() == null) { - monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); - } else { - monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? 
docLevelQueries: List.of(), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); - } - } - - List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); - monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( - Collectors.toList())); - - updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); + onIndexMonitorRequestCreation( + monitorsToBeUpdated, + monitorsToBeAdded, + rulesById, + detector, + refreshPolicy, + docLevelQueries, + queryFieldNames, + listener + ); } } @@ -521,6 +558,33 @@ public void onFailure(Exception e) { }); } + private void onIndexMonitorRequestCreation(List monitorsToBeUpdated, + List monitorsToBeAdded, + List> rulesById, + Detector detector, + RefreshPolicy refreshPolicy, + List docLevelQueries, + List queryFieldNames, + ActionListener> listener) { + List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( + Collectors.toList()); + + // Process doc level monitors + if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { + if (detector.getDocLevelMonitorId() == null) { + monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST, queryFieldNames)); + } else { + monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? 
docLevelQueries: List.of(), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT, queryFieldNames)); + } + } + + List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); + monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( + Collectors.toList())); + + updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); + } + /** * Update list of monitors for the given detector * Executed in a steps: @@ -663,7 +727,7 @@ public void onFailure(Exception e) { } } - private IndexMonitorRequest createDocLevelMonitorRequest(List> queries, List threatIntelQueries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) { + private IndexMonitorRequest createDocLevelMonitorRequest(List> queries, List threatIntelQueries, Detector detector, RefreshPolicy refreshPolicy, String monitorId, Method restMethod, List queryFieldNames) { List docLevelMonitorInputs = new ArrayList<>(); List docLevelQueries = new ArrayList<>(); @@ -673,7 +737,6 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List Rule rule = query.getRight(); String name = query.getLeft(); - String actualQuery = rule.getQueries().get(0).getValue(); List tags = new ArrayList<>(); @@ -681,7 +744,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List tags.add(rule.getCategory()); tags.addAll(rule.getTags().stream().map(Value::getValue).collect(Collectors.toList())); - DocLevelQuery docLevelQuery = new DocLevelQuery(id, name, Collections.emptyList(), actualQuery, tags); + DocLevelQuery docLevelQuery = new DocLevelQuery(id, name, Collections.emptyList(), actualQuery, tags, queryFieldNames); docLevelQueries.add(docLevelQuery); } docLevelQueries.addAll(threatIntelQueries); @@ -788,43 +851,75 @@ private IndexMonitorRequest createDocLevelMonitorMatchAllRequest( } 
private void buildBucketLevelMonitorRequests(List> queries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod, ActionListener> listener) throws Exception { - + log.debug("bucket level monitor request starting"); + log.debug("get rule field mappings request being made"); logTypeService.getRuleFieldMappings(new ActionListener<>() { @Override public void onResponse(Map> ruleFieldMappings) { - try { + log.debug("got rule field mapping success"); List ruleCategories = queries.stream().map(Pair::getRight).map(Rule::getCategory).distinct().collect( Collectors.toList()); Map queryBackendMap = new HashMap<>(); for(String category: ruleCategories) { Map fieldMappings = ruleFieldMappings.get(category); - queryBackendMap.put(category, new OSQueryBackend(fieldMappings, true, true)); + try { + queryBackendMap.put(category, new OSQueryBackend(fieldMappings, true, true)); + } catch (IOException e) { + logger.error("Failed to create OSQueryBackend from field mappings", e); + listener.onFailure(e); + return; + } } List monitorRequests = new ArrayList<>(); + GroupedActionListener bucketLevelMonitorRequestsListener = new GroupedActionListener<>( + new ActionListener<>() { + @Override + public void onResponse(Collection indexMonitorRequests) { + // if workflow usage enabled, add chained findings monitor request if there are bucket level requests and if the detector triggers have any group by rules configured to trigger + if (enabledWorkflowUsage && !monitorRequests.isEmpty() && !DetectorUtils.getAggRuleIdsConfiguredToTrigger(detector, queries).isEmpty()) { + monitorRequests.add(createDocLevelMonitorMatchAllRequest(detector, RefreshPolicy.IMMEDIATE, detector.getId() + "_chained_findings", Method.POST)); + } + listener.onResponse(monitorRequests); + } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, queries.size() + ); for (Pair query: queries) { Rule rule = query.getRight(); // Creating bucket level 
monitor per each aggregation rule - if (rule.getAggregationQueries() != null){ - monitorRequests.add(createBucketLevelMonitorRequest( + if (rule.getAggregationQueries() != null) { + createBucketLevelMonitorRequest( query.getRight(), detector, refreshPolicy, - Monitor.NO_ID, - Method.POST, - queryBackendMap.get(rule.getCategory()))); + monitorId, + restMethod, + queryBackendMap.get(rule.getCategory()), + new ActionListener<>() { + @Override + public void onResponse(IndexMonitorRequest indexMonitorRequest) { + monitorRequests.add(indexMonitorRequest); + bucketLevelMonitorRequestsListener.onResponse(indexMonitorRequest); + } + + + @Override + public void onFailure(Exception e) { + logger.error("Failed to build bucket level monitor requests", e); + bucketLevelMonitorRequestsListener.onFailure(e); + } + }); + + } else { + log.debug("Aggregation query is null in rule {}", rule.getId()); + bucketLevelMonitorRequestsListener.onResponse(null); } } - // if workflow usage enabled, add chained findings monitor request if there are bucket level requests and if the detector triggers have any group by rules configured to trigger - if (enabledWorkflowUsage && !monitorRequests.isEmpty() && !DetectorUtils.getAggRuleIdsConfiguredToTrigger(detector, queries).isEmpty()) { - monitorRequests.add(createDocLevelMonitorMatchAllRequest(detector, RefreshPolicy.IMMEDIATE, detector.getId()+"_chained_findings", Method.POST)); - } - listener.onResponse(monitorRequests); - } catch (Exception ex) { - listener.onFailure(ex); - } } @Override @@ -834,94 +929,110 @@ public void onFailure(Exception e) { }); } - private IndexMonitorRequest createBucketLevelMonitorRequest( + private void createBucketLevelMonitorRequest( Rule rule, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod, - QueryBackend queryBackend - ) throws SigmaError { - + QueryBackend queryBackend, + ActionListener listener + ) { + log.debug(":create bucket level monitor response 
starting"); List indices = detector.getInputs().get(0).getIndices(); - - AggregationItem aggItem = rule.getAggregationItemsFromRule().get(0); - AggregationQueries aggregationQueries = queryBackend.convertAggregation(aggItem); - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .seqNoAndPrimaryTerm(true) - .version(true) - // Build query string filter - .query(QueryBuilders.queryStringQuery(rule.getQueries().get(0).getValue())) - .aggregation(aggregationQueries.getAggBuilder()); - // input index can also be an index pattern or alias so we have to resolve it to concrete index - String concreteIndex = IndexUtils.getNewIndexByCreationDate( - clusterService.state(), - indexNameExpressionResolver, - indices.get(0) // taking first one is fine because we expect that all indices in list share same mappings - ); try { - GetIndexMappingsResponse getIndexMappingsResponse = client.execute( + AggregationItem aggItem = rule.getAggregationItemsFromRule().get(0); + AggregationQueries aggregationQueries = queryBackend.convertAggregation(aggItem); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .seqNoAndPrimaryTerm(true) + .version(true) + // Build query string filter + .query(QueryBuilders.queryStringQuery(rule.getQueries().get(0).getValue())) + .aggregation(aggregationQueries.getAggBuilder()); + // input index can also be an index pattern or alias so we have to resolve it to concrete index + String concreteIndex = IndexUtils.getNewIndexByCreationDate( + clusterService.state(), + indexNameExpressionResolver, + indices.get(0) // taking first one is fine because we expect that all indices in list share same mappings + ); + client.execute( GetIndexMappingsAction.INSTANCE, - new GetIndexMappingsRequest(concreteIndex)) - .actionGet(); - MappingMetadata mappingMetadata = getIndexMappingsResponse.mappings().get(concreteIndex); - List> pairs = MapperUtils.getAllAliasPathPairs(mappingMetadata); - boolean timeStampAliasPresent = pairs. 
- stream() - .anyMatch(p -> - TIMESTAMP_FIELD_ALIAS.equals(p.getLeft()) || TIMESTAMP_FIELD_ALIAS.equals(p.getRight())); - if(timeStampAliasPresent) { - BoolQueryBuilder boolQueryBuilder = searchSourceBuilder.query() == null - ? new BoolQueryBuilder() - : QueryBuilders.boolQuery().must(searchSourceBuilder.query()); - RangeQueryBuilder timeRangeFilter = QueryBuilders.rangeQuery(TIMESTAMP_FIELD_ALIAS) - .gt("{{period_end}}||-" + (aggItem.getTimeframe() != null? aggItem.getTimeframe(): "1h")) - .lte("{{period_end}}") - .format("epoch_millis"); - boolQueryBuilder.must(timeRangeFilter); - searchSourceBuilder.query(boolQueryBuilder); - } - } catch (Exception e) { - log.error( - String.format(Locale.getDefault(), - "Unable to verify presence of timestamp alias for index [%s] in detector [%s]. Not setting time range filter for bucket level monitor.", - concreteIndex, detector.getName()), e); - } - - List bucketLevelMonitorInputs = new ArrayList<>(); - bucketLevelMonitorInputs.add(new SearchInput(indices, searchSourceBuilder)); - - List triggers = new ArrayList<>(); - BucketLevelTrigger bucketLevelTrigger = new BucketLevelTrigger(rule.getId(), rule.getTitle(), rule.getLevel(), aggregationQueries.getCondition(), - Collections.emptyList()); - triggers.add(bucketLevelTrigger); - - /** TODO - Think how to use detector trigger - List detectorTriggers = detector.getTriggers(); - for (DetectorTrigger detectorTrigger: detectorTriggers) { - String id = detectorTrigger.getId(); - String name = detectorTrigger.getName(); - String severity = detectorTrigger.getSeverity(); - List actions = detectorTrigger.getActions(); - Script condition = detectorTrigger.convertToCondition(); - - BucketLevelTrigger bucketLevelTrigger1 = new BucketLevelTrigger(id, name, severity, condition, actions); - triggers.add(bucketLevelTrigger1); - } **/ - - Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null, - 
MonitorType.BUCKET_LEVEL_MONITOR, detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(), - new DataSources(detector.getRuleIndex(), - detector.getFindingsIndex(), - detector.getFindingsIndexPattern(), - detector.getAlertsIndex(), - detector.getAlertsHistoryIndex(), - detector.getAlertsHistoryIndexPattern(), - DetectorMonitorConfig.getRuleIndexMappingsByType(), - true), PLUGIN_OWNER_FIELD); + new GetIndexMappingsRequest(concreteIndex), + new ActionListener() { + @Override + public void onResponse(GetIndexMappingsResponse getIndexMappingsResponse) { + MappingMetadata mappingMetadata = getIndexMappingsResponse.mappings().get(concreteIndex); + List> pairs = null; + try { + pairs = MapperUtils.getAllAliasPathPairs(mappingMetadata); + } catch (IOException e) { + logger.debug("Failed to get alias path pairs from mapping metadata", e); + onFailure(e); + } + boolean timeStampAliasPresent = pairs. + stream() + .anyMatch(p -> + TIMESTAMP_FIELD_ALIAS.equals(p.getLeft()) || TIMESTAMP_FIELD_ALIAS.equals(p.getRight())); + if (timeStampAliasPresent) { + BoolQueryBuilder boolQueryBuilder = searchSourceBuilder.query() == null + ? new BoolQueryBuilder() + : QueryBuilders.boolQuery().must(searchSourceBuilder.query()); + RangeQueryBuilder timeRangeFilter = QueryBuilders.rangeQuery(TIMESTAMP_FIELD_ALIAS) + .gt("{{period_end}}||-" + (aggItem.getTimeframe() != null ? 
aggItem.getTimeframe() : "1h")) + .lte("{{period_end}}") + .format("epoch_millis"); + boolQueryBuilder.must(timeRangeFilter); + searchSourceBuilder.query(boolQueryBuilder); + } + List bucketLevelMonitorInputs = new ArrayList<>(); + bucketLevelMonitorInputs.add(new SearchInput(indices, searchSourceBuilder)); + + List triggers = new ArrayList<>(); + BucketLevelTrigger bucketLevelTrigger = new BucketLevelTrigger(rule.getId(), rule.getTitle(), rule.getLevel(), aggregationQueries.getCondition(), + Collections.emptyList()); + triggers.add(bucketLevelTrigger); + + /** TODO - Think how to use detector trigger + List detectorTriggers = detector.getTriggers(); + for (DetectorTrigger detectorTrigger: detectorTriggers) { + String id = detectorTrigger.getId(); + String name = detectorTrigger.getName(); + String severity = detectorTrigger.getSeverity(); + List actions = detectorTrigger.getActions(); + Script condition = detectorTrigger.convertToCondition(); + + BucketLevelTrigger bucketLevelTrigger1 = new BucketLevelTrigger(id, name, severity, condition, actions); + triggers.add(bucketLevelTrigger1); + } **/ + + Monitor monitor = new Monitor(monitorId, Monitor.NO_VERSION, detector.getName(), false, detector.getSchedule(), detector.getLastUpdateTime(), null, + MonitorType.BUCKET_LEVEL_MONITOR, detector.getUser(), 1, bucketLevelMonitorInputs, triggers, Map.of(), + new DataSources(detector.getRuleIndex(), + detector.getFindingsIndex(), + detector.getFindingsIndexPattern(), + detector.getAlertsIndex(), + detector.getAlertsHistoryIndex(), + detector.getAlertsHistoryIndexPattern(), + DetectorMonitorConfig.getRuleIndexMappingsByType(), + true), PLUGIN_OWNER_FIELD); + + listener.onResponse(new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null)); + } - return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, 
restMethod, monitor, null); + @Override + public void onFailure(Exception e) { + log.error( + String.format(Locale.getDefault(), + "Unable to verify presence of timestamp alias for index [%s] in detector [%s]. Not setting time range filter for bucket level monitor.", + concreteIndex, detector.getName()), e); + listener.onFailure(e); + } + }); + } catch (SigmaError e) { + log.error("Failed to create bucket level monitor request", e); + listener.onFailure(e); + } } /** @@ -996,21 +1107,27 @@ class AsyncIndexDetectorsAction { } void start() { + log.debug("stash context"); TransportIndexDetectorAction.this.threadPool.getThreadContext().stashContext(); - + log.debug("log type check : {}", request.getDetector().getDetectorType()); logTypeService.doesLogTypeExist(request.getDetector().getDetectorType().toLowerCase(Locale.ROOT), new ActionListener<>() { @Override public void onResponse(Boolean exist) { if (exist) { + log.debug("log type exists : {}", request.getDetector().getDetectorType()); try { if (!detectorIndices.detectorIndexExists()) { + log.debug("detector index creation"); detectorIndices.initDetectorIndex(new ActionListener<>() { @Override public void onResponse(CreateIndexResponse response) { try { + log.debug("detector index created"); + onCreateMappingsResponse(response); prepareDetectorIndexing(); } catch (Exception e) { + log.debug("detector index creation failed", e); onFailures(e); } } @@ -1021,16 +1138,19 @@ public void onFailure(Exception e) { } }); } else if (!IndexUtils.detectorIndexUpdated) { + log.debug("detector index update mapping"); IndexUtils.updateIndexMapping( Detector.DETECTORS_INDEX, DetectorIndices.detectorMappings(), clusterService.state(), client.admin().indices(), new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse response) { + log.debug("detector index mapping updated"); onUpdateMappingsResponse(response); try { prepareDetectorIndexing(); } catch (Exception e) { + log.debug("detector index mapping 
FAILED updation", e); onFailures(e); } } @@ -1088,24 +1208,28 @@ void createDetector() { if (!detector.getInputs().isEmpty()) { try { + log.debug("init rule index template"); ruleTopicIndices.initRuleTopicIndexTemplate(new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { - + log.debug("init rule index template ack"); initRuleIndexAndImportRules(request, new ActionListener<>() { @Override public void onResponse(List monitorResponses) { + log.debug("monitors indexed"); request.getDetector().setMonitorIds(getMonitorIds(monitorResponses)); request.getDetector().setRuleIdMonitorIdMap(mapMonitorIds(monitorResponses)); try { indexDetector(); } catch (Exception e) { + logger.debug("create detector failed", e); onFailures(e); } } @Override public void onFailure(Exception e) { + logger.debug("import rules failed", e); onFailures(e); } }); @@ -1113,10 +1237,12 @@ public void onFailure(Exception e) { @Override public void onFailure(Exception e) { + logger.debug("init rules index failed", e); onFailures(e); } }); } catch (Exception e) { + logger.debug("init rules index failed", e); onFailures(e); } } @@ -1233,11 +1359,13 @@ public void initRuleIndexAndImportRules(IndexDetectorRequest request, ActionList new ActionListener<>() { @Override public void onResponse(CreateIndexResponse response) { + log.debug("prepackaged rule index created"); ruleIndices.onCreateMappingsResponse(response, true); ruleIndices.importRules(RefreshPolicy.IMMEDIATE, indexTimeout, new ActionListener<>() { @Override public void onResponse(BulkResponse response) { + log.debug("rules imported"); if (!response.hasFailures()) { importRules(request, listener); } else { @@ -1247,6 +1375,7 @@ public void onResponse(BulkResponse response) { @Override public void onFailure(Exception e) { + log.debug("failed to import rules", e); onFailures(e); } }); @@ -1358,13 +1487,14 @@ public void importRules(IndexDetectorRequest request, ActionListener() { @Override public void 
onResponse(SearchResponse response) { if (response.isTimedOut()) { onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); } + logger.debug("prepackaged rules fetch success"); SearchHits hits = response.getHits(); List> queries = new ArrayList<>(); @@ -1387,13 +1517,10 @@ public void onResponse(SearchResponse response) { } else if (detectorInput.getCustomRules().size() > 0) { onFailures(new OpenSearchStatusException("Custom Rule Index not found", RestStatus.NOT_FOUND)); } else { - if (request.getMethod() == RestRequest.Method.POST) { - createMonitorFromQueries(queries, detector, listener, request.getRefreshPolicy()); - } else if (request.getMethod() == RestRequest.Method.PUT) { - updateMonitorFromQueries(logIndex, queries, detector, listener, request.getRefreshPolicy()); - } + resolveRuleFieldNamesAndUpsertMonitorFromQueries(queries, detector, logIndex, listener); } } catch (Exception e) { + logger.debug("failed to fetch prepackaged rules", e); onFailures(e); } } @@ -1405,6 +1532,56 @@ public void onFailure(Exception e) { }); } + private void resolveRuleFieldNamesAndUpsertMonitorFromQueries(List> queries, Detector detector, String logIndex, ActionListener> listener) { + logger.error("PERF_DEBUG_SAP: Fetching alias path pairs to construct rule_field_names"); + long start = System.currentTimeMillis(); + Set ruleFieldNames = new HashSet<>(); + for (Pair query : queries) { + List queryFieldNames = query.getValue().getQueryFieldNames().stream().map(Value::getValue).collect(Collectors.toList()); + ruleFieldNames.addAll(queryFieldNames); + } + client.execute(GetIndexMappingsAction.INSTANCE, new GetIndexMappingsRequest(logIndex), new ActionListener<>() { + @Override + public void onResponse(GetIndexMappingsResponse getMappingsViewResponse) { + try { + List> aliasPathPairs; + + aliasPathPairs = MapperUtils.getAllAliasPathPairs(getMappingsViewResponse.getMappings().get(logIndex)); + for (Pair aliasPathPair : aliasPathPairs) { + if 
(ruleFieldNames.contains(aliasPathPair.getLeft())) { + ruleFieldNames.remove(aliasPathPair.getLeft()); + ruleFieldNames.add(aliasPathPair.getRight()); + } + } + long took = System.currentTimeMillis() - start; + log.debug("completed collecting rule_field_names in {} millis", took); + + } catch (Exception e) { + logger.error("Failure in parsing rule field names/aliases while " + + detector.getId() == null ? "creating" : "updating" + + " detector. Not optimizing detector queries with relevant fields", e); + ruleFieldNames.clear(); + } + upsertMonitorQueries(queries, detector, listener, ruleFieldNames, logIndex); + + } + + @Override + public void onFailure(Exception e) { + log.error("Failed to fetch mappings view response for log index " + logIndex, e); + listener.onFailure(e); + } + }); + } + + private void upsertMonitorQueries(List> queries, Detector detector, ActionListener> listener, Set ruleFieldNames, String logIndex) { + if (request.getMethod() == Method.POST) { + createMonitorFromQueries(queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames)); + } else if (request.getMethod() == Method.PUT) { + updateMonitorFromQueries(logIndex, queries, detector, listener, request.getRefreshPolicy(), new ArrayList<>(ruleFieldNames)); + } + } + @SuppressWarnings("unchecked") public void importCustomRules(Detector detector, DetectorInput detectorInput, List> queries, ActionListener> listener) { final String logIndex = detectorInput.getIndices().get(0); @@ -1418,14 +1595,14 @@ public void importCustomRules(Detector detector, DetectorInput detectorInput, Li .query(queryBuilder) .size(10000)) .preference(Preference.PRIMARY_FIRST.type()); - + logger.debug("importing custom rules"); client.search(searchRequest, new ActionListener<>() { @Override public void onResponse(SearchResponse response) { if (response.isTimedOut()) { onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); } - + logger.debug("custom 
rules fetch successful"); SearchHits hits = response.getHits(); try { @@ -1441,11 +1618,7 @@ public void onResponse(SearchResponse response) { queries.add(Pair.of(id, rule)); } - if (request.getMethod() == RestRequest.Method.POST) { - createMonitorFromQueries(queries, detector, listener, request.getRefreshPolicy()); - } else if (request.getMethod() == RestRequest.Method.PUT) { - updateMonitorFromQueries(logIndex, queries, detector, listener, request.getRefreshPolicy()); - } + resolveRuleFieldNamesAndUpsertMonitorFromQueries(queries, detector, logIndex, listener); } catch (Exception ex) { onFailures(ex); } @@ -1473,10 +1646,11 @@ public void indexDetector() throws Exception { .id(request.getDetectorId()) .timeout(indexTimeout); } - + log.debug("indexing detector"); client.index(indexRequest, new ActionListener<>() { @Override public void onResponse(IndexResponse response) { + log.debug("detector indexed success."); Detector responseDetector = request.getDetector(); responseDetector.setId(response.getId()); onOperation(response, responseDetector); diff --git a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorMonitorRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorMonitorRestApiIT.java index 3a11300ee..8de88a717 100644 --- a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorMonitorRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorMonitorRestApiIT.java @@ -2056,7 +2056,7 @@ public void testCreateDetectorWithCloudtrailAggrRuleWithEcsFields() throws IOExc // both req params and req body are supported createMappingRequest.setJsonEntity( "{\n" + - " \"index_name\": \"" + index + "\",\n" + + " \"index_name\": \"cloudtrail\",\n" + " \"rule_topic\": \"cloudtrail\",\n" + " \"partial\": true,\n" + " \"alias_mappings\": {\n" +