Skip to content

Commit 2dcb1b9

Browse files
authored
Fix possibly failing to reserve slots due to stale issued number (#1870)
1 parent 76cebd8 commit 2dcb1b9

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

internal/tuning.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ type SlotReservationInfo interface {
7878
WorkerBuildId() string
7979
// WorkerBuildId returns the build ID of the worker that is reserving the slot.
8080
WorkerIdentity() string
81-
// NumIssuedSlots returns the number of slots that have already been issued by the supplier.
81+
// NumIssuedSlots returns the current number of slots that have already been issued by the
82+
// supplier. This value may change over the course of the reservation.
8283
NumIssuedSlots() int
8384
// Logger returns an appropriately tagged logger.
8485
Logger() log.Logger
@@ -328,7 +329,7 @@ type slotReserveInfoImpl struct {
328329
taskQueue string
329330
workerBuildId string
330331
workerIdentity string
331-
issuedSlots int
332+
issuedSlots *atomic.Int32
332333
logger log.Logger
333334
metrics metrics.Handler
334335
}
@@ -346,7 +347,7 @@ func (s slotReserveInfoImpl) WorkerIdentity() string {
346347
}
347348

348349
func (s slotReserveInfoImpl) NumIssuedSlots() int {
349-
return s.issuedSlots
350+
return int(s.issuedSlots.Load())
350351
}
351352

352353
func (s slotReserveInfoImpl) Logger() log.Logger {
@@ -442,7 +443,7 @@ func (t *trackingSlotSupplier) ReserveSlot(
442443
taskQueue: data.taskQueue,
443444
workerBuildId: t.workerBuildId,
444445
workerIdentity: t.workerIdentity,
445-
issuedSlots: int(t.issuedSlotsAtomic.Load()),
446+
issuedSlots: &t.issuedSlotsAtomic,
446447
logger: t.logger,
447448
metrics: t.metrics,
448449
})
@@ -465,7 +466,7 @@ func (t *trackingSlotSupplier) TryReserveSlot(data *slotReservationData) *SlotPe
465466
taskQueue: data.taskQueue,
466467
workerBuildId: t.workerBuildId,
467468
workerIdentity: t.workerIdentity,
468-
issuedSlots: int(t.issuedSlotsAtomic.Load()),
469+
issuedSlots: &t.issuedSlotsAtomic,
469470
logger: t.logger,
470471
metrics: t.metrics,
471472
})

test/worker_tuner_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,35 @@ func (ts *WorkerTunerTestSuite) TestCompositeWorkerTuner() {
100100
ts.runTheWorkflow(tuner, ctx)
101101
}
102102

103+
func (ts *WorkerTunerTestSuite) TestResourceBasedSmallSlots() {
104+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
105+
defer cancel()
106+
107+
wfSS, err := worker.NewFixedSizeSlotSupplier(10)
108+
ts.NoError(err)
109+
controllerOpts := resourcetuner.DefaultResourceControllerOptions()
110+
controllerOpts.MemTargetPercent = 0.8
111+
controllerOpts.CpuTargetPercent = 0.9
112+
controller := resourcetuner.NewResourceController(controllerOpts)
113+
actSS, err := resourcetuner.NewResourceBasedSlotSupplier(controller,
114+
resourcetuner.ResourceBasedSlotSupplierOptions{
115+
MinSlots: 1,
116+
MaxSlots: 4,
117+
RampThrottle: 0,
118+
})
119+
ts.NoError(err)
120+
laCss, err := worker.NewFixedSizeSlotSupplier(5)
121+
ts.NoError(err)
122+
tuner, err := worker.NewCompositeTuner(worker.CompositeTunerOptions{
123+
WorkflowSlotSupplier: wfSS, ActivitySlotSupplier: actSS, LocalActivitySlotSupplier: laCss})
124+
ts.NoError(err)
125+
126+
// The bug this is verifying was triggered by a race, so run this a bunch to verify it's not hit
127+
for i := 0; i < 10; i++ {
128+
ts.runTheWorkflow(tuner, ctx)
129+
}
130+
}
131+
103132
func (ts *WorkerTunerTestSuite) runTheWorkflow(tuner worker.WorkerTuner, ctx context.Context) {
104133
workerOptions := worker.Options{Tuner: tuner}
105134
myWorker := worker.New(ts.client, ts.taskQueueName, workerOptions)

0 commit comments

Comments
 (0)