Skip to content

Commit a8b3b3d

Browse files
chengshashacheng
andauthored
fix: add locking around broker throttle timer to prevent race condition (#2826)
Fixes #2823 Signed-off-by: shacheng <[email protected]> Co-authored-by: shacheng <[email protected]>
1 parent fd84c2b commit a8b3b3d

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

broker.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ type Broker struct {
5959
kerberosAuthenticator GSSAPIKerberosAuth
6060
clientSessionReauthenticationTimeMs int64
6161

62-
throttleTimer *time.Timer
62+
throttleTimer *time.Timer
63+
throttleTimerLock sync.Mutex
6364
}
6465

6566
// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
@@ -1697,6 +1698,8 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) {
16971698
}
16981699

16991700
func (b *Broker) setThrottle(throttleTime time.Duration) {
1701+
b.throttleTimerLock.Lock()
1702+
defer b.throttleTimerLock.Unlock()
17001703
if b.throttleTimer != nil {
17011704
// if there is an existing timer stop/clear it
17021705
if !b.throttleTimer.Stop() {
@@ -1707,6 +1710,8 @@ func (b *Broker) setThrottle(throttleTime time.Duration) {
17071710
}
17081711

17091712
func (b *Broker) waitIfThrottled() {
1713+
b.throttleTimerLock.Lock()
1714+
defer b.throttleTimerLock.Unlock()
17101715
if b.throttleTimer != nil {
17111716
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
17121717
<-b.throttleTimer.C

0 commit comments

Comments
 (0)