From f054a28fabbd550e7be5b9b14172384799a0a71d Mon Sep 17 00:00:00 2001 From: Jordan Porter Date: Wed, 28 Aug 2024 16:07:25 -0600 Subject: [PATCH 1/3] fix duplicative payloads on unload --- src/common/constants/agent-constants.js | 2 + src/features/logging/aggregate/index.js | 58 +++++------ src/features/utils/event-buffer.js | 116 +++++++++++++++++++++ tests/assets/logs-redirect.html | 22 ++++ tests/components/logging/aggregate.test.js | 27 ++--- tests/specs/logging/harvesting.e2e.js | 14 +++ 6 files changed, 197 insertions(+), 42 deletions(-) create mode 100644 src/common/constants/agent-constants.js create mode 100644 src/features/utils/event-buffer.js create mode 100644 tests/assets/logs-redirect.html diff --git a/src/common/constants/agent-constants.js b/src/common/constants/agent-constants.js new file mode 100644 index 000000000..8e2c48edc --- /dev/null +++ b/src/common/constants/agent-constants.js @@ -0,0 +1,2 @@ +export const IDEAL_PAYLOAD_SIZE = 64000 +export const MAX_PAYLOAD_SIZE = 1000000 diff --git a/src/features/logging/aggregate/index.js b/src/features/logging/aggregate/index.js index 67585dacb..0a2213e82 100644 --- a/src/features/logging/aggregate/index.js +++ b/src/features/logging/aggregate/index.js @@ -6,10 +6,12 @@ import { warn } from '../../../common/util/console' import { stringify } from '../../../common/util/stringify' import { SUPPORTABILITY_METRIC_CHANNEL } from '../../metrics/constants' import { AggregateBase } from '../../utils/aggregate-base' -import { FEATURE_NAME, LOGGING_EVENT_EMITTER_CHANNEL, LOG_LEVELS, MAX_PAYLOAD_SIZE } from '../constants' +import { FEATURE_NAME, LOGGING_EVENT_EMITTER_CHANNEL, LOG_LEVELS } from '../constants' import { Log } from '../shared/log' import { isValidLogLevel } from '../shared/utils' import { applyFnToProps } from '../../../common/util/traverse' +import { MAX_PAYLOAD_SIZE } from '../../../common/constants/agent-constants' +import { EventBuffer } from '../../utils/event-buffer' export class Aggregate extends AggregateBase { static featureName = FEATURE_NAME @@ -19,11 +21,7 @@ export class Aggregate extends AggregateBase { super(agentIdentifier, aggregator, FEATURE_NAME) /** held logs before sending */ - this.bufferedLogs = [] - /** held logs during sending, for retries */ - this.outgoingLogs = [] - /** the estimated bytes of log data waiting to be sent -- triggers a harvest if adding a new log will exceed limit */ - this.estimatedBytes = 0 + this.bufferedLogs = new EventBuffer() this.#agentRuntime = getRuntime(this.agentIdentifier) this.#agentInfo = getInfo(this.agentIdentifier) @@ -37,11 +35,11 @@ export class Aggregate extends AggregateBase { getPayload: this.prepareHarvest.bind(this), raw: true }, this) - /** harvest immediately once started to purge pre-load logs collected */ - this.scheduler.startTimer(this.harvestTimeSeconds, 0) /** emitted by instrument class (wrapped loggers) or the api methods directly */ registerHandler(LOGGING_EVENT_EMITTER_CHANNEL, this.handleLog.bind(this), this.featureName, this.ee) this.drain() + /** harvest immediately once started to purge pre-load logs collected */ + this.scheduler.startTimer(this.harvestTimeSeconds, 0) }) } @@ -68,10 +66,6 @@ export class Aggregate extends AggregateBase { return } if (typeof message !== 'string' || !message) return warn(32) - if (message.length > MAX_PAYLOAD_SIZE) { - handle(SUPPORTABILITY_METRIC_CHANNEL, ['Logging/Harvest/Failed/Seen', message.length]) - return warn(31, message.slice(0, 25) + '...') - } const log = new Log( 
Math.floor(this.#agentRuntime.timeKeeper.correctAbsoluteTimestamp( @@ -82,26 +76,26 @@ export class Aggregate extends AggregateBase { level ) const logBytes = log.message.length + stringify(log.attributes).length + log.level.length + 10 // timestamp == 10 chars - if (logBytes > MAX_PAYLOAD_SIZE) { - handle(SUPPORTABILITY_METRIC_CHANNEL, ['Logging/Harvest/Failed/Seen', logBytes]) - return warn(31, log.message.slice(0, 25) + '...') - } - if (this.estimatedBytes + logBytes >= MAX_PAYLOAD_SIZE) { - handle(SUPPORTABILITY_METRIC_CHANNEL, ['Logging/Harvest/Early/Seen', this.estimatedBytes + logBytes]) - this.scheduler.runHarvest({}) + if (!this.bufferedLogs.canMerge(logBytes)) { + if (this.bufferedLogs.hasData) { + handle(SUPPORTABILITY_METRIC_CHANNEL, ['Logging/Harvest/Early/Seen', this.bufferedLogs.bytes + logBytes]) + this.scheduler.runHarvest({}) + if (logBytes < MAX_PAYLOAD_SIZE) this.bufferedLogs.add(log) + } else { + handle(SUPPORTABILITY_METRIC_CHANNEL, ['Logging/Harvest/Failed/Seen', logBytes]) + warn(31, log.message.slice(0, 25) + '...') + } + return } - this.estimatedBytes += logBytes - this.bufferedLogs.push(log) + + this.bufferedLogs.add(log) } - prepareHarvest () { - if (this.blocked || !(this.bufferedLogs.length || this.outgoingLogs.length)) return - /** populate outgoing array while also clearing main buffer */ - this.outgoingLogs.push(...this.bufferedLogs.splice(0)) - this.estimatedBytes = 0 + prepareHarvest (options = {}) { + if (this.blocked || !this.bufferedLogs.hasData) return /** see https://source.datanerd.us/agents/rum-specs/blob/main/browser/Log for logging spec */ - return { + const payload = { qs: { browser_monitoring_key: this.#agentInfo.licenseKey }, @@ -121,14 +115,20 @@ export class Aggregate extends AggregateBase { }, /** logs section contains individual unique log entries */ logs: applyFnToProps( - this.outgoingLogs, + this.bufferedLogs.buffer, this.obfuscator.obfuscateString.bind(this.obfuscator), 'string' ) }] } + + if (options.retry) this.bufferedLogs.hold() + else this.bufferedLogs.clear() + + return payload } onHarvestFinished (result) { - if (!result.retry) this.outgoingLogs = [] + if (result.retry) this.bufferedLogs.unhold() + else this.bufferedLogs.held.clear() } } diff --git a/src/features/utils/event-buffer.js b/src/features/utils/event-buffer.js new file mode 100644 index 000000000..9cc31da22 --- /dev/null +++ b/src/features/utils/event-buffer.js @@ -0,0 +1,116 @@ +import { stringify } from '../../common/util/stringify' +import { MAX_PAYLOAD_SIZE } from '../../common/constants/agent-constants' + +/** + * A container that keeps an event buffer and size with helper methods + * @typedef {Object} EventBuffer + * @property {number} size + * @property {*[]} buffer + */ + +/** + * A container that holds, evaluates, and merges event objects for harvesting + */ +export class EventBuffer { + /** @type {Object[]} */ + #buffer = [] + /** @type {number} */ + #bytes = 0 + /** @type {EventBuffer} */ + #held + + /** + * + * @param {number=} maxPayloadSize + */ + constructor (maxPayloadSize = MAX_PAYLOAD_SIZE) { + this.maxPayloadSize = maxPayloadSize + } + + /** + * buffer is read only, use the helper methods to add or clear buffer data + */ + get buffer () { + return this.#buffer + } + + /** + * bytes is read only, use the helper methods to add or clear buffer data + */ + get bytes () { + return this.#bytes + } + + /** + * held is another event buffer + */ + get held () { + this.#held ??= new EventBuffer(this.maxPayloadSize) + return this.#held + } + + /** + * Returns 
a boolean indicating whether the current size and buffer contain valid data + * @returns {boolean} + */ + get hasData () { + return this.buffer.length > 0 && this.bytes > 0 + } + + /** + * Adds an event object to the buffer while tallying size + * @param {Object} event the event object to add to the buffer + * @returns {EventBuffer} returns the event buffer for chaining + */ + add (event) { + const size = stringify(event).length + if (!this.canMerge(size)) return this + this.#buffer.push(event) + this.#bytes += size + return this + } + + /** + * clear the buffer data + * @returns {EventBuffer} + */ + clear () { + this.#bytes = 0 + this.#buffer = [] + return this + } + + hold () { + this.held.merge(this) + this.clear() + return this + } + + unhold () { + this.merge(this.held, true) + this.held.clear() + return this + } + + /** + * Merges an EventBuffer into this EventBuffer + * @param {EventBuffer} events an EventBuffer intended to merge with this EventBuffer + * @param {boolean} prepend if true, the supplied events will be prepended before the events of this class + * @returns {EventBuffer} returns the event buffer for chaining + */ + merge (eventBuffer, prepend = false) { + if (!this.canMerge(eventBuffer.bytes)) return + this.#buffer = prepend ? [...eventBuffer.buffer, ...this.#buffer] : [...this.#buffer, ...eventBuffer.buffer] + this.#bytes += eventBuffer.#bytes + return this + } + + /** + * Returns a boolean indicating the resulting size of a merge would be valid. Compares against the maxPayloadSize provided at initialization time. + * @param {number} size + * @returns {boolean} + */ + canMerge (size) { + return this.bytes + (size || Infinity) < this.maxPayloadSize + } +} diff --git a/tests/assets/logs-redirect.html b/tests/assets/logs-redirect.html new file mode 100644 index 000000000..cab999537 --- /dev/null +++ b/tests/assets/logs-redirect.html @@ -0,0 +1,22 @@ + + + + + RUM Unit Test + {init} {config} {loader} + + + Logs captured immediately followed by a redirect + + + + diff --git a/tests/components/logging/aggregate.test.js b/tests/components/logging/aggregate.test.js index 29b75490c..189613468 100644 --- a/tests/components/logging/aggregate.test.js +++ b/tests/components/logging/aggregate.test.js @@ -40,10 +40,9 @@ describe('logging aggregate component tests', () => { 'ee', 'featureName', 'blocked', + 'obfuscator', 'bufferedLogs', - 'outgoingLogs', - 'harvestTimeSeconds', - 'estimatedBytes' + 'harvestTimeSeconds' ])) }) @@ -72,7 +71,7 @@ describe('logging aggregate component tests', () => { { myAttributes: 1 }, 'error' ) - expect(loggingAgg.bufferedLogs[0]).toEqual(expectedLog) + expect(loggingAgg.bufferedLogs.buffer[0]).toEqual(expectedLog) expect(loggingAgg.prepareHarvest()).toEqual({ qs: { browser_monitoring_key: 1234 }, @@ -99,7 +98,7 @@ describe('logging aggregate component tests', () => { loggingAgg.ee.emit('rumresp', {}) await wait(1) loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', { myAttributes: 1 }, 'error']) - expect(loggingAgg.bufferedLogs[0]).toEqual(new Log( + expect(loggingAgg.bufferedLogs.buffer[0]).toEqual(new Log( Math.floor(timeKeeper.correctAbsoluteTimestamp( timeKeeper.convertRelativeTimestamp(1234) )), @@ -154,17 +153,18 @@ describe('logging aggregate component tests', () => { loggingAgg.ee.emit('rumresp', {}) await wait(1) + const logs = loggingAgg.bufferedLogs.buffer loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', [], 'ERROR']) - expect(loggingAgg.bufferedLogs.pop()).toEqual(expected) + 
expect(logs.pop()).toEqual(expected) loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', true, 'ERROR']) - expect(loggingAgg.bufferedLogs.pop()).toEqual(expected) + expect(logs.pop()).toEqual(expected) loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', 1, 'ERROR']) - expect(loggingAgg.bufferedLogs.pop()).toEqual(expected) + expect(logs.pop()).toEqual(expected) loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', 'string', 'ERROR']) - expect(loggingAgg.bufferedLogs.pop()).toEqual(expected) + expect(logs.pop()).toEqual(expected) }) it('should work if log level is valid but wrong case', async () => { @@ -181,7 +181,7 @@ describe('logging aggregate component tests', () => { await wait(1) loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, 'test message', {}, 'ErRoR']) - expect(loggingAgg.bufferedLogs.pop()).toEqual(expected) + expect(loggingAgg.bufferedLogs.buffer[0]).toEqual(expected) }) it('should buffer logs with non-stringify-able message', async () => { @@ -189,14 +189,15 @@ describe('logging aggregate component tests', () => { loggingAgg.ee.emit('rumresp', {}) await wait(1) + const logs = loggingAgg.bufferedLogs.buffer loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, new Error('test'), {}, 'error']) - expect(loggingAgg.bufferedLogs.pop().message).toEqual('Error: test') + expect(logs.pop().message).toEqual('Error: test') loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, new SyntaxError('test'), {}, 'error']) - expect(loggingAgg.bufferedLogs.pop().message).toEqual('SyntaxError: test') + expect(logs.pop().message).toEqual('SyntaxError: test') loggingAgg.ee.emit(LOGGING_EVENT_EMITTER_CHANNEL, [1234, Symbol('test'), {}, 'error']) - expect(loggingAgg.bufferedLogs.pop().message).toEqual('Symbol(test)') + expect(logs.pop().message).toEqual('Symbol(test)') }) }) diff --git a/tests/specs/logging/harvesting.e2e.js b/tests/specs/logging/harvesting.e2e.js index acfd8dde4..c9428c286 100644 --- a/tests/specs/logging/harvesting.e2e.js +++ b/tests/specs/logging/harvesting.e2e.js @@ -77,6 +77,20 @@ describe('logging harvesting', () => { expect(JSON.parse(body)[0].logs[0].message).toEqual('Error: test') }) + it('should not double harvest on navigation logs', async () => { + const [logsRequests] = await Promise.all([ + logsCapture.waitForResult({ timeout: 15000 }), + browser.url(await browser.testHandle.assetURL('logs-redirect.html')) + ]) + + // 1 harvest + expect(logsRequests.length).toEqual(1) + const parsedBody = JSON.parse(logsRequests[0].request.body) + // 1 log in the 1 harvest + expect(parsedBody[0].logs.length).toEqual(1) + expect(parsedBody[0].logs[0].message).toEqual('redirect to https://gmail.com') + }) + it('should allow for re-wrapping and 3rd party wrapping', async () => { const [[{ request: { body } }]] = await Promise.all([ logsCapture.waitForResult({ totalCount: 1 }), From 8d291d31b17983ee748380f5765e519ec7eb0e84 Mon Sep 17 00:00:00 2001 From: Jordan Porter Date: Wed, 28 Aug 2024 17:07:10 -0600 Subject: [PATCH 2/3] comments and clean up --- src/features/utils/event-buffer.js | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/features/utils/event-buffer.js b/src/features/utils/event-buffer.js index 9cc31da22..9d98121ba 100644 --- a/src/features/utils/event-buffer.js +++ b/src/features/utils/event-buffer.js @@ -80,12 +80,22 @@ export class EventBuffer { return this } + /** + * Hold the buffer data in a new (child) EventBuffer (.held) to unblock the main buffer. 
+ * This action clears the main buffer + * @returns {EventBuffer} + */ hold () { this.held.merge(this) this.clear() return this } + /** + * Prepend the held EventBuffer (.held) back into the main buffer + * This action clears the held buffer + * @returns {EventBuffer} + */ unhold () { this.merge(this.held, true) this.held.clear() @@ -99,7 +109,7 @@ export class EventBuffer { * @returns {EventBuffer} returns the event buffer for chaining */ merge (eventBuffer, prepend = false) { - if (!this.canMerge(eventBuffer.bytes)) return + if (!this.canMerge(eventBuffer.bytes)) return this this.#buffer = prepend ? [...eventBuffer.buffer, ...this.#buffer] : [...this.#buffer, ...eventBuffer.buffer] this.#bytes += eventBuffer.#bytes return this From ca325b2b9a56a90743cd7abccae13abe239129cb Mon Sep 17 00:00:00 2001 From: Jordan Porter Date: Wed, 28 Aug 2024 17:08:58 -0600 Subject: [PATCH 3/3] add unit test --- .../unit/features/utils/event-buffer.test.js | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 tests/unit/features/utils/event-buffer.test.js diff --git a/tests/unit/features/utils/event-buffer.test.js b/tests/unit/features/utils/event-buffer.test.js new file mode 100644 index 000000000..66e18bb84 --- /dev/null +++ b/tests/unit/features/utils/event-buffer.test.js @@ -0,0 +1,173 @@ +import { EventBuffer } from '../../../../src/features/utils/event-buffer' + +let eventBuffer +describe('EventBuffer', () => { + beforeEach(() => { + eventBuffer = new EventBuffer() + }) + it('has default values', () => { + expect(eventBuffer).toMatchObject({ + bytes: 0, + buffer: [] + }) + }) + + describe('add', () => { + it('should add data to the buffer while maintaining size', () => { + expect(eventBuffer).toMatchObject({ + bytes: 0, + buffer: [] + }) + + const mockEvent = { test: 1 } + + eventBuffer.add(mockEvent) + expect(eventBuffer).toMatchObject({ + bytes: JSON.stringify(mockEvent).length, + buffer: [mockEvent] + }) + }) + + it('should not add if one event is too large', () => { + eventBuffer.add({ test: 'x'.repeat(1000000) }) + expect(eventBuffer.buffer).toEqual([]) + }) + + it('should not add if existing buffer would become too large', () => { + eventBuffer.add({ test: 'x'.repeat(999988) }) + expect(eventBuffer.bytes).toEqual(999999) + expect(eventBuffer.buffer.length).toEqual(1) + eventBuffer.add({ test2: 'testing' }) + expect(eventBuffer.bytes).toEqual(999999) + expect(eventBuffer.buffer.length).toEqual(1) + }) + + it('should be chainable', () => { + const mockEvent1 = { test: 1 } + const mockEvent2 = { test: 2 } + eventBuffer.add(mockEvent1).add(mockEvent2) + expect(eventBuffer).toMatchObject({ + bytes: JSON.stringify(mockEvent1).length + JSON.stringify(mockEvent2).length, + buffer: [mockEvent1, mockEvent2] + }) + }) + }) + + describe('merge', () => { + it('should merge two EventBuffers - append', () => { + const mockEvent1 = { test: 1 } + const mockEvent2 = { test: 2 } + eventBuffer.add(mockEvent1) + + const secondBuffer = new EventBuffer() + secondBuffer.add(mockEvent2) + eventBuffer.merge(secondBuffer) + expect(eventBuffer).toMatchObject({ + bytes: JSON.stringify({ test: 1 }).length + JSON.stringify({ test: 2 }).length, + buffer: [mockEvent1, mockEvent2] + }) + }) + + it('should merge two EventBuffers - prepend', () => { + const mockEvent1 = { test: 1 } + const mockEvent2 = { test: 2 } + eventBuffer.add(mockEvent1) + + const secondBuffer = new EventBuffer() + secondBuffer.add(mockEvent2) + eventBuffer.merge(secondBuffer, true) + expect(eventBuffer).toMatchObject({ + bytes: 
JSON.stringify({ test: 1 }).length + JSON.stringify({ test: 2 }).length, + buffer: [mockEvent2, mockEvent1] + }) + }) + + it('should not merge if not an EventBuffer', () => { + eventBuffer.add({ test: 1 }) + // not EventBuffer + eventBuffer.merge({ regular: 'object' }) + expect(eventBuffer.buffer).toEqual([{ test: 1 }]) + // not EventBuffer + eventBuffer.merge('string') + expect(eventBuffer.buffer).toEqual([{ test: 1 }]) + // not EventBuffer + eventBuffer.merge(123) + expect(eventBuffer.buffer).toEqual([{ test: 1 }]) + // not EventBuffer + eventBuffer.merge(true) + expect(eventBuffer.buffer).toEqual([{ test: 1 }]) + // not EventBuffer + eventBuffer.merge(Symbol('test')) + expect(eventBuffer.buffer).toEqual([{ test: 1 }]) + }) + + it('should not merge if too big', () => { + const mockEvent1 = { test: 'x'.repeat(999988) } + const mockEvent2 = { test2: 'testing' } + eventBuffer.add(mockEvent1) + + const secondBuffer = new EventBuffer() + secondBuffer.add(mockEvent2) + + eventBuffer.merge(secondBuffer) + expect(eventBuffer.buffer.length).toEqual(1) + expect(eventBuffer.bytes).toEqual(999999) + }) + + it('should be chainable', () => { + const mockEvent1 = { test1: 1 } + const mockEvent2 = { test2: 2 } + const mockEvent3 = { test3: 3 } + + const secondBuffer = new EventBuffer() + const thirdBuffer = new EventBuffer() + + eventBuffer.add(mockEvent1) + secondBuffer.add(mockEvent2) + thirdBuffer.add(mockEvent3) + + eventBuffer.merge(secondBuffer).merge(thirdBuffer) + expect(eventBuffer).toMatchObject({ + bytes: JSON.stringify(mockEvent1).length + JSON.stringify(mockEvent2).length + JSON.stringify(mockEvent3).length, + buffer: [mockEvent1, mockEvent2, mockEvent3] + }) + }) + }) + + describe('hasData', () => { + it('should return false if no events', () => { + jest.spyOn(eventBuffer, 'bytes', 'get').mockReturnValue(100) + expect(eventBuffer.hasData).toEqual(false) + }) + it('should return false if no bytes', () => { + jest.spyOn(eventBuffer, 'buffer', 'get').mockReturnValue({ test: 1 }) + expect(eventBuffer.hasData).toEqual(false) + }) + it('should return true if has a valid event and size', () => { + eventBuffer.add({ test: 1 }) + expect(eventBuffer.hasData).toEqual(true) + }) + }) + + describe('canMerge', () => { + it('should return false if would be too big', () => { + jest.spyOn(eventBuffer, 'bytes', 'get').mockReturnValue(999999) + expect(eventBuffer.canMerge(1)).toEqual(false) + }) + it('should return false if no size provided', () => { + eventBuffer.add({ test: 1 }) + expect(eventBuffer.canMerge()).toEqual(false) + }) + it('should return false if size is not a number', () => { + eventBuffer.add({ test: 1 }) + expect(eventBuffer.canMerge('test')).toEqual(false) + expect(eventBuffer.canMerge(false)).toEqual(false) + expect(eventBuffer.canMerge(['test'])).toEqual(false) + expect(eventBuffer.canMerge({ test: 1 })).toEqual(false) + }) + it('should return true if has a valid event and size', () => { + eventBuffer.add({ test: 1 }) + expect(eventBuffer.canMerge(20)).toEqual(true) + }) + }) +})
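
A minimal sketch of the intended EventBuffer lifecycle introduced by this series, assuming only the API added in src/features/utils/event-buffer.js. The prepareHarvest/onHarvestFinished shapes mirror the logging aggregate hunks above, but the scheduler wiring, blocked checks, obfuscation, and query-string/common-attribute sections of the real payload are omitted here for brevity, so this is illustrative shorthand rather than agent code.

    import { EventBuffer } from './src/features/utils/event-buffer' // path as added in this patch

    // Replaces the old bufferedLogs/outgoingLogs arrays and estimatedBytes counter.
    const bufferedLogs = new EventBuffer() // defaults to MAX_PAYLOAD_SIZE (1000000 bytes)

    // Events accumulate here; add() refuses anything that would push the buffer past the limit.
    bufferedLogs.add({ timestamp: 1234, message: 'example log', level: 'INFO', attributes: {} })

    function prepareHarvest (options = {}) {
      if (!bufferedLogs.hasData) return
      const payload = { logs: [...bufferedLogs.buffer] }
      if (options.retry) bufferedLogs.hold() // retryable harvest: park in-flight events in the child (.held) buffer
      else bufferedLogs.clear()              // fire-and-forget harvest (e.g. unload): drop them immediately
      return payload
    }

    function onHarvestFinished (result) {
      if (result.retry) bufferedLogs.unhold() // delivery failed: prepend held events for the next cycle
      else bufferedLogs.held.clear()          // delivered: discard the held copy so it can never be sent twice
    }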