Skip to content

Commit c58b43d

Browse files
FS-1252; Streamline fleet-scheduler inventory task threading by removing threading (#36)
1 parent cc7e077 commit c58b43d

File tree

4 files changed

+45
-181
lines changed

4 files changed

+45
-181
lines changed

cmd/inventory.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ import (
1313
)
1414

1515
var (
16-
pageSize int
17-
inFlightPages int
16+
pageSize int
1817
)
1918

2019
var cmdInventory = &cobra.Command{
@@ -31,7 +30,6 @@ var cmdInventory = &cobra.Command{
3130

3231
func init() {
3332
rootCmd.PersistentFlags().IntVar(&pageSize, "page-size", 4, "Define how many servers to query per request")
34-
rootCmd.PersistentFlags().IntVar(&inFlightPages, "inflight-pages", 1, "Define how many server pages to queue up before waiting for the previous to finish creating the condition")
3533
rootCmd.AddCommand(cmdInventory)
3634
}
3735

@@ -66,7 +64,7 @@ func inventory(ctx context.Context) error {
6664
return err
6765
}
6866

69-
err = newClient.CreateConditionInventoryForAllServers(pageSize, inFlightPages)
67+
err = newClient.CreateConditionInventoryForAllServers(pageSize)
7068
if err != nil {
7169
return err
7270
}

cmd/test.go

Lines changed: 0 additions & 82 deletions
This file was deleted.

internal/client/fleetdb.go

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,10 @@
11
package client
22

33
import (
4-
"time"
5-
6-
"github.com/sirupsen/logrus"
7-
"golang.org/x/sync/semaphore"
8-
94
fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1"
105
fleetDBRivets "github.com/metal-toolbox/rivets/serverservice"
116
)
127

13-
func (c *Client) gatherServers(pageSize int, serverCh chan *fleetdbapi.Server, concLimiter *semaphore.Weighted) {
14-
// signal to receiver that we are done
15-
defer close(serverCh)
16-
17-
// First page, use the response from it to figure out how many pages we have to loop through
18-
// Dont change page size
19-
servers, response, err := c.getServerPage(pageSize, 1)
20-
if err != nil {
21-
c.log.WithFields(logrus.Fields{
22-
"pageSize": pageSize,
23-
"pageIndex": 1,
24-
}).Logger.Errorf("Failed to get list of servers: %s", err.Error())
25-
return
26-
}
27-
totalPages := response.TotalPages
28-
29-
if !concLimiter.TryAcquire(int64(response.PageSize)) {
30-
c.log.Error("Failed to acquire semaphore! Going to attempt to continue.")
31-
}
32-
33-
// send first page of servers to the channel
34-
for i := range servers {
35-
serverCh <- &servers[i]
36-
}
37-
38-
c.log.WithFields(logrus.Fields{
39-
"index": 1,
40-
"iterations": totalPages,
41-
"got": len(servers),
42-
}).Trace("Got server page")
43-
44-
// Start the second page, and loop through rest the pages
45-
for i := 2; i <= totalPages; i++ {
46-
servers, response, err = c.getServerPage(pageSize, i)
47-
if err != nil {
48-
c.log.WithFields(logrus.Fields{
49-
"pageSize": pageSize,
50-
"pageIndex": i,
51-
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error())
52-
53-
continue
54-
}
55-
56-
c.log.WithFields(logrus.Fields{
57-
"index": i,
58-
"iterations": totalPages,
59-
"got": len(servers),
60-
}).Trace("Got server page")
61-
62-
// throttle this loop
63-
// Doing a spinlock to prevent a permanent lock if the ctx gets canceled
64-
for !concLimiter.TryAcquire(int64(response.PageSize)) && c.ctx.Err() == nil {
65-
time.Sleep(time.Second)
66-
}
67-
68-
if c.ctx.Err() != nil {
69-
c.log.Warn("Context canceled, stopping server gathering")
70-
return
71-
}
72-
73-
for i := range servers {
74-
serverCh <- &servers[i]
75-
}
76-
}
77-
}
78-
798
func (c *Client) getServerPage(pageSize, page int) ([]fleetdbapi.Server, *fleetdbapi.ServerResponse, error) {
809
params := &fleetdbapi.ServerListParams{
8110
FacilityCode: c.cfg.FacilityCode,

internal/client/tasks.go

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,60 @@ package client
22

33
import (
44
"github.com/sirupsen/logrus"
5-
"golang.org/x/sync/semaphore"
6-
7-
// "github.com/sirupsen/logrus"
8-
fleetdbapi "github.com/metal-toolbox/fleetdb/pkg/api/v1"
95
)
106

11-
func (c *Client) CreateConditionInventoryForAllServers(pageSize, inFlightPages int) error {
12-
// Start thread to start collecting servers
13-
serverCh, concLimiter, err := c.GatherServersNonBlocking(pageSize, inFlightPages)
7+
func (c *Client) CreateConditionInventoryForAllServers(pageSize int) error {
8+
// First page, use the response from it to figure out how many pages we have to loop through
9+
// Dont change page size
10+
servers, response, err := c.getServerPage(pageSize, 1)
1411
if err != nil {
12+
c.log.WithFields(logrus.Fields{
13+
"pageSize": pageSize,
14+
"pageIndex": 1,
15+
}).Logger.Errorf("Failed to get list of servers: %s", err.Error())
1516
return err
1617
}
18+
totalPages := response.TotalPages
1719

18-
// Loop through servers and create conditions
19-
for server := range serverCh {
20-
err := c.CreateConditionInventory(server.UUID)
20+
// send first page of servers to the channel
21+
for i := range servers {
22+
err = c.CreateConditionInventory(servers[i].UUID)
2123
if err != nil {
22-
c.log.WithFields(logrus.Fields{
23-
"server": server.UUID,
24-
}).Logger.Error("Failed to create condition")
24+
return err
2525
}
26-
27-
concLimiter.Release(1)
2826
}
2927

30-
return nil
31-
}
28+
c.log.WithFields(logrus.Fields{
29+
"index": 1,
30+
"iterations": totalPages,
31+
"got": len(servers),
32+
}).Trace("Got server page")
33+
34+
// Start the second page, and loop through rest the pages
35+
for i := 2; i <= totalPages; i++ {
36+
servers, _, err = c.getServerPage(pageSize, i)
37+
if err != nil {
38+
c.log.WithFields(logrus.Fields{
39+
"pageSize": pageSize,
40+
"pageIndex": i,
41+
}).Logger.Errorf("Failed to get page of servers, attempting to continue: %s", err.Error())
3242

33-
func (c *Client) GatherServersNonBlocking(pageSize, inFlightPages int) (chan *fleetdbapi.Server, *semaphore.Weighted, error) {
34-
serverCh := make(chan *fleetdbapi.Server)
35-
concLimiter := semaphore.NewWeighted(int64(inFlightPages * pageSize))
43+
continue
44+
}
3645

37-
go func() {
38-
c.gatherServers(pageSize, serverCh, concLimiter)
39-
}()
46+
c.log.WithFields(logrus.Fields{
47+
"index": i,
48+
"iterations": totalPages,
49+
"got": len(servers),
50+
}).Trace("Got server page")
51+
52+
for i := range servers {
53+
err = c.CreateConditionInventory(servers[i].UUID)
54+
if err != nil {
55+
return err
56+
}
57+
}
58+
}
4059

41-
return serverCh, concLimiter, nil
60+
return nil
4261
}

0 commit comments

Comments
 (0)