Skip to content

Commit 2c6bc1d

Browse files
authored
Introduce separate slot supplier for session activities (#1736)
1 parent 158b823 commit 2c6bc1d

File tree

4 files changed

+64
-22
lines changed

4 files changed

+64
-22
lines changed

contrib/resourcetuner/resourcetuner.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,14 @@ func NewResourceBasedTuner(opts ResourceBasedTunerOptions) (worker.WorkerTuner,
7777
}
7878
nexusSS := &ResourceBasedSlotSupplier{controller: controller,
7979
options: defaultWorkflowResourceBasedSlotSupplierOptions()}
80+
sessSS := &ResourceBasedSlotSupplier{controller: controller,
81+
options: defaultActivityResourceBasedSlotSupplierOptions()}
8082
compositeTuner, err := worker.NewCompositeTuner(worker.CompositeTunerOptions{
81-
WorkflowSlotSupplier: wfSS,
82-
ActivitySlotSupplier: actSS,
83-
LocalActivitySlotSupplier: laSS,
84-
NexusSlotSupplier: nexusSS,
83+
WorkflowSlotSupplier: wfSS,
84+
ActivitySlotSupplier: actSS,
85+
LocalActivitySlotSupplier: laSS,
86+
NexusSlotSupplier: nexusSS,
87+
SessionActivitySlotSupplier: sessSS,
8588
})
8689
if err != nil {
8790
return nil, err

internal/internal_worker.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ func (ww *workflowWorker) Stop() {
399399
ww.worker.Stop()
400400
}
401401

402-
func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, overrides *workerOverrides, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
402+
func newSessionWorker(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, env *registry, maxConcurrentSessionExecutionSize int) *sessionWorker {
403403
if params.Identity == "" {
404404
params.Identity = getWorkerIdentity(params.TaskQueue)
405405
}
@@ -412,15 +412,14 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work
412412
creationTaskqueue := getCreationTaskqueue(params.TaskQueue)
413413
params.UserContext = context.WithValue(params.UserContext, sessionEnvironmentContextKey, sessionEnvironment)
414414
params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue()
415-
activityWorker := newActivityWorker(service, params, overrides, env, nil)
415+
activityWorker := newActivityWorker(service, params,
416+
&workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil)
416417

417418
params.MaxConcurrentActivityTaskQueuePollers = 1
418419
params.TaskQueue = creationTaskqueue
419-
if overrides == nil {
420-
overrides = &workerOverrides{}
421-
}
422420
// Although we have session token bucket to limit session size across creation
423421
// and recreation, we also limit it here for creation only
422+
overrides := &workerOverrides{}
424423
overrides.slotSupplier, _ = NewFixedSizeSlotSupplier(maxConcurrentSessionExecutionSize)
425424
creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket())
426425

@@ -1758,7 +1757,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
17581757

17591758
var sessionWorker *sessionWorker
17601759
if options.EnableSessionWorker && !options.LocalActivityWorkerOnly {
1761-
sessionWorker = newSessionWorker(client.workflowService, workerParams, nil, registry, options.MaxConcurrentSessionExecutionSize)
1760+
sessionWorker = newSessionWorker(client.workflowService, workerParams, registry, options.MaxConcurrentSessionExecutionSize)
17621761
registry.RegisterActivityWithOptions(sessionCreationActivity, RegisterActivityOptions{
17631762
Name: sessionCreationActivityName,
17641763
})

internal/tuning.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type WorkerTuner interface {
4646
GetLocalActivitySlotSupplier() SlotSupplier
4747
// GetNexusSlotSupplier returns the SlotSupplier used for nexus tasks.
4848
GetNexusSlotSupplier() SlotSupplier
49+
// GetSessionActivitySlotSupplier returns the SlotSupplier used for activities within sessions.
50+
GetSessionActivitySlotSupplier() SlotSupplier
4951
}
5052

5153
// SlotPermit is a permit to use a slot.
@@ -150,10 +152,11 @@ type SlotSupplier interface {
150152
//
151153
// WARNING: Custom implementations of SlotSupplier are currently experimental.
152154
type CompositeTuner struct {
153-
workflowSlotSupplier SlotSupplier
154-
activitySlotSupplier SlotSupplier
155-
localActivitySlotSupplier SlotSupplier
156-
nexusSlotSupplier SlotSupplier
155+
workflowSlotSupplier SlotSupplier
156+
activitySlotSupplier SlotSupplier
157+
localActivitySlotSupplier SlotSupplier
158+
nexusSlotSupplier SlotSupplier
159+
sessionActivitySlotSupplier SlotSupplier
157160
}
158161

159162
func (c *CompositeTuner) GetWorkflowTaskSlotSupplier() SlotSupplier {
@@ -168,6 +171,9 @@ func (c *CompositeTuner) GetLocalActivitySlotSupplier() SlotSupplier {
168171
func (c *CompositeTuner) GetNexusSlotSupplier() SlotSupplier {
169172
return c.nexusSlotSupplier
170173
}
174+
func (c *CompositeTuner) GetSessionActivitySlotSupplier() SlotSupplier {
175+
return c.sessionActivitySlotSupplier
176+
}
171177

172178
// CompositeTunerOptions are the options used by NewCompositeTuner.
173179
type CompositeTunerOptions struct {
@@ -179,17 +185,20 @@ type CompositeTunerOptions struct {
179185
LocalActivitySlotSupplier SlotSupplier
180186
// NexusSlotSupplier is the SlotSupplier used for nexus tasks.
181187
NexusSlotSupplier SlotSupplier
188+
// SessionActivitySlotSupplier is the SlotSupplier used for activities within sessions.
189+
SessionActivitySlotSupplier SlotSupplier
182190
}
183191

184192
// NewCompositeTuner creates a WorkerTuner that uses a combination of slot suppliers.
185193
//
186194
// WARNING: Custom implementations of SlotSupplier are currently experimental.
187195
func NewCompositeTuner(options CompositeTunerOptions) (WorkerTuner, error) {
188196
return &CompositeTuner{
189-
workflowSlotSupplier: options.WorkflowSlotSupplier,
190-
activitySlotSupplier: options.ActivitySlotSupplier,
191-
localActivitySlotSupplier: options.LocalActivitySlotSupplier,
192-
nexusSlotSupplier: options.NexusSlotSupplier,
197+
workflowSlotSupplier: options.WorkflowSlotSupplier,
198+
activitySlotSupplier: options.ActivitySlotSupplier,
199+
localActivitySlotSupplier: options.LocalActivitySlotSupplier,
200+
nexusSlotSupplier: options.NexusSlotSupplier,
201+
sessionActivitySlotSupplier: options.SessionActivitySlotSupplier,
193202
}, nil
194203
}
195204

@@ -235,11 +244,16 @@ func NewFixedSizeTuner(options FixedSizeTunerOptions) (WorkerTuner, error) {
235244
if err != nil {
236245
return nil, err
237246
}
247+
sessSS, err := NewFixedSizeSlotSupplier(options.NumActivitySlots)
248+
if err != nil {
249+
return nil, err
250+
}
238251
return &CompositeTuner{
239-
workflowSlotSupplier: wfSS,
240-
activitySlotSupplier: actSS,
241-
localActivitySlotSupplier: laSS,
242-
nexusSlotSupplier: nexusSS,
252+
workflowSlotSupplier: wfSS,
253+
activitySlotSupplier: actSS,
254+
localActivitySlotSupplier: laSS,
255+
nexusSlotSupplier: nexusSS,
256+
sessionActivitySlotSupplier: sessSS,
243257
}, nil
244258
}
245259

test/integration_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,11 @@ func (ts *IntegrationTestSuite) SetupTest() {
274274
ts.NoError(err)
275275
options.Tuner = tuner
276276
}
277+
if strings.Contains(ts.T().Name(), "SlotSuppliersWithSession") {
278+
options.MaxConcurrentActivityExecutionSize = 1
279+
// Apparently this is on by default in these tests anyway, but to be explicit
280+
options.EnableSessionWorker = true
281+
}
277282

278283
ts.worker = worker.New(ts.client, ts.taskQueueName, options)
279284
ts.workerStopped = false
@@ -3269,6 +3274,27 @@ func (ts *IntegrationTestSuite) TestResourceBasedSlotSupplierManyActs() {
32693274
ts.assertMetricGaugeEventually(metrics.WorkerTaskSlotsUsed, wfWorkertags, 0)
32703275
}
32713276

3277+
func (ts *IntegrationTestSuite) TestSlotSuppliersWithSessionAndOneConcurrentMax() {
3278+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
3279+
defer cancel()
3280+
3281+
// Activities time out without the fix, since obtaining a slot takes too long
3282+
wfRuns := make([]client.WorkflowRun, 0)
3283+
for i := 0; i < 3; i++ {
3284+
opts := ts.startWorkflowOptions("slot-suppliers-with-session" + strconv.Itoa(i))
3285+
opts.WorkflowExecutionTimeout = 1 * time.Minute
3286+
run, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.Echo, "hi")
3287+
ts.NoError(err)
3288+
ts.NotNil(run)
3289+
ts.NoError(err)
3290+
wfRuns = append(wfRuns, run)
3291+
}
3292+
3293+
for _, run := range wfRuns {
3294+
ts.NoError(run.Get(ctx, nil))
3295+
}
3296+
}
3297+
32723298
func (ts *IntegrationTestSuite) TestTooFewParams() {
32733299
var res ParamsValue
32743300
// Only give first param

0 commit comments

Comments
 (0)