Skip to content

[backend] Various performance improvements #10971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions opencti-platform/opencti-graphql/src/config/conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ export const BUS_TOPICS = {
},
[M.ENTITY_TYPE_MARKING_DEFINITION]: {
EDIT_TOPIC: `${TOPIC_PREFIX}MARKING_DEFINITION_EDIT_TOPIC`,
DELETE_TOPIC: `${TOPIC_PREFIX}MARKING_DEFINITION_EDIT_TOPIC`,
ADDED_TOPIC: `${TOPIC_PREFIX}MARKING_DEFINITION_ADDED_TOPIC`,
},
[M.ENTITY_TYPE_LABEL]: {
Expand Down
36 changes: 22 additions & 14 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,7 @@ export const computeQueryIndices = (indices, typeOrTypes, withInferences = true)
export const elFindByIds = async (context, user, ids, opts = {}) => {
const { indices, baseData = false, baseFields = BASE_FIELDS } = opts;
const { withoutRels = false, toMap = false, mapWithAllIds = false, type = null } = opts;
const { orderBy = 'created_at', orderMode = 'asc' } = opts;
const { orderBy = null, orderMode = 'asc' } = opts;
const idsArray = Array.isArray(ids) ? ids : [ids];
const types = (Array.isArray(type) || isEmptyField(type)) ? type : [type];
const processIds = R.filter((id) => isNotEmptyField(id), idsArray);
Expand Down Expand Up @@ -1653,6 +1653,7 @@ export const elFindByIds = async (context, user, ids, opts = {}) => {
};
mustTerms.push(should);
if (types && types.length > 0) {
// No cache management is possible, just put the type in the filtering
const shouldType = {
bool: {
should: [
Expand All @@ -1671,14 +1672,22 @@ export const elFindByIds = async (context, user, ids, opts = {}) => {
// Handle draft
const draftMust = buildDraftFilter(context, user, opts);
const body = {
sort: [{ [orderBy]: orderMode }],
query: {
bool: {
must: [...mustTerms, ...draftMust],
must_not: markingRestrictions.must_not,
},
// Put everything under filter to prevent score computation
// Search without score when no sort is applied is faster
filter: [{
bool: {
must: [...mustTerms, ...draftMust],
must_not: markingRestrictions.must_not,
},
}]
}
},
};
if (isNotEmptyField(orderBy)) {
body.sort = [{ [orderBy]: orderMode }];
}
let searchAfter;
let hasNextPage = true;
while (hasNextPage) {
Expand All @@ -1688,10 +1697,11 @@ export const elFindByIds = async (context, user, ids, opts = {}) => {
const query = {
index: computedIndices,
size: ES_MAX_PAGINATION,
track_total_hits: false,
_source: baseData ? baseFields : true,
body,
};
logApp.debug('[SEARCH] elInternalLoadById', { query });
logApp.debug('[SEARCH] elFindByIds', { query });
const searchType = `${ids} (${types ? types.join(', ') : 'Any'})`;
const data = await elRawSearch(context, user, searchType, query).catch((err) => {
throw DatabaseError('Find direct ids fail', { cause: err, query });
Expand Down Expand Up @@ -3033,13 +3043,13 @@ const elQueryBodyBuilder = async (context, user, options) => {
let scoreSearchOrder = orderMode;
if (search !== null && search.length > 0) {
const shouldSearch = elGenerateFullTextSearchShould(search, options);
const bool = {
const searchBool = {
bool: {
should: shouldSearch,
minimum_should_match: 1,
},
};
mustFilters.push(bool);
mustFilters.push(searchBool);
// When using a search, force a score ordering if nothing specified
if (orderCriterion.length === 0) {
orderCriterion.unshift('_score');
Expand All @@ -3059,9 +3069,7 @@ const elQueryBodyBuilder = async (context, user, options) => {
}
}
// Add standard_id if not specify to ensure ordering uniqueness
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this comment is not relevant anymore?

if (!orderCriterion.includes('standard_id')) {
ordering.push({ 'standard_id.keyword': 'asc' });
}
ordering.push({ _doc: 'asc' });
// Build runtime mappings
const runtime = RUNTIME_ATTRIBUTES[orderBy];
if (isNotEmptyField(runtime)) {
Expand All @@ -3072,8 +3080,8 @@ const elQueryBodyBuilder = async (context, user, options) => {
script: { source, params },
};
}
} else { // If not ordering criteria, order by standard_id
ordering.push({ 'standard_id.keyword': 'asc' });
} else { // If not ordering criteria, order by _doc
ordering.push({ _doc: 'asc' });
Comment on lines +3083 to +3084
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this else case. We are doing the same thing we did on line 3072, so is it useful?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it could be simplified by adding ordering.push({ _doc: 'asc' }); all the time, by moving line 3072 to 3084 and removing the else condition.

}
// Handle draft
const draftMust = buildDraftFilter(context, user, options);
Expand Down Expand Up @@ -3405,7 +3413,7 @@ export const elAggregationsList = async (context, user, indexName, aggregations,
}
const query = {
index: getIndicesToQuery(context, user, indexName),
track_total_hits: true,
track_total_hits: false,
_source: false,
body,
};
Expand Down
20 changes: 11 additions & 9 deletions opencti-platform/opencti-graphql/src/database/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { ChainableCommander } from 'ioredis/built/utils/RedisCommander';
import type { ClusterOptions } from 'ioredis/built/cluster/ClusterOptions';
import type { SentinelConnectionOptions } from 'ioredis/built/connectors/SentinelConnector';
import conf, { booleanConf, configureCA, DEV_MODE, getStoppingState, loadCert, logApp, REDIS_PREFIX } from '../config/conf';
import { asyncListTransformation, EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE, isEmptyField, isNotEmptyField, waitInSec } from './utils';
import { asyncListTransformation, EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE, isEmptyField, isNotEmptyField, wait, waitInSec } from './utils';
import { isStixExportableData } from '../schema/stixCoreObject';
import { DatabaseError, LockTimeoutError, TYPE_LOCK_ERROR, UnsupportedError } from '../config/errors';
import { mergeDeepRightAll, now, utcDate } from '../utils/format';
Expand All @@ -29,6 +29,7 @@ import { enrichWithRemoteCredentials } from '../config/credentials';
import { getDraftContext } from '../utils/draftContext';
import type { ExclusionListCacheItem } from './exclusionListCache';
import { refreshLocalCacheForEntity } from './cache';
import { asyncMap } from '../utils/data-processing';

const USE_SSL = booleanConf('redis:use_ssl', false);
const REDIS_CA = conf.get('redis:ca').map((path: string) => loadCert(path));
Expand Down Expand Up @@ -674,12 +675,11 @@ export const fetchStreamInfo = async () => {
};

const processStreamResult = async (results: Array<any>, callback: any, withInternal: boolean | undefined) => {
const streamData = R.map((r) => mapStreamToJS(r), results);
const filteredEvents = streamData.filter((s) => {
return withInternal ? true : (s.data.scope ?? 'external') === 'external';
});
const lastEventId = filteredEvents.length > 0 ? R.last(filteredEvents)?.id : `${new Date().valueOf()}-0`;
await callback(filteredEvents, lastEventId);
const transform = (r: any) => mapStreamToJS(r);
const filter = (s: any) => (withInternal ? true : (s.data.scope ?? 'external') === 'external');
const events = await asyncMap(results, transform, filter);
const lastEventId = events.length > 0 ? R.last(events)?.id : `${new Date().valueOf()}-0`;
await callback(events, lastEventId);
return lastEventId;
};

Expand All @@ -695,12 +695,13 @@ export interface StreamProcessor {

// Optional settings accepted by createStreamProcessor.
interface StreamOption {
// When falsy, only events whose data.scope (defaulting to 'external') equals 'external' are kept.
withInternal?: boolean;
// Pause applied after each processing pass; falls back to 50 when not provided.
// NOTE(review): presumably expressed in milliseconds - confirm against the wait helper.
bufferTime?: number;
// When set, a consume failure is followed by a pause instead of making the processing step return false.
autoReconnect?: boolean;
// NOTE(review): presumably the name of the Redis stream to consume - confirm in createStreamProcessor.
streamName?: string;
}

export const createStreamProcessor = <T extends BaseEvent> (
user: AuthUser,
_user: AuthUser,
provider: string,
callback: (events: Array<SseEvent<T>>, lastEventId: string) => void,
opts: StreamOption = {}
Expand Down Expand Up @@ -738,10 +739,11 @@ export const createStreamProcessor = <T extends BaseEvent> (
} else {
await processStreamResult([], callback, opts.withInternal);
}
await wait(opts.bufferTime ?? 50);
} catch (err) {
logApp.error('Redis stream consume fail', { cause: err, provider });
if (opts.autoReconnect) {
await waitInSec(2);
await waitInSec(5);
} else {
return false;
}
Expand Down
7 changes: 4 additions & 3 deletions opencti-platform/opencti-graphql/src/domain/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { createEntity, deleteElementById, internalDeleteElementById, patchAttrib
import { type GetHttpClient, getHttpClient } from '../utils/http-client';
import { completeConnector, connector, connectors, connectorsFor } from '../database/repository';
import { getConnectorQueueDetails, purgeConnectorQueues, registerConnectorQueues, unregisterConnector, unregisterExchanges } from '../database/rabbitmq';
import { ENTITY_TYPE_CONNECTOR, ENTITY_TYPE_SYNC, ENTITY_TYPE_WORK } from '../schema/internalObject';
import { ENTITY_TYPE_CONNECTOR, ENTITY_TYPE_SYNC, ENTITY_TYPE_USER, ENTITY_TYPE_WORK } from '../schema/internalObject';
import { FunctionalError, ValidationError } from '../config/errors';
import { validateFilterGroupForStixMatch } from '../utils/filtering/filtering-stix/stix-filtering';
import { isFilterGroupNotEmpty } from '../utils/filtering/filtering-utils';
Expand All @@ -30,12 +30,12 @@ import {
import { BUS_TOPICS, logApp } from '../config/conf';
import { deleteWorkForConnector } from './work';
import { testSync as testSyncUtils } from './connector-utils';
import { findById } from './user';
import { defaultValidationMode, loadFile, uploadJobImport } from '../database/file-storage';
import { controlUserConfidenceAgainstElement } from '../utils/confidence-level';
import { extractEntityRepresentativeName } from '../database/entity-representative';
import type { BasicStoreCommon } from '../types/store';
import type { Connector } from '../connector/internalConnector';
import { getEntitiesMapFromCache } from '../database/cache';

// region connectors
export const connectorForWork = async (context: AuthContext, user: AuthUser, id: string) => {
Expand Down Expand Up @@ -374,7 +374,8 @@ export const queueDetails = async (connectorId: string) => {

export const connectorUser = async (context: AuthContext, user: AuthUser, userId: string) => {
if (isUserHasCapability(user, SETTINGS_SET_ACCESSES)) {
return findById(context, user, userId);
const platformUsers = await getEntitiesMapFromCache(context, SYSTEM_USER, ENTITY_TYPE_USER);
return platformUsers.get(userId);
}
return null;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export const markingDefinitionDelete = async (context, user, markingDefinitionId
const element = await deleteElementById(context, user, markingDefinitionId, ENTITY_TYPE_MARKING_DEFINITION);
// users of group impacted must be refreshed
await notifyMembersOfNewMarking(context, user, element);
return markingDefinitionId;
return notify(BUS_TOPICS[ENTITY_TYPE_MARKING_DEFINITION].DELETE_TOPIC, element, user).then(() => markingDefinitionId);
};

export const markingDefinitionEditField = async (context, user, markingDefinitionId, input, opts = {}) => {
Expand Down
15 changes: 9 additions & 6 deletions opencti-platform/opencti-graphql/src/graphql/telemetryPlugin.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { head, includes } from 'ramda';
import { stripIgnoredCharacters } from 'graphql/utilities';
import { meterManager } from '../config/tracing';
import { AUTH_FAILURE, AUTH_REQUIRED, FORBIDDEN_ACCESS } from '../config/errors';
import { isEmptyField } from '../database/utils';
import { logApp } from '../config/conf';

const getRequestError = (context) => {
const isSuccess = isEmptyField(context.errors) || context.errors.length === 0;
Expand All @@ -25,16 +27,17 @@ export default {
willSendResponse: async (sendContext) => {
const requestError = getRequestError(sendContext);
let operationAttributes;
const operationName = sendContext.operationName ?? 'Unspecified';
const operation = sendContext.operation?.operation ?? 'query';
if (operationName === 'Unspecified') {
logApp.error('TELEMETRY PLUGIN UNDEFINED OPERATION', { query: stripIgnoredCharacters(sendContext.request.query) });
}
if (requestError) {
const operation = sendContext.request.query.startsWith('mutation') ? 'mutation' : 'query';
const operationName = sendContext.request.operationName ?? 'Unspecified';
const type = sendContext.response.body.singleResult.errors.at(0)?.name ?? requestError.name;
operationAttributes = { operation, name: operationName, type };
operationAttributes = { operation, name: operationName, status: 'ERROR', type };
meterManager.error(operationAttributes);
} else {
const operation = sendContext.operation?.operation ?? 'query';
const operationName = sendContext.operationName ?? 'Unspecified';
operationAttributes = { operation, name: operationName };
operationAttributes = { operation, name: operationName, status: 'SUCCESS' };
meterManager.request(operationAttributes);
}
const stop = Date.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ const initActivityManager = () => {
lock = await lockResources([ACTIVITY_ENGINE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running activity manager');
const streamOpts = { streamName: ACTIVITY_STREAM_NAME };
const streamOpts = { streamName: ACTIVITY_STREAM_NAME, bufferTime: 5000 };
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Activity manager', activityStreamHandler, streamOpts);
await streamProcessor.start(lastEventId);
while (!shutdown && streamProcessor.running()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ const initFileIndexManager = () => {
lock = await lockResources([FILE_INDEX_MANAGER_STREAM_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running file index manager stream handler');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'File index manager', handleStreamEvents);
streamProcessor = createStreamProcessor(SYSTEM_USER, 'File index manager', handleStreamEvents, { bufferTime: 5000 });
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ const initHistoryManager = () => {
lock = await lockResources([HISTORY_ENGINE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-MODULE] Running history manager');
streamProcessor = createStreamProcessor(SYSTEM_USER, 'History manager', historyStreamHandler);
streamProcessor = createStreamProcessor(SYSTEM_USER, 'History manager', historyStreamHandler, { bufferTime: 5000 });
await streamProcessor.start(lastEventId);
while (!shutdown && streamProcessor.running()) {
lock.signal.throwIfAborted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface ManagerStreamScheduler {
handler: () => void;
interval: number;
lockKey: string;
streamOpts?: { withInternal: boolean, streamName: string };
streamOpts?: { withInternal: boolean, streamName: string, bufferTime: number };
streamProcessorStartFrom: () => string;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ const initPublisherManager = () => {
lock = await lockResources([PUBLISHER_ENGINE_KEY], { retryCount: 0 });
running = true;
logApp.info('[OPENCTI-PUBLISHER] Running publisher manager');
const opts = { withInternal: false, streamName: NOTIFICATION_STREAM_NAME };
const opts = { withInternal: false, streamName: NOTIFICATION_STREAM_NAME, bufferTime: 5000 };
streamProcessor = createStreamProcessor(SYSTEM_USER, 'Publisher manager', publisherStreamHandler, opts);
await streamProcessor.start('live');
while (!shutdown && streamProcessor.running()) {
Expand Down
20 changes: 20 additions & 0 deletions opencti-platform/opencti-graphql/src/utils/data-processing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,23 @@ export const asyncFilter = async <T>(elements: T[], predicate: (value: T, index:
}
return filtered;
};

/**
 * Map a list through a synchronous transform, optionally keeping only the
 * transformed items accepted by a filter, while periodically yielding to the
 * event loop.
 *
 * @param elements - items to transform, processed in order
 * @param transform - synchronous mapping applied to each element
 * @param filter - optional predicate evaluated on the transformed item; items
 *   for which it returns a falsy value are dropped
 * @returns a promise resolving to the transformed (and filtered) items
 */
export const asyncMap = async <T, Z>(elements: T[], transform: (value: T) => Z, filter?: (value: Z) => boolean) => {
  const results: Z[] = [];
  let sliceStart = Date.now();
  for (let position = 0; position < elements.length; position += 1) {
    const mapped = transform(elements[position]);
    const isKept = filter ? filter(mapped) : true;
    if (isKept) {
      results.push(mapped);
    }
    // Once the current slice has run longer than MAX_EVENT_LOOP_PROCESSING_TIME,
    // hand control back to the event loop before continuing.
    const elapsed = Date.now() - sliceStart;
    if (elapsed > MAX_EVENT_LOOP_PROCESSING_TIME) {
      sliceStart = Date.now();
      await new Promise((resume) => {
        setImmediate(resume);
      });
    }
  }
  return results;
};
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,12 @@ describe('Elasticsearch pagination', () => {
expect(data.edges.length).toEqual(2);
});
it('should entity paginate everything after', async () => {
const page = await elPaginate(testContext, ADMIN_USER, READ_ENTITIES_INDICES, {
first: ES_MAX_PAGINATION,
});
const last = page.edges[page.edges.length - 3];
const data = await elPaginate(testContext, ADMIN_USER, READ_ENTITIES_INDICES, {
after: 'WyJ2b2NhYnVsYXJ5LS1mZGYyNTVhOC01ZjM3LTVmZWMtYWRmYS0xZGYwYjdkM2QwY2UiXQ==',
after: last.cursor,
first: ES_MAX_PAGINATION,
});
expect(data).not.toBeNull();
Expand All @@ -479,14 +483,17 @@ describe('Elasticsearch pagination', () => {
expect(data).not.toBeNull();
const entityTypeMap = mapEdgesCountPerEntityType(data);
expect(entityTypeMap.get('Report')).toBe(1);
expect(entityTypeMap.get('Attack-Pattern')).toBe(2);
expect(entityTypeMap.get('Campaign')).toBe(1);
expect(entityTypeMap.get('Threat-Actor-Individual')).toBe(2);
expect(entityTypeMap.get('Organization')).toBe(6);
expect(entityTypeMap.get('Sector')).toBe(2);
expect(entityTypeMap.get('Course-Of-Action')).toBe(1);
expect(entityTypeMap.get('Administrative-Area')).toBe(1);
expect(entityTypeMap.get('Opinion')).toBe(1);
expect(entityTypeMap.get('Malware-Analysis')).toBe(1);
expect(entityTypeMap.get('Malware')).toBe(1);
expect(entityTypeMap.get('Threat-Actor-Group')).toBe(1);
expect(entityTypeMap.get('Individual')).toBe(1);
expect(entityTypeMap.get('Sector')).toBe(3);
expect(entityTypeMap.get('Organization')).toBe(TESTING_ORGS.length + 6);
expect(entityTypeMap.get('Incident')).toBe(1);
expect(entityTypeMap.get('Indicator')).toBe(2);
expect(entityTypeMap.get('Region')).toBe(2);
expect(data.edges.length).toEqual(20);
expect(data.pageInfo.endCursor).toBeDefined();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, it, describe } from 'vitest';
import { describe, expect, it } from 'vitest';
import gql from 'graphql-tag';
import { queryAsAdmin } from '../../utils/testQuery';

Expand Down Expand Up @@ -149,14 +149,14 @@ describe('AttackPattern resolver standard behavior', () => {
expect(queryResult.data.attackPatternsMatrix).not.toBeNull();
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases).not.toBeNull();
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases.length).toEqual(2);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].kill_chain_name).toEqual('mitre-pre-attack');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].phase_name).toEqual('launch');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].x_opencti_order).toEqual(0);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].attackPatterns.length).toEqual(2);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].kill_chain_name).toEqual('mitre-attack');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].phase_name).toEqual('persistence');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].x_opencti_order).toEqual(20);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].attackPatterns.length).toEqual(1);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].kill_chain_name).toEqual('mitre-attack');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].phase_name).toEqual('persistence');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].x_opencti_order).toEqual(20);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[0].attackPatterns.length).toEqual(1);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].kill_chain_name).toEqual('mitre-pre-attack');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].phase_name).toEqual('launch');
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].x_opencti_order).toEqual(0);
expect(queryResult.data.attackPatternsMatrix.attackPatternsOfPhases[1].attackPatterns.length).toEqual(2);
});
it('should update attackPattern', async () => {
const UPDATE_QUERY = gql`
Expand Down