@@ -8,10 +8,12 @@ import { warn } from '../../../common/util/console'
8
8
import { stringify } from '../../../common/util/stringify'
9
9
import { SUPPORTABILITY_METRIC_CHANNEL } from '../../metrics/constants'
10
10
import { AggregateBase } from '../../utils/aggregate-base'
11
- import { FEATURE_NAME , LOGGING_EVENT_EMITTER_CHANNEL , LOG_LEVELS , MAX_PAYLOAD_SIZE } from '../constants'
11
+ import { FEATURE_NAME , LOGGING_EVENT_EMITTER_CHANNEL , LOG_LEVELS } from '../constants'
12
12
import { Log } from '../shared/log'
13
13
import { isValidLogLevel } from '../shared/utils'
14
14
import { applyFnToProps } from '../../../common/util/traverse'
15
+ import { MAX_PAYLOAD_SIZE } from '../../../common/constants/agent-constants'
16
+ import { EventBuffer } from '../../utils/event-buffer'
15
17
16
18
export class Aggregate extends AggregateBase {
17
19
static featureName = FEATURE_NAME
@@ -21,11 +23,7 @@ export class Aggregate extends AggregateBase {
21
23
super ( agentIdentifier , aggregator , FEATURE_NAME )
22
24
23
25
/** held logs before sending */
24
- this . bufferedLogs = [ ]
25
- /** held logs during sending, for retries */
26
- this . outgoingLogs = [ ]
27
- /** the estimated bytes of log data waiting to be sent -- triggers a harvest if adding a new log will exceed limit */
28
- this . estimatedBytes = 0
26
+ this . bufferedLogs = new EventBuffer ( )
29
27
30
28
this . #agentRuntime = getRuntime ( this . agentIdentifier )
31
29
this . #agentInfo = getInfo ( this . agentIdentifier )
@@ -39,11 +37,11 @@ export class Aggregate extends AggregateBase {
39
37
getPayload : this . prepareHarvest . bind ( this ) ,
40
38
raw : true
41
39
} , this )
42
- /** harvest immediately once started to purge pre-load logs collected */
43
- this . scheduler . startTimer ( this . harvestTimeSeconds , 0 )
44
40
/** emitted by instrument class (wrapped loggers) or the api methods directly */
45
41
registerHandler ( LOGGING_EVENT_EMITTER_CHANNEL , this . handleLog . bind ( this ) , this . featureName , this . ee )
46
42
this . drain ( )
43
+ /** harvest immediately once started to purge pre-load logs collected */
44
+ this . scheduler . startTimer ( this . harvestTimeSeconds , 0 )
47
45
} )
48
46
}
49
47
@@ -70,10 +68,6 @@ export class Aggregate extends AggregateBase {
70
68
return
71
69
}
72
70
if ( typeof message !== 'string' || ! message ) return warn ( 32 )
73
- if ( message . length > MAX_PAYLOAD_SIZE ) {
74
- handle ( SUPPORTABILITY_METRIC_CHANNEL , [ 'Logging/Harvest/Failed/Seen' , message . length ] )
75
- return warn ( 31 , message . slice ( 0 , 25 ) + '...' )
76
- }
77
71
78
72
const log = new Log (
79
73
Math . floor ( this . #agentRuntime. timeKeeper . correctAbsoluteTimestamp (
@@ -84,26 +78,26 @@ export class Aggregate extends AggregateBase {
84
78
level
85
79
)
86
80
const logBytes = log . message . length + stringify ( log . attributes ) . length + log . level . length + 10 // timestamp == 10 chars
87
- if ( logBytes > MAX_PAYLOAD_SIZE ) {
88
- handle ( SUPPORTABILITY_METRIC_CHANNEL , [ 'Logging/Harvest/Failed/Seen' , logBytes ] )
89
- return warn ( 31 , log . message . slice ( 0 , 25 ) + '...' )
90
- }
91
81
92
- if ( this . estimatedBytes + logBytes >= MAX_PAYLOAD_SIZE ) {
93
- handle ( SUPPORTABILITY_METRIC_CHANNEL , [ 'Logging/Harvest/Early/Seen' , this . estimatedBytes + logBytes ] )
94
- this . scheduler . runHarvest ( { } )
82
+ if ( ! this . bufferedLogs . canMerge ( logBytes ) ) {
83
+ if ( this . bufferedLogs . hasData ) {
84
+ handle ( SUPPORTABILITY_METRIC_CHANNEL , [ 'Logging/Harvest/Early/Seen' , this . bufferedLogs . bytes + logBytes ] )
85
+ this . scheduler . runHarvest ( { } )
86
+ if ( logBytes < MAX_PAYLOAD_SIZE ) this . bufferedLogs . add ( log )
87
+ } else {
88
+ handle ( SUPPORTABILITY_METRIC_CHANNEL , [ 'Logging/Harvest/Failed/Seen' , logBytes ] )
89
+ warn ( 31 , log . message . slice ( 0 , 25 ) + '...' )
90
+ }
91
+ return
95
92
}
96
- this . estimatedBytes += logBytes
97
- this . bufferedLogs . push ( log )
93
+
94
+ this . bufferedLogs . add ( log )
98
95
}
99
96
100
- prepareHarvest ( ) {
101
- if ( this . blocked || ! ( this . bufferedLogs . length || this . outgoingLogs . length ) ) return
102
- /** populate outgoing array while also clearing main buffer */
103
- this . outgoingLogs . push ( ...this . bufferedLogs . splice ( 0 ) )
104
- this . estimatedBytes = 0
97
+ prepareHarvest ( options = { } ) {
98
+ if ( this . blocked || ! this . bufferedLogs . hasData ) return
105
99
/** see https://source.datanerd.us/agents/rum-specs/blob/main/browser/Log for logging spec */
106
- return {
100
+ const payload = {
107
101
qs : {
108
102
browser_monitoring_key : this . #agentInfo. licenseKey
109
103
} ,
@@ -123,14 +117,20 @@ export class Aggregate extends AggregateBase {
123
117
} ,
124
118
/** logs section contains individual unique log entries */
125
119
logs : applyFnToProps (
126
- this . outgoingLogs ,
120
+ this . bufferedLogs . buffer ,
127
121
this . obfuscator . obfuscateString . bind ( this . obfuscator ) , 'string'
128
122
)
129
123
} ]
130
124
}
125
+
126
+ if ( options . retry ) this . bufferedLogs . hold ( )
127
+ else this . bufferedLogs . clear ( )
128
+
129
+ return payload
131
130
}
132
131
133
132
onHarvestFinished ( result ) {
134
- if ( ! result . retry ) this . outgoingLogs = [ ]
133
+ if ( result . retry ) this . bufferedLogs . unhold ( )
134
+ else this . bufferedLogs . held . clear ( )
135
135
}
136
136
}
0 commit comments