Skip to content

Commit 86d59bd

Browse files
committed
feat: delay lease acquisition by N poll requests (#23)
This is a more simplistic approach to https://github.com/ankorstore/gh-action-mq-lease-service/pull/21 Signed-off-by: Matthias Riegler <[email protected]>
1 parent 7ab4e05 commit 86d59bd

File tree

7 files changed

+62
-15
lines changed

7 files changed

+62
-15
lines changed

internal/config/load.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,5 @@ func load(path string, config interface{}) error {
2828
return err
2929
}
3030

31-
if err := yaml.Unmarshal([]byte(templated), config); err != nil {
32-
return err
33-
}
34-
35-
return nil
31+
return yaml.Unmarshal([]byte(templated), config)
3632
}

internal/config/server/latest/server_schema.go

+2
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ type GithubRepositoryConfig struct {
1313
StabilizeDuration int `yaml:"stabilize_duration_seconds"`
1414
TTL int `yaml:"ttl_seconds"`
1515
ExpectedRequestCount int `yaml:"expected_request_count"`
16+
// DelayLEaseASsignmentBy is the number of times a lease can be delayed before it is assigned.
17+
DelayLeaseAssignmentBy int `yaml:"delay_lease_assignment_by"`
1618
}

internal/lease/leaseprovider.go

+25-7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type ProviderOpts struct {
2929
StabilizeDuration time.Duration
3030
TTL time.Duration
3131
ExpectedRequestCount int
32+
DelayAssignmentCount int
3233
ID string
3334
Clock clock.PassiveClock
3435
Storage storage.Storage[*ProviderState]
@@ -46,11 +47,12 @@ const (
4647
)
4748

4849
type Request struct {
49-
HeadSHA string `json:"head_sha"`
50-
HeadRef string `json:"head_ref"`
51-
Priority int `json:"priority"`
52-
Status *string `json:"status,omitempty"`
53-
lastSeenAt *time.Time
50+
HeadSHA string `json:"head_sha"`
51+
HeadRef string `json:"head_ref"`
52+
Priority int `json:"priority"`
53+
Status *string `json:"status,omitempty"`
54+
lastSeenAt *time.Time
55+
acquireCountdown *int
5456
}
5557

5658
type StackedPullRequest struct {
@@ -75,6 +77,7 @@ func (lr *Request) MarshalZerologObject(e *zerolog.Event) {
7577
e.Str("lease_request_head_sha", lr.HeadSHA).
7678
Str("lease_request_head_ref", lr.HeadRef).
7779
Int("lease_request_priority", lr.Priority).
80+
Int("lease_request_acquire_countdown", pointer.IntDeref(lr.acquireCountdown, 0)).
7881
Str("lease_request_status", status)
7982
}
8083

@@ -457,12 +460,27 @@ func (lp *leaseProviderImpl) evaluateRequest(ctx context.Context, req *Request)
457460

458461
// Got the max priority, now check if we are the winner
459462
if req.Priority == maxPriority {
460-
req.Status = pointer.String(StatusAcquired)
461-
lp.state.acquired = req
463+
464+
// In order to prevent race conditions, there's the option to delay the lock acquisition
465+
// This is useful when the lock is acquired by a CI job that is canceled or restarted. There can be a short delay.
466+
req.acquireCountdown = pointer.Int(pointer.IntDeref(req.acquireCountdown, lp.opts.DelayAssignmentCount+1) - 1)
467+
if *req.acquireCountdown > 0 {
468+
log.Ctx(ctx).
469+
Debug().
470+
EmbedObject(req).
471+
Msg("Delaying lock acquisition")
472+
return req
473+
}
474+
462475
log.Ctx(ctx).
463476
Debug().
464477
EmbedObject(req).
465478
Msg("Current lease request has the higher priority. It then acquires the lock")
479+
480+
// Acquire lease
481+
req.Status = pointer.String(StatusAcquired)
482+
lp.state.acquired = req
483+
466484
log.Ctx(ctx).
467485
Info().
468486
EmbedObject(req).

internal/lease/leaseprovider_test.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ type clearTestFakeStorage struct{ state *ProviderState }
453453
func (s *clearTestFakeStorage) Init() error { return nil }
454454
func (s *clearTestFakeStorage) Close() error { return nil }
455455
func (s *clearTestFakeStorage) Hydrate(context.Context, *ProviderState) error { return nil }
456-
func (s *clearTestFakeStorage) Save(ctx context.Context, obj *ProviderState) error {
456+
func (s *clearTestFakeStorage) Save(_ context.Context, obj *ProviderState) error {
457457
s.state = obj
458458
return nil
459459
}
@@ -743,3 +743,33 @@ func Test_leaseProviderImpl__FullLoop_ReleaseFromInvalidHeadSHA(t *testing.T) {
743743
})
744744
assert.Error(t, err)
745745
}
746+
747+
func Test_leaseProviderImpl__FullLoop_DelayedAcquisition(t *testing.T) {
748+
lp := NewLeaseProvider(ProviderOpts{TTL: 1 * time.Hour, StabilizeDuration: time.Minute, ExpectedRequestCount: 2, DelayAssignmentCount: 2})
749+
750+
req1 := &Request{
751+
HeadSHA: "sha1",
752+
Priority: 1,
753+
}
754+
req2 := &Request{
755+
HeadSHA: "sha2",
756+
Priority: 2,
757+
}
758+
759+
// Inject the two requests
760+
req1, err := lp.Acquire(context.Background(), req1)
761+
assert.NoError(t, err)
762+
assert.Equal(t, StatusPending, *req1.Status)
763+
764+
// this should be delayed and only acquire after the 3rd request
765+
req2, err = lp.Acquire(context.Background(), req2)
766+
assert.NoError(t, err)
767+
assert.Equal(t, StatusPending, *req2.Status)
768+
req2, err = lp.Acquire(context.Background(), req2)
769+
assert.NoError(t, err)
770+
assert.Equal(t, StatusPending, *req2.Status)
771+
// Now it should acquire the lease
772+
req2, err = lp.Acquire(context.Background(), req2)
773+
assert.NoError(t, err)
774+
assert.Equal(t, StatusAcquired, *req2.Status)
775+
}

internal/lease/leaseproviderorchestrator.go

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func NewProviderOrchestrator(opts NewProviderOrchestratorOpts) ProviderOrchestra
5454
StabilizeDuration: time.Second * time.Duration(repository.StabilizeDuration),
5555
TTL: time.Second * time.Duration(repository.TTL),
5656
ExpectedRequestCount: repository.ExpectedRequestCount,
57+
DelayAssignmentCount: repository.DelayLeaseAssignmentBy,
5758
ID: key,
5859
Clock: opts.Clock,
5960
Storage: opts.Storage,

internal/storage/storage.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (s *storageImpl[T]) Hydrate(ctx context.Context, defaultObj T) error {
133133

134134
// Save store the provided object in the storage
135135
// the provided object should at least be able to return a non-null and unique Identifier (via the GetIdentifier() method)
136-
func (s *storageImpl[T]) Save(ctx context.Context, obj T) error {
136+
func (s *storageImpl[T]) Save(_ context.Context, obj T) error {
137137
var err error
138138
id := obj.GetIdentifier()
139139
b, err := obj.Marshal()

pkg/util/logger/logger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type AppInfo interface {
1818
// locationHook adds the log location
1919
type locationHook struct{}
2020

21-
func (h locationHook) Run(e *zerolog.Event, l zerolog.Level, msg string) {
21+
func (h locationHook) Run(e *zerolog.Event, _ zerolog.Level, _ string) {
2222
_, file, line, ok := runtime.Caller(3)
2323
if ok {
2424
e.Str("location", fmt.Sprintf("%s:%d", file, line))

0 commit comments

Comments
 (0)