Skip to content

Commit d24c23e

Browse files
authored
Merge pull request projectcalico#9936 from Brian-McM/bm-fix-race-condition-with-reading-backlog
Read all commands off backlog channel before executing backlog
2 parents 946f4db + 2b9e6ff commit d24c23e

File tree

3 files changed

+37
-5
lines changed

3 files changed

+37
-5
lines changed

guardian/Makefile

+11
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,14 @@ ci: static-checks ut
102102
## Run all CD steps, normally pushing images out to registries.
103103
cd: image-all cd-common
104104

105+
release-build: .release-$(VERSION).created
106+
.release-$(VERSION).created:
107+
$(MAKE) clean image-all RELEASE=true
108+
$(MAKE) retag-build-images-with-registries RELEASE=true IMAGETAG=$(VERSION)
109+
$(MAKE) retag-build-images-with-registries RELEASE=true IMAGETAG=latest
110+
touch $@
111+
112+
release-publish: release-prereqs .release-$(VERSION).published
113+
.release-$(VERSION).published:
114+
$(MAKE) push-images-to-registries push-manifests IMAGETAG=$(VERSION) RELEASE=$(RELEASE) CONFIRM=$(CONFIRM)
115+
touch $@

guardian/pkg/asyncutil/async.go

+21-3
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
147147
close(executor.backlogChan)
148148

149149
// Add all outstanding commands in the backlog or cmd channels to the backlog slice.
150-
executor.backlog = append(executor.backlog, ReadAll(executor.backlogChan)...)
150+
executor.drainBacklogChannel()
151151
executor.backlog = append(executor.backlog, ReadAll(executor.cmdChan)...)
152152

153153
logrus.Debug("Returning errors for outstanding requests due to shutdown.")
@@ -171,11 +171,14 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
171171
case cmd := <-executor.cmdChan:
172172
logrus.Debugf("Received command.")
173173
if !executor.backLogCommands {
174+
logrus.Debugf("Executing command immediately..")
174175
executor.executeCommand(ctx, cmd)
175176
} else {
177+
logrus.Debugf("Backlog commands set, adding command to backlog (current backlog size: %d).", len(executor.backlog))
176178
executor.backlog = append(executor.backlog, cmd)
177179
}
178180
case cmd := <-executor.backlogChan:
181+
logrus.Debugf("Received backlog command (current backlog size: %d).", len(executor.backlog))
179182
executor.backlog = append(executor.backlog, cmd)
180183
case signal := <-executor.drainAndBacklogSig:
181184
logrus.Debugf("Received requeue signal.")
@@ -186,25 +189,30 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
186189
defer signal.Send()
187190
defer close(draining)
188191

189-
logrus.Debugf("Waiting for inflight commands to finish...")
192+
logrus.Debug("Waiting for inflight commands to finish...")
190193
executor.inflightCmds.Wait()
191194
executor.errBuff.Clear()
192-
logrus.Debugf("Inflight commands finished, notifying listeners.")
195+
logrus.Debug("Inflight commands finished, notifying listeners.")
193196
}()
194197
case <-executor.resumeBackloggedSig.Receive():
195198
logrus.Debugf("Received resume signal.")
196199
if draining != nil {
200+
logrus.Debug("Waiting for drain to finish...")
197201
// If the draining channel is not nil and hasn't been closed then we're still draining. We need to
198202
// delay resuming so the backlog can be written too.
199203
if _, read := ReadNoWait(draining); !read {
204+
logrus.Debug("delay resume signal not set, setting it.")
200205
delayResume = draining
201206
continue
202207
}
208+
logrus.Debug("delay resume signal already set.")
203209
}
204210

205211
// Handle the backlog before resuming execution.
206212
ctx, stopCommands = executor.execBacklog(shutdownCtx)
207213
case <-delayResume:
214+
logrus.Debug("Delay resume signal received, handling backlog.")
215+
208216
// Handle the backlog before resuming execution.
209217
ctx, stopCommands = executor.execBacklog(shutdownCtx)
210218

@@ -214,7 +222,17 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
214222
}
215223
}
216224

225+
func (executor *commandExecutor[C, R]) drainBacklogChannel() {
226+
logrus.Debugf("Backlog size: %d, adding to backlog.", len(executor.backlog))
227+
executor.backlog = append(executor.backlog, ReadAll(executor.backlogChan)...)
228+
}
229+
217230
func (executor *commandExecutor[C, R]) execBacklog(shutdownCtx context.Context) (context.Context, func()) {
231+
// Just in case there's anything left on the backlog channel ensure it's drained off and added to the backlog slice.
232+
if len(executor.backlog) > 0 {
233+
executor.drainBacklogChannel()
234+
}
235+
218236
ctx, stopCommands := context.WithCancel(shutdownCtx)
219237
for _, cmd := range executor.backlog {
220238
executor.executeCommand(ctx, cmd)

guardian/pkg/asyncutil/async_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ func TestRequestHandlerStopAndRequeue(t *testing.T) {
7777

7878
wg.Wait()
7979

80+
// We don't wait on the signal returned because we want to test the unhappy path where the user didn't wait on the
81+
// signal.
8082
cmdExec.DrainAndBacklog()
8183
pause = false
8284
cmdExec.Resume()
@@ -87,6 +89,7 @@ func TestRequestHandlerStopAndRequeue(t *testing.T) {
8789
Expect(err).Should(BeNil())
8890

8991
cancel()
90-
cmdExec.ShutdownSignaler().Receive()
91-
92+
logrus.Debug("Waiting for shutdown...")
93+
<-cmdExec.ShutdownSignaler().Receive()
94+
logrus.Debug("Finished waiting for shutdown.")
9295
}

0 commit comments

Comments
 (0)