Skip to content

Commit f6bb9cb

Browse files
Swopxvzf
authored andcommitted
feat: provider clear endpoint
1 parent f21c656 commit f6bb9cb

File tree

5 files changed

+230
-71
lines changed

5 files changed

+230
-71
lines changed

e2e/api_test.go

+140-66
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package e2e_test
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"io"
@@ -187,6 +188,70 @@ var _ = Describe("API", Ordered, func() {
187188
})
188189
})
189190

191+
Describe("Provider clear endpoint", func() {
192+
BeforeEach(func() {
193+
clk.SetTime(now)
194+
})
195+
196+
Context("when the provider is unknown", func() {
197+
It("should return a 404 response", func() {
198+
resp, _ := apiCall(srv, providerClearReq("unknown", "unknown", "unknown"))
199+
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
200+
})
201+
})
202+
203+
Context("when the provider is known", func() {
204+
Context("when there is existing state", func() {
205+
var clearResp *http.Response
206+
var clearRespBody string
207+
checkStateAndExpectEmptyPayload := func(resp *http.Response, respBody string) {
208+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
209+
expectedPayload := fmt.Sprintf(`{
210+
"last_updated_at": "%s",
211+
"acquired": null,
212+
"known": {}
213+
}`, clk.Now().Format(time.RFC3339))
214+
Expect(respBody).To(MatchJSON(expectedPayload))
215+
}
216+
217+
JustBeforeEach(func() {
218+
clearResp, clearRespBody = apiCall(srv, providerClearReq(owner, repo, baseRef))
219+
// should always succeed
220+
Expect(clearResp.StatusCode).To(Equal(http.StatusOK))
221+
})
222+
223+
BeforeEach(func() {
224+
providerState, opts := generateProviderState(now, owner, repo, baseRef, map[int]lease.Status{
225+
1: lease.StatusPending,
226+
2: lease.StatusAcquired,
227+
}, pointer.Int(2))
228+
storage.PrefillStorage(storageDir, providerState)
229+
currentTime := opts.LastUpdatedAt
230+
currentTime = currentTime.Add(time.Second)
231+
clk.SetTime(currentTime)
232+
})
233+
It("should return an empty list of requests", func() {
234+
checkStateAndExpectEmptyPayload(clearResp, clearRespBody)
235+
})
236+
It("should empty the local (in-memory) state", func() {
237+
// try to get the new state again, should be empty
238+
providerDetailsResp, providerDetailsRespBody := apiCall(srv, providerDetailsReq(owner, repo, baseRef))
239+
checkStateAndExpectEmptyPayload(providerDetailsResp, providerDetailsRespBody)
240+
})
241+
It("should empty the persisted (storage) state", func() {
242+
provider, err := srv.GetOrchestrator().Get(owner, repo, baseRef)
243+
Expect(err).To(BeNil())
244+
// fore hydration again
245+
err = provider.HydrateFromState(context.Background())
246+
Expect(err).To(BeNil())
247+
248+
providerDetailsResp, providerDetailsRespBody := apiCall(srv, providerDetailsReq(owner, repo, baseRef))
249+
checkStateAndExpectEmptyPayload(providerDetailsResp, providerDetailsRespBody)
250+
})
251+
})
252+
})
253+
})
254+
190255
Describe("Acquire endpoint", func() {
191256
BeforeEach(func() {
192257
clk.SetTime(now)
@@ -583,113 +648,113 @@ var _ = Describe("API", Ordered, func() {
583648
})
584649
})
585650
})
586-
})
587651

588-
Context("maximum request reached, Success build", func() {
589-
BeforeEach(func() {
590-
clk.SetTime(now)
591-
})
652+
Context("maximum request reached, Success build", func() {
653+
BeforeEach(func() {
654+
clk.SetTime(now)
655+
})
592656

593-
It("should complete the flow successfully", func() {
594-
max := configHelper.DefaultConfigRepoExpectedRequestCount
595-
for i := 1; i <= max-1; i++ {
596-
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
597-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
598-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
599-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
657+
It("should complete the flow successfully", func() {
658+
max := configHelper.DefaultConfigRepoExpectedRequestCount
659+
for i := 1; i <= max-1; i++ {
660+
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
661+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
662+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
663+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
600664
"head_sha": "xxx-%d",
601665
"priority": %d,
602666
"status": "pending"
603667
}`, i, i)))
604-
})
605-
}
606-
By(fmt.Sprintf("test acquire, request %d => should be acquired", max), func() {
607-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max))
608-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
609-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
668+
})
669+
}
670+
By(fmt.Sprintf("test acquire, request %d => should be acquired", max), func() {
671+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max))
672+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
673+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
610674
"head_sha": "xxx-%d",
611675
"priority": %d,
612676
"status": "acquired"
613677
}`, max, max)))
614-
})
615-
By(fmt.Sprintf("test release (success), request %d => should be completed", max), func() {
616-
resp, body := apiCall(srv, releaseReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max, lease.StatusSuccess))
617-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
618-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
678+
})
679+
By(fmt.Sprintf("test release (success), request %d => should be completed", max), func() {
680+
resp, body := apiCall(srv, releaseReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max, lease.StatusSuccess))
681+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
682+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
619683
"head_sha": "xxx-%d",
620684
"priority": %d,
621685
"status": "completed"
622686
}`, max, max)))
623-
})
624-
for i := 1; i <= max-1; i++ {
625-
By(fmt.Sprintf("test acquire, request %d => should be completed", i), func() {
626-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
627-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
628-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
687+
})
688+
for i := 1; i <= max-1; i++ {
689+
By(fmt.Sprintf("test acquire, request %d => should be completed", i), func() {
690+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
691+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
692+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
629693
"head_sha": "xxx-%d",
630694
"priority": %d,
631695
"status": "completed"
632696
}`, i, i)))
633-
})
634-
}
697+
})
698+
}
699+
})
635700
})
636-
})
637701

638-
Context("maximum request reached, Failed build", func() {
639-
BeforeEach(func() {
640-
clk.SetTime(now)
641-
})
702+
Context("maximum request reached, Failed build", func() {
703+
BeforeEach(func() {
704+
clk.SetTime(now)
705+
})
642706

643-
It("should complete the flow successfully", func() {
644-
max := configHelper.DefaultConfigRepoExpectedRequestCount
645-
for i := 1; i <= max-1; i++ {
646-
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
647-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
648-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
649-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
707+
It("should complete the flow successfully", func() {
708+
max := configHelper.DefaultConfigRepoExpectedRequestCount
709+
for i := 1; i <= max-1; i++ {
710+
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
711+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
712+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
713+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
650714
"head_sha": "xxx-%d",
651715
"priority": %d,
652716
"status": "pending"
653717
}`, i, i)))
654-
})
655-
}
656-
By(fmt.Sprintf("test acquire, request %d => should be acquired", max), func() {
657-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max))
658-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
659-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
718+
})
719+
}
720+
By(fmt.Sprintf("test acquire, request %d => should be acquired", max), func() {
721+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max))
722+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
723+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
660724
"head_sha": "xxx-%d",
661725
"priority": %d,
662726
"status": "acquired"
663727
}`, max, max)))
664-
})
665-
By(fmt.Sprintf("test release (failure), request %d => should be failure", max), func() {
666-
resp, body := apiCall(srv, releaseReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max, lease.StatusFailure))
667-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
668-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
728+
})
729+
By(fmt.Sprintf("test release (failure), request %d => should be failure", max), func() {
730+
resp, body := apiCall(srv, releaseReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max), max, lease.StatusFailure))
731+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
732+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
669733
"head_sha": "xxx-%d",
670734
"priority": %d,
671735
"status": "failure"
672736
}`, max, max)))
673-
})
674-
for i := 1; i <= max-2; i++ {
675-
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
676-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
677-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
678-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
737+
})
738+
for i := 1; i <= max-2; i++ {
739+
By(fmt.Sprintf("test acquire, request %d => should be pending", i), func() {
740+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(i), i))
741+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
742+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
679743
"head_sha": "xxx-%d",
680744
"priority": %d,
681745
"status": "pending"
682746
}`, i, i)))
683-
})
684-
}
685-
By(fmt.Sprintf("test acquire, request %d => should be acquired", max-1), func() {
686-
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max-1), max-1))
687-
Expect(resp.StatusCode).To(Equal(http.StatusOK))
688-
Expect(body).To(MatchJSON(fmt.Sprintf(`{
747+
})
748+
}
749+
By(fmt.Sprintf("test acquire, request %d => should be acquired", max-1), func() {
750+
resp, body := apiCall(srv, acquireReq(owner, repo, baseRef, "xxx-"+strconv.Itoa(max-1), max-1))
751+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
752+
Expect(body).To(MatchJSON(fmt.Sprintf(`{
689753
"head_sha": "xxx-%d",
690754
"priority": %d,
691755
"status": "acquired"
692756
}`, max-1, max-1)))
757+
})
693758
})
694759
})
695760
})
@@ -713,6 +778,15 @@ func providerDetailsReq(owner string, repo string, baseRef string) *http.Request
713778
)
714779
}
715780

781+
// providerClearReq returns a pre-configured request for the "DELETE /:owner/:repo/:baseRef" endpoint
782+
func providerClearReq(owner string, repo string, baseRef string) *http.Request {
783+
return httptest.NewRequest(
784+
"DELETE",
785+
fmt.Sprintf("/%s/%s/%s", owner, repo, baseRef),
786+
nil,
787+
)
788+
}
789+
716790
// acquireReq returns a pre-configured request for the "POST /:owner/:repo/:baseRef/acquire" endpoint
717791
func acquireReq(owner string, repo string, baseRef string, headSha string, priority int) *http.Request {
718792
req := httptest.NewRequest(

internal/lease/leaseprovider.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type NewProviderStateOpts struct {
7474
}
7575

7676
func NewProviderState(opts NewProviderStateOpts) *ProviderState {
77+
if opts.Known == nil {
78+
opts.Known = make(map[string]*Request)
79+
}
7780
return &ProviderState{
7881
id: opts.ID,
7982
lastUpdatedAt: opts.LastUpdatedAt,
@@ -155,6 +158,7 @@ type Provider interface {
155158
Acquire(ctx context.Context, leaseRequest *Request) (*Request, error)
156159
Release(ctx context.Context, leaseRequest *Request) (*Request, error)
157160
HydrateFromState(ctx context.Context) error
161+
Clear(ctx context.Context)
158162
}
159163

160164
type leaseProviderImpl struct {
@@ -184,11 +188,10 @@ func NewLeaseProvider(opts ProviderOpts) Provider {
184188
clock: cl,
185189
storage: st,
186190
metrics: opts.Metrics,
187-
state: &ProviderState{
188-
id: opts.ID,
189-
lastUpdatedAt: cl.Now(),
190-
known: make(map[string]*Request),
191-
},
191+
state: NewProviderState(NewProviderStateOpts{
192+
ID: opts.ID,
193+
LastUpdatedAt: cl.Now(),
194+
}),
192195
}
193196
}
194197

@@ -500,3 +503,16 @@ func (lp *leaseProviderImpl) Release(ctx context.Context, leaseRequest *Request)
500503

501504
return req, fmt.Errorf("unknown condition for commit %s", leaseRequest.HeadSHA)
502505
}
506+
507+
func (lp *leaseProviderImpl) Clear(ctx context.Context) {
508+
lp.mutex.Lock()
509+
defer lp.mutex.Unlock()
510+
defer lp.updateMetrics()
511+
512+
lp.state = NewProviderState(NewProviderStateOpts{
513+
ID: lp.state.id,
514+
LastUpdatedAt: lp.clock.Now(),
515+
})
516+
517+
lp.saveState(ctx)
518+
}

internal/lease/leaseprovider_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,57 @@ func Test_leaseProviderImpl_evaluateRequest_errorNoLeaseAssigned(t *testing.T) {
440440
assert.Equal(t, req1copy, req1copy)
441441
}
442442

443+
type clearTestFakeStorage struct{ state *ProviderState }
444+
445+
func (s *clearTestFakeStorage) Init() error { return nil }
446+
func (s *clearTestFakeStorage) Close() error { return nil }
447+
func (s *clearTestFakeStorage) Hydrate(context.Context, *ProviderState) error { return nil }
448+
func (s *clearTestFakeStorage) Save(ctx context.Context, obj *ProviderState) error {
449+
s.state = obj
450+
return nil
451+
}
452+
func (s *clearTestFakeStorage) HealthCheck(context.Context, func() *ProviderState) bool { return true }
453+
454+
func Test_leaseProviderImpl_Clear(t *testing.T) {
455+
id := "provider-id"
456+
now := time.Now()
457+
clk := clocktesting.NewFakePassiveClock(now)
458+
storage := &clearTestFakeStorage{}
459+
lp := NewLeaseProvider(ProviderOpts{TTL: 1 * time.Hour, StabilizeDuration: time.Minute, ExpectedRequestCount: 2, ID: id, Clock: clk, Storage: storage})
460+
lpImpl, ok := lp.(*leaseProviderImpl)
461+
assert.True(t, ok)
462+
463+
req1 := &Request{
464+
HeadSHA: "sha1",
465+
Priority: 1,
466+
}
467+
req2 := &Request{
468+
HeadSHA: "sha2",
469+
Priority: 2,
470+
}
471+
472+
// Inject the two requests
473+
req1, err := lp.Acquire(context.Background(), req1)
474+
assert.NoError(t, err)
475+
assert.Equal(t, StatusPending, *req1.Status)
476+
req2, err = lp.Acquire(context.Background(), req2)
477+
assert.NoError(t, err)
478+
assert.Equal(t, StatusAcquired, *req2.Status)
479+
480+
// Try to clear
481+
lp.Clear(context.Background())
482+
483+
expectedState := &ProviderState{
484+
id: id,
485+
lastUpdatedAt: now,
486+
acquired: nil,
487+
known: make(map[string]*Request),
488+
}
489+
assert.NotNil(t, t, lpImpl.state)
490+
assert.Equal(t, expectedState, lpImpl.state)
491+
assert.Equal(t, expectedState, storage.state)
492+
}
493+
443494
func Test_leaseProviderImpl__FullLoop_ReleaseSuccess(t *testing.T) {
444495
lp := NewLeaseProvider(ProviderOpts{TTL: 1 * time.Hour, StabilizeDuration: 1 * time.Minute, ExpectedRequestCount: 3})
445496
req1 := &Request{

0 commit comments

Comments
 (0)