Skip to content

Commit 3cda12f

Browse files
[backend] notif buffering implementation (#8685)
1 parent c77c80b commit 3cda12f

File tree

6 files changed

+123
-10
lines changed

6 files changed

+123
-10
lines changed

opencti-platform/opencti-graphql/config/default.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@
302302
"publisher_manager": {
303303
"enabled": true,
304304
"lock_key": "publisher_manager_lock",
305-
"interval": 10000
305+
"interval": 10000,
306+
"enable_buffering": true,
307+
"buffering_seconds": 60
306308
},
307309
"sync_manager": {
308310
"enabled": true,

opencti-platform/opencti-graphql/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"build:types": "graphql-codegen --config graphql-codegen.yml",
99
"build:schema": "yarn build:types && node builder/schema/schema.js && node build/script-generate-schema.js",
1010
"build:prod": "yarn build:types && yarn check-ts && node builder/prod/prod.js",
11-
"build:dev": "node builder/dev/dev.js",
11+
"build:dev": "yarn build:schema && node builder/dev/dev.js",
1212
"build:antlr": "java -jar ./builder/antlr/antlr-4.13.0-complete.jar -Dlanguage=JavaScript ./src/stixpattern/STIXPattern.g4 -o ./src/stixpattern/",
1313
"migrate:add": "migrate create --template-file src/utils/migration-template.js --migrations-dir=./src/migrations",
1414
"clean:relations": "yarn build:prod && node build/script-clean-relations.js",

opencti-platform/opencti-graphql/src/manager/publisherManager.ts

Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ import { findById } from '../domain/user';
4343

4444
const DOC_URI = 'https://docs.opencti.io';
4545
const PUBLISHER_ENGINE_KEY = conf.get('publisher_manager:lock_key');
46+
const PUBLISHER_ENABLE_BUFFERING = conf.get('publisher_manager:enable_buffering');
47+
const PUBLISHER_BUFFERING_SECONDS = conf.get('publisher_manager:buffering_seconds');
4648
const STREAM_SCHEDULE_TIME = 10000;
4749

4850
export const internalProcessNotification = async (
@@ -52,11 +54,13 @@ export const internalProcessNotification = async (
5254
user: NotificationUser,
5355
notifier: BasicStoreEntityNotifier | NotifierTestInput,
5456
data: NotificationData[],
55-
notification: BasicStoreEntityTrigger,
57+
triggers: BasicStoreEntityTrigger[],
5658
// eslint-disable-next-line consistent-return
5759
): Promise<{ error: string } | void> => {
5860
try {
59-
const { name: notification_name, id: trigger_id, trigger_type } = notification;
61+
const notification_name = triggers.map((t) => t?.name).join(';');
62+
const trigger_type = triggers.length > 1 ? 'buffer' : triggers[0].trigger_type;
63+
const trigger_id = triggers.map((t) => t?.id).filter((t) => t);
6064
const { notifier_connector_id, notifier_configuration: configuration } = notifier;
6165
const generatedContent: Record<string, Array<NotificationContentEvent>> = {};
6266
for (let index = 0; index < data.length; index += 1) {
@@ -78,7 +82,7 @@ export const internalProcessNotification = async (
7882
// region data generation
7983
const background_color = (settings.platform_theme_dark_background ?? '#0a1929').substring(1);
8084
const platformOpts = { doc_uri: DOC_URI, platform_uri: getBaseUrl(), background_color };
81-
const templateData = { content, notification_content: content, notification, settings, user, data, ...platformOpts };
85+
const templateData = { content, notification_content: content, notification: triggers[0], settings, user, data, ...platformOpts };
8286
// endregion
8387
if (notifier_connector_id === NOTIFIER_CONNECTOR_UI) {
8488
const createNotification = {
@@ -171,7 +175,7 @@ const processNotificationEvent = async (
171175
const notifier = userNotifiers[notifierIndex];
172176

173177
// There is no await in purpose, the goal is to send notification and continue without waiting result.
174-
internalProcessNotification(context, settings, notificationMap, user, notifierMap.get(notifier) ?? {} as BasicStoreEntityNotifier, data, notification).catch((reason) => logApp.error('[OPENCTI-MODULE] Publisher manager unknown error.', { cause: reason }));
178+
internalProcessNotification(context, settings, notificationMap, user, notifierMap.get(notifier) ?? {} as BasicStoreEntityNotifier, data, [notification]).catch((reason) => logApp.error('[OPENCTI-MODULE] Publisher manager unknown error.', { cause: reason }));
175179
}
176180
};
177181

@@ -216,6 +220,95 @@ const processDigestNotificationEvent = async (context: AuthContext, notification
216220
await processNotificationEvent(context, notificationMap, event.notification_id, user, dataWithFullMessage);
217221
};
218222

223+
const liveNotificationBufferPerEntity: Record<string, { timestamp: number, events: SseEvent<KnowledgeNotificationEvent>[] }> = {};
224+
225+
const processBufferedEvents = async (
226+
context: AuthContext,
227+
triggerMap: Map<string, BasicStoreEntityTrigger>,
228+
events: KnowledgeNotificationEvent[]
229+
) => {
230+
const usersFromCache = await getEntitiesMapFromCache<AuthUser>(context, SYSTEM_USER, ENTITY_TYPE_USER);
231+
const notifDataPerUser: Record<string, { user: NotificationUser, data: NotificationData }[]> = {};
232+
// We process all events to transform them into notification data per user
233+
for (let i = 0; i < events.length; i += 1) {
234+
const event = events[i];
235+
const { targets, data: instance, origin } = event;
236+
// For each event, transform it into NotificationData for all targets
237+
for (let index = 0; index < targets.length; index += 1) {
238+
const { user, type, message } = targets[index];
239+
const notificationMessage = createFullNotificationMessage(message, usersFromCache, event.streamMessage, origin);
240+
const currentData = { notification_id: event.notification_id, instance, type, message: notificationMessage };
241+
const currentNotifDataForUser = notifDataPerUser[user.user_id];
242+
if (currentNotifDataForUser) {
243+
currentNotifDataForUser.push({ user, data: currentData });
244+
} else {
245+
notifDataPerUser[user.user_id] = [{ user, data: currentData }];
246+
}
247+
}
248+
}
249+
const settings = await getEntityFromCache<BasicStoreSettings>(context, SYSTEM_USER, ENTITY_TYPE_SETTINGS);
250+
const allNotifiers = await getEntitiesListFromCache<BasicStoreEntityNotifier>(context, SYSTEM_USER, ENTITY_TYPE_NOTIFIER);
251+
const allNotifiersMap = new Map(allNotifiers.map((n) => [n.internal_id, n]));
252+
253+
const notifUsers = Object.keys(notifDataPerUser);
254+
// Handle notification data for each user
255+
for (let i = 0; i < notifUsers.length; i += 1) {
256+
const currentUserId = notifUsers[i];
257+
const userNotificationData = notifDataPerUser[currentUserId];
258+
const userNotifiers = [...new Set(userNotificationData.map((d) => d.user.notifiers).flat())];
259+
260+
// For each notifier of the user, filter the relevant notification data, and send it
261+
for (let notifierIndex = 0; notifierIndex < userNotifiers.length; notifierIndex += 1) {
262+
const notifier = userNotifiers[notifierIndex];
263+
264+
// Only include the notificationData that has the current notifier included in its trigger config
265+
const impactedData = userNotificationData.filter((d) => d.user.notifiers.includes(notifier));
266+
if (impactedData.length > 0) {
267+
const currentUser = impactedData[0].user;
268+
const dataToSend = impactedData.map((d) => d.data);
269+
const triggersInDataToSend = [...new Set(dataToSend.map((d) => triggerMap.get(d.notification_id)).filter((t) => t))];
270+
// If triggers can't be found, no need to send the data
271+
if (triggersInDataToSend.length >= 1) {
272+
// There is no await in purpose, the goal is to send notification and continue without waiting result.
273+
internalProcessNotification(
274+
context,
275+
settings,
276+
triggerMap,
277+
currentUser,
278+
allNotifiersMap.get(notifier) ?? {} as BasicStoreEntityNotifier,
279+
dataToSend,
280+
triggersInDataToSend as BasicStoreEntityTrigger[]
281+
).catch((reason) => logApp.error('[OPENCTI-MODULE] Publisher manager unknown error.', { cause: reason }));
282+
}
283+
}
284+
}
285+
}
286+
};
287+
288+
const handleEntityNotificationBuffer = async (forceSend = false) => {
289+
const dateNow = Date.now();
290+
const context = executionContext('publisher_manager');
291+
const bufferKeys = Object.keys(liveNotificationBufferPerEntity);
292+
// Iterate on all buffers to check if they need to be sent
293+
for (let i = 0; i < bufferKeys.length; i += 1) {
294+
const key = bufferKeys[i];
295+
const value = liveNotificationBufferPerEntity[key];
296+
if (value) {
297+
const isBufferingTimeElapsed = (dateNow - value.timestamp) > PUBLISHER_BUFFERING_SECONDS * 1000;
298+
// If buffer is older than configured buffering time length OR we want to forceSend, it needs to be sent
299+
if (forceSend || isBufferingTimeElapsed) {
300+
const bufferEvents = value.events.map((e) => e.data);
301+
// We remove current buffer from buffers map before processing buffer events, otherwise some new events coming in might be lost
302+
// This way, if new events are coming in from the stream, they will initiate a new buffer that will be handled later
303+
delete liveNotificationBufferPerEntity[key];
304+
const allExistingTriggers = await getNotifications(context);
305+
const allExistingTriggersMap = new Map(allExistingTriggers.map((n) => [n.trigger.internal_id, n.trigger]));
306+
await processBufferedEvents(context, allExistingTriggersMap, bufferEvents);
307+
}
308+
}
309+
}
310+
};
311+
219312
const publisherStreamHandler = async (streamEvents: Array<SseEvent<StreamNotifEvent>>) => {
220313
try {
221314
if (streamEvents.length === 0) {
@@ -229,7 +322,19 @@ const publisherStreamHandler = async (streamEvents: Array<SseEvent<StreamNotifEv
229322
const { data: { notification_id, type } } = streamEvent;
230323
if (type === 'live' || type === 'action') {
231324
const liveEvent = streamEvent as SseEvent<KnowledgeNotificationEvent>;
232-
await processLiveNotificationEvent(context, notificationMap, liveEvent.data);
325+
// If buffering is enabled, we store the event in local buffer instead of handling it directly
326+
if (PUBLISHER_ENABLE_BUFFERING) {
327+
const liveEventEntityId = liveEvent.data.data.id;
328+
const currentEntityBuffer = liveNotificationBufferPerEntity[liveEventEntityId];
329+
// If there are buffered events already, simply add current event to array of buffered events
330+
if (currentEntityBuffer) {
331+
currentEntityBuffer.events.push(liveEvent);
332+
} else { // If there are currently no buffered events for this entity, initialize them using current time as timestamp
333+
liveNotificationBufferPerEntity[liveEventEntityId] = { timestamp: Date.now(), events: [liveEvent] };
334+
}
335+
} else { // If no buffering is enabled, we handle the notification directly
336+
await processLiveNotificationEvent(context, notificationMap, liveEvent.data);
337+
}
233338
}
234339
if (type === 'digest') {
235340
const digestEvent = streamEvent as SseEvent<DigestEvent>;
@@ -269,6 +374,9 @@ const initPublisherManager = () => {
269374
await streamProcessor.start('live');
270375
while (!shutdown && streamProcessor.running()) {
271376
lock.signal.throwIfAborted();
377+
if (PUBLISHER_ENABLE_BUFFERING) {
378+
await handleEntityNotificationBuffer();
379+
}
272380
await wait(WAIT_TIME_ACTION);
273381
}
274382
logApp.info('[OPENCTI-MODULE] End of publisher manager processing');
@@ -280,6 +388,9 @@ const initPublisherManager = () => {
280388
}
281389
} finally {
282390
if (streamProcessor) await streamProcessor.shutdown();
391+
if (PUBLISHER_ENABLE_BUFFERING) {
392+
await handleEntityNotificationBuffer(true);
393+
}
283394
if (lock) await lock.unlock();
284395
}
285396
};

opencti-platform/opencti-graphql/src/modules/notification/notification-types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export interface NotificationAddInput {
7373
title: string,
7474
events: Array<NotificationContentEvent>
7575
}>
76-
trigger_id?: string
76+
trigger_id?: string | string[]
7777
user_id: string
7878
}
7979

opencti-platform/opencti-graphql/src/modules/notification/notification.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ const NOTIFICATION_DEFINITION: ModuleDefinition<StoreEntityNotification, StixNot
7373
},
7474
attributes: [
7575
{ name: 'name', label: 'Trigger name', type: 'string', format: 'short', mandatoryType: 'internal', editDefault: false, multiple: false, upsert: false, isFilterable: true },
76-
{ name: 'trigger_id', label: 'Trigger', type: 'string', format: 'id', entityTypes: [ENTITY_TYPE_TRIGGER], mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
76+
{ name: 'trigger_id', label: 'Trigger', type: 'string', format: 'id', entityTypes: [ENTITY_TYPE_TRIGGER], mandatoryType: 'no', editDefault: false, multiple: true, upsert: false, isFilterable: true },
7777
{ name: 'notification_type', label: 'Notification type', type: 'string', format: 'enum', values: TRIGGER_TYPE_VALUES, mandatoryType: 'internal', editDefault: false, multiple: false, upsert: false, isFilterable: true },
7878
{
7979
name: 'notification_content',

opencti-platform/opencti-graphql/src/modules/notifier/notifier-domain.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,6 @@ export const testNotifier = async (context: AuthContext, user: AuthUser, notifie
189189
user_id: user.id,
190190
user_email: user.user_email,
191191
notifiers: [],
192-
}, notifier, MOCK_NOTIFICATIONS[notifier.notifier_test_id], { created: (new Date()).toISOString() } as unknown as BasicStoreEntityTrigger);
192+
}, notifier, MOCK_NOTIFICATIONS[notifier.notifier_test_id], [{ created: (new Date()).toISOString() }] as unknown as BasicStoreEntityTrigger[]);
193193
return result?.error;
194194
};

0 commit comments

Comments
 (0)