Commit de5f0fd

Add controller package that schedules collection and listens for events

1 parent 7f1659c commit de5f0fd

File tree

5 files changed: +758 -0 lines changed


internal/controller/checkpoint.go

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
package controller

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	v1cclient "github.com/metal-toolbox/conditionorc/pkg/api/v1/client"
	v1ctypes "github.com/metal-toolbox/conditionorc/pkg/api/v1/types"
	cptypes "github.com/metal-toolbox/conditionorc/pkg/types"
)

var (
	ErrCheckpointSet    = errors.New("error setting task checkpoint")
	ErrConditionOrcResp = errors.New("error in conditionorc response")
)

// TaskCheckpointer checkpoints a task by updating its status.
//
// Going forward, long-running tasks are expected to set agreed-upon 'checkpoints'
// from which they can resume in case of failure.
type TaskCheckpointer interface {
	Set(ctx context.Context, task *Task, state cptypes.ConditionState, status string) error
	Get() error
}

// OrcCheckpointer implements the TaskCheckpointer interface for the conditionorc-api and the events subsystem.
type OrcCheckpointer struct {
	// tasksLocker is the list of tasks being worked on by the orchestrator under a mutex.
	tasksLocker *TasksLocker

	// orcclient is the condition orchestrator client.
	orcclient *v1cclient.Client
}

// NewTaskCheckpointer returns a task checkpointer to persist task progress.
func NewTaskCheckpointer(serverAddress string, tasks *TasksLocker) (TaskCheckpointer, error) {
	orcclient, err := v1cclient.NewClient(serverAddress)
	if err != nil {
		return nil, err
	}

	return &OrcCheckpointer{tasksLocker: tasks, orcclient: orcclient}, nil
}

// Get returns current checkpoint information on a task.
//
// To be implemented.
func (c *OrcCheckpointer) Get() error {
	return nil
}

// Set updates task progress in the events subsystem, the condition orchestrator and the local TasksLocker data under mutex.
//
// The NATS streaming subsystem has to be acked to make sure it does not redeliver the same message.
func (c *OrcCheckpointer) Set(ctx context.Context, task *Task, state cptypes.ConditionState, status string) error {
	previousState := task.State
	task.State = state
	task.UpdatedAt = time.Now()

	if status != "" {
		task.Status = status
	}

	// update condition orchestrator
	if err := c.updateCondition(
		ctx,
		task.Urn.ResourceID,
		cptypes.ConditionKind(task.Data.EventType),
		state,
		statusInfoJSON(status),
	); err != nil {
		return err
	}

	switch task.State {
	case cptypes.Pending:
		// mark task as in progress in the events subsystem,
		// resetting the event subsystem timer for this task.
		if err := task.Msg.InProgress(); err != nil {
			return err
		}

		c.tasksLocker.Add(*task)

	case cptypes.Active:
		// mark task as in progress in the events subsystem,
		// resetting the event subsystem timer for this task.
		if err := task.Msg.InProgress(); err != nil {
			return err
		}

		if previousState != cptypes.Active {
			c.tasksLocker.Add(*task)
		} else {
			c.tasksLocker.Update(*task)
		}

	case cptypes.Succeeded, cptypes.Failed:
		if err := task.Msg.Ack(); err != nil {
			return err
		}

		c.tasksLocker.Purge(task.ID)
	}

	return nil
}

func statusInfoJSON(s string) json.RawMessage {
	return []byte(fmt.Sprintf("{%q: %q}", "output", s))
}

func (c *OrcCheckpointer) updateCondition(ctx context.Context, serverID uuid.UUID, kind cptypes.ConditionKind, state cptypes.ConditionState, status json.RawMessage) error {
	response, err := c.orcclient.ServerConditionGet(ctx, serverID, kind)
	if err != nil {
		return err
	}

	if response == nil || response.Record == nil || response.Record.Condition == nil {
		return ErrConditionOrcResp
	}

	update := v1ctypes.ConditionUpdate{
		State:           state,
		Status:          status,
		ResourceVersion: response.Record.Condition.ResourceVersion,
	}

	// TODO: add retries for resource version mismatch errors
	_, err = c.orcclient.ServerConditionUpdate(ctx, serverID, kind, update)
	if err != nil {
		return err
	}

	return nil
}
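
Usage note (illustrative, not part of this commit): a minimal sketch of how a long-running task handler in this package might drive the checkpointer, assuming it already holds a populated *Task from the events subsystem. The handler name and status strings below are assumptions.

package controller

import (
	"context"

	cptypes "github.com/metal-toolbox/conditionorc/pkg/types"
)

// runCollection is a hypothetical handler sketch.
func runCollection(ctx context.Context, checkpointer TaskCheckpointer, task *Task) error {
	// Mark the task active before starting work; Set also resets the
	// NATS redelivery timer through task.Msg.InProgress().
	if err := checkpointer.Set(ctx, task, cptypes.Active, "inventory collection started"); err != nil {
		return err
	}

	// ... perform the out-of-band collection here ...

	// A final Succeeded/Failed state acks the message and purges the
	// task from the TasksLocker.
	return checkpointer.Set(ctx, task, cptypes.Succeeded, "inventory collection completed")
}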

internal/controller/controller.go

Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
package controller

import (
	"context"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/metal-toolbox/alloy/internal/app"
	"github.com/metal-toolbox/alloy/internal/model"
	"github.com/metal-toolbox/alloy/internal/store"

	cptypes "github.com/metal-toolbox/conditionorc/pkg/types"
	"github.com/sirupsen/logrus"
	"go.hollow.sh/toolbox/events"
)

var (
	concurrency          = 10
	collectInterval      = 1 * time.Hour
	collectIntervalSplay = 10 * time.Minute
	fetchEventsInterval  = 10 * time.Second
	AckActiveTimeout     = 3 * time.Minute
	ackActiveInterval    = 1 * time.Minute
	TaskTimeout          = 180 * time.Minute
)

type Controller struct {
	repository        store.Repository
	streamBroker      events.StreamBroker
	checkpointHelper  TaskCheckpointer
	cfg               *app.Configuration
	tasksLocker       *TasksLocker
	syncWG            *sync.WaitGroup
	logger            *logrus.Logger
	iterCollectActive bool
}

func New(
	ctx context.Context,
	streamBroker events.StreamBroker,
	cfg *app.Configuration,
	syncWG *sync.WaitGroup,
	logger *logrus.Logger,
) (*Controller, error) {
	tasksLocker := NewTasksLocker()

	checkpointHelper, err := NewTaskCheckpointer("http://conditionorc-api:9001", tasksLocker)
	if err != nil {
		return nil, err
	}

	if cfg.Concurrency == 0 {
		cfg.Concurrency = concurrency
	}

	if cfg.CollectInterval == 0 {
		cfg.CollectInterval = collectInterval
	}

	if cfg.CollectIntervalSplay == 0 {
		cfg.CollectIntervalSplay = collectIntervalSplay
	}

	repository, err := store.NewRepository(ctx, cfg.StoreKind, model.AppKindOutOfBand, cfg, logger)
	if err != nil {
		return nil, err
	}

	return &Controller{
		cfg:              cfg,
		tasksLocker:      tasksLocker,
		syncWG:           syncWG,
		logger:           logger,
		repository:       repository,
		streamBroker:     streamBroker,
		checkpointHelper: checkpointHelper,
	}, nil
}

func (c *Controller) connectStream(ctx context.Context) (events.MsgCh, error) {
	if err := c.streamBroker.Open(); err != nil {
		return nil, err
	}

	return c.streamBroker.Subscribe(ctx)
}

func (c *Controller) Run(ctx context.Context) {
	// TODO: implement stream reconnect loop
	eventCh, err := c.connectStream(ctx)
	if err != nil {
		c.logger.WithError(err).Error("event stream connection error")

		c.loopWithoutEventstream(ctx)

		return
	}

	c.logger.Info("connected to event stream.")

	c.loopWithEventstream(ctx, eventCh)
}

func (c *Controller) loopWithEventstream(ctx context.Context, eventCh events.MsgCh) {
	tickerFetchEvent := time.NewTicker(fetchEventsInterval).C
	tickerAckActive := time.NewTicker(ackActiveInterval).C
	tickerCollectAll := time.NewTicker(c.splayInterval()).C

	// SIGHUP kicks alloy to collect for all assets.
	sigHupCh := make(chan os.Signal, 1)
	signal.Notify(sigHupCh, syscall.SIGHUP)

	for {
		select {
		case <-ctx.Done():
			c.streamBroker.Close()

		case <-tickerFetchEvent:
			if c.maximumActive() {
				continue
			}

			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.fetchEvent(ctx, eventCh) }()

		case <-tickerAckActive:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.ackActive(ctx) }()

		case <-tickerCollectAll:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.iterCollectOutofband(ctx) }()

		case <-sigHupCh:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.iterCollectOutofband(ctx) }()

		case event := <-eventCh:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.processEvent(ctx, event) }()
		}
	}
}

func (c *Controller) loopWithoutEventstream(ctx context.Context) {
	tickerCollectAll := time.NewTicker(c.splayInterval()).C

	// SIGHUP kicks alloy to collect for all assets.
	sigHupCh := make(chan os.Signal, 1)
	signal.Notify(sigHupCh, syscall.SIGHUP)

	for {
		select {
		case <-ctx.Done():
			return

		case <-sigHupCh:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.iterCollectOutofband(ctx) }()

		case <-tickerCollectAll:
			c.syncWG.Add(1)

			go func() { defer c.syncWG.Done(); c.iterCollectOutofband(ctx) }()
		}
	}
}

func (c *Controller) splayInterval() time.Duration {
	// randomize up to the given splay value and add it to the interval
	rand.Seed(time.Now().UnixNano())

	// nolint:gosec // Ideally this should be using crypto/rand,
	// although the generated random value here is just used to add jitter/splay to
	// the interval value and is not used outside of this context.
	return c.cfg.CollectInterval + time.Duration(rand.Int63n(int64(c.cfg.CollectIntervalSplay)))
}

func (c *Controller) maximumActive() bool {
	active := c.tasksLocker.List()

	var found int

	for idx := range active {
		if !cptypes.ConditionStateFinalized(active[idx].State) {
			found++
		}
	}

	return found >= concurrency
}
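
Wiring note (illustrative, not part of this commit): a sketch of how a command entry point in the same module might construct and run the controller, assuming the stream broker and app configuration are built elsewhere. The runController function name is an assumption; only New and Run from this commit are exercised.

package main

import (
	"context"
	"os/signal"
	"sync"
	"syscall"

	"github.com/metal-toolbox/alloy/internal/app"
	"github.com/metal-toolbox/alloy/internal/controller"
	"github.com/sirupsen/logrus"
	"go.hollow.sh/toolbox/events"
)

// runController is a hypothetical entry point sketch; broker and configuration
// construction are elided because they depend on code outside this commit.
func runController(cfg *app.Configuration, streamBroker events.StreamBroker) error {
	// Cancel the run context on SIGINT/SIGTERM so the controller loops can wind down.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	syncWG := &sync.WaitGroup{}
	logger := logrus.New()

	c, err := controller.New(ctx, streamBroker, cfg, syncWG, logger)
	if err != nil {
		return err
	}

	c.Run(ctx)

	// Wait for any in-flight collection/event goroutines started by the controller.
	syncWG.Wait()

	return nil
}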
