@@ -2,6 +2,7 @@ package messaging
2
2
3
3
import (
4
4
"fmt"
5
+ "sync"
5
6
"sync/atomic"
6
7
"time"
7
8
@@ -92,100 +93,29 @@ func (b *Broker) runWorkqueueProcessor(stopCh <-chan struct{}) {
92
93
93
94
// runProxyUpdateDispatcher runs the dispatcher responsible for batching
94
95
// proxy update events received in close proximity.
95
- // It batches proxy update events with the use of 2 timers:
96
- // 1. Sliding window timer that resets when a proxy update event is received
97
- // 2. Max window timer that caps the max duration a sliding window can be reset to
98
- // When either of the above timers expire, the proxy update event is published
99
- // on the dedicated pub-sub instance.
96
+ // It batches proxy update events with the use of a mutex, and TryLock.
100
97
func (b * Broker ) runProxyUpdateDispatcher (stopCh <- chan struct {}) {
101
- // batchTimer and maxTimer are updated by the dispatcher routine
102
- // when events are processed and timeouts expire. They are initialized
103
- // with a large timeout (a decade) so they don't time out till an event
104
- // is received.
105
- noTimeout := 87600 * time .Hour // A decade
106
- slidingTimer := time .NewTimer (noTimeout )
107
- maxTimer := time .NewTimer (noTimeout )
108
-
109
- // dispatchPending indicates whether a proxy update event is pending
110
- // from being published on the pub-sub. A proxy update event will
111
- // be held for 'proxyUpdateSlidingWindow' duration to be able to
112
- // coalesce multiple proxy update events within that duration, before
113
- // it is dispatched on the pub-sub. The 'proxyUpdateSlidingWindow' duration
114
- // is a sliding window, which means each event received within a window
115
- // slides the window further ahead in time, up to a max of 'proxyUpdateMaxWindow'.
116
- //
117
- // This mechanism is necessary to avoid triggering proxy update pub-sub events in
118
- // a hot loop, which would otherwise result in CPU spikes on the controller.
119
- // We want to coalesce as many proxy update events within the 'proxyUpdateMaxWindow'
120
- // duration.
121
- dispatchPending := false
98
+ var coalesce sync.Mutex
122
99
batchCount := 0 // number of proxy update events batched per dispatch
123
-
124
- var event proxyUpdateEvent
125
100
for {
126
101
select {
127
- case e , ok := <- b .proxyUpdateCh :
102
+ case event , ok := <- b .proxyUpdateCh :
128
103
if ! ok {
129
104
log .Warn ().Msgf ("Proxy update event chan closed, exiting dispatcher" )
130
105
return
131
106
}
132
- event = e
133
-
134
- if ! dispatchPending {
135
- // No proxy update events are pending send on the pub-sub.
136
- // Reset the dispatch timers. The events will be dispatched
137
- // when either of the timers expire.
138
- if ! slidingTimer .Stop () {
139
- <- slidingTimer .C
140
- }
141
- slidingTimer .Reset (proxyUpdateSlidingWindow )
142
- if ! maxTimer .Stop () {
143
- <- maxTimer .C
144
- }
145
- maxTimer .Reset (proxyUpdateMaxWindow )
146
- dispatchPending = true
147
- batchCount ++
148
- log .Trace ().Msgf ("Pending dispatch of msg kind %s" , event .msg .Kind )
149
- } else {
150
- // A proxy update event is pending dispatch. Update the sliding window.
151
- if ! slidingTimer .Stop () {
152
- <- slidingTimer .C
153
- }
154
- slidingTimer .Reset (proxyUpdateSlidingWindow )
155
- batchCount ++
156
- log .Trace ().Msgf ("Reset sliding window for msg kind %s" , event .msg .Kind )
107
+ batchCount ++
108
+ if coalesce .TryLock () {
109
+ log .Trace ().Msgf ("Coalescing proxy update event %s" , event .msg .Kind )
110
+ continue
157
111
}
112
+ log .Trace ().Msgf ("Batching, msg kind %s, batch size %d" , event .msg .Kind , batchCount )
158
113
159
- case <- slidingTimer .C :
160
- slidingTimer .Reset (noTimeout ) // 'slidingTimer' drained in this case statement
161
- // Stop and drain 'maxTimer' before Reset()
162
- if ! maxTimer .Stop () {
163
- // Drain channel. Refer to Reset() doc for more info.
164
- <- maxTimer .C
165
- }
166
- maxTimer .Reset (noTimeout )
167
- b .proxyUpdatePubSub .Pub (event .msg , event .topic )
168
114
atomic .AddUint64 (& b .totalDispatchedProxyEventCount , 1 )
169
115
metricsstore .DefaultMetricsStore .ProxyBroadcastEventCount .Inc ()
170
- log .Trace ().Msgf ("Sliding window expired, msg kind %s, batch size %d" , event .msg .Kind , batchCount )
171
- dispatchPending = false
172
- batchCount = 0
173
-
174
- case <- maxTimer .C :
175
- maxTimer .Reset (noTimeout ) // 'maxTimer' drained in this case statement
176
- // Stop and drain 'slidingTimer' before Reset()
177
- if ! slidingTimer .Stop () {
178
- // Drain channel. Refer to Reset() doc for more info.
179
- <- slidingTimer .C
180
- }
181
- slidingTimer .Reset (noTimeout )
182
116
b .proxyUpdatePubSub .Pub (event .msg , event .topic )
183
- atomic .AddUint64 (& b .totalDispatchedProxyEventCount , 1 )
184
- metricsstore .DefaultMetricsStore .ProxyBroadcastEventCount .Inc ()
185
- log .Trace ().Msgf ("Max window expired, msg kind %s, batch size %d" , event .msg .Kind , batchCount )
186
- dispatchPending = false
187
117
batchCount = 0
188
-
118
+ coalesce . Unlock ()
189
119
case <- stopCh :
190
120
log .Info ().Msg ("Proxy update dispatcher received stop signal, exiting" )
191
121
return
0 commit comments