Skip to content

Commit 5596b90

Browse files
committed
use a buffered channel to signal new items in the queue
Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent 9fd365e commit 5596b90

File tree

1 file changed

+19
-34
lines changed

1 file changed

+19
-34
lines changed

internal/queue/queue.go

+19-34
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,16 @@ type queuepair struct {
4949
type boundedQueue struct {
5050
workqueue.TypedRateLimitingInterface[*event.Event]
5151
maxSize int
52-
notify *sync.Cond
52+
53+
notify chan struct{}
5354
}
5455

5556
func newBoundedQueue(maxSize int) *boundedQueue {
5657
rateLimiter := workqueue.DefaultTypedControllerRateLimiter[*event.Event]()
5758
return &boundedQueue{
5859
TypedRateLimitingInterface: workqueue.NewTypedRateLimitingQueue(rateLimiter),
5960
maxSize: maxSize,
60-
notify: sync.NewCond(&sync.Mutex{}),
61+
notify: make(chan struct{}, 10),
6162
}
6263
}
6364

@@ -71,9 +72,12 @@ func (bq *boundedQueue) Add(item *event.Event) {
7172
bq.TypedRateLimitingInterface.Add(item)
7273

7374
// Notify any waiting goroutines that an item has been added to the queue.
74-
bq.notify.L.Lock()
75-
defer bq.notify.L.Unlock()
76-
bq.notify.Broadcast()
75+
select {
76+
case bq.notify <- struct{}{}:
77+
default:
78+
// We don't want to block the caller if the notify channel is full.
79+
return
80+
}
7781
}
7882

7983
type SendRecvQueues struct {
@@ -177,44 +181,25 @@ func (q *SendRecvQueues) Delete(name string, shutdown bool) error {
177181
return nil
178182
}
179183

180-
// GetWithContext is a wrapper around the workqueue's Get method, and it
181-
// uses a condition variable to wait for an item to be available. It also
182-
// handles the case where the context is cancelled while waiting for an
183-
// item to be available.
184+
// GetWithContext is a wrapper around the workqueue's Get method.
185+
// It waits until an item is available in the queue or the context is Done
184186
func GetWithContext(q workqueue.TypedRateLimitingInterface[*event.Event], ctx context.Context) (*event.Event, bool) {
185187
bq, ok := q.(*boundedQueue)
186188
if !ok {
187189
return nil, false
188190
}
189191

190-
done := make(chan struct{})
191-
bq.notify.L.Lock()
192-
defer func() {
193-
bq.notify.L.Unlock()
194-
close(done)
195-
}()
192+
for {
193+
if bq.Len() > 0 {
194+
return bq.Get()
195+
}
196196

197-
// Goroutine to signal on context cancellation
198-
go func() {
197+
// Suspend until an item is available or context is cancelled
199198
select {
200199
case <-ctx.Done():
201-
bq.notify.L.Lock()
202-
bq.notify.Signal()
203-
bq.notify.L.Unlock()
204-
case <-done:
205-
// If function exits early, prevent leak
200+
return nil, false
201+
case <-bq.notify:
202+
// Wake up and re-check if an item is available
206203
}
207-
}()
208-
209-
// Suspend until an item is available or context is cancelled
210-
for q.Len() == 0 && ctx.Err() == nil {
211-
bq.notify.Wait()
212204
}
213-
214-
if ctx.Err() != nil {
215-
return nil, false
216-
}
217-
218-
ev, shutdown := bq.Get()
219-
return ev, shutdown
220205
}

0 commit comments

Comments
 (0)