Skip to content

Commit 031fe61

Browse files
committed
outofband/worker: set kv replicas through cli params for consistency
with other controllers
1 parent 2b8e222 commit 031fe61

File tree

5 files changed

+16
-18
lines changed

5 files changed

+16
-18
lines changed

cmd/outofband.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ var (
3636

3737
// asWorker when true runs Alloy as a worker listening on the NATS JS for Conditions to act on.
3838
asWorker bool
39+
40+
// The number of replicaCount to use NATS KV data
41+
replicaCount int
3942
)
4043

4144
// outofband inventory, bios configuration collection command
@@ -90,7 +93,7 @@ func runWorker(ctx context.Context, alloy *app.App) {
9093
alloy.Logger.Fatal(err)
9194
}
9295

93-
w, err := worker.New(ctx, facilityCode, stream, alloy.Config, alloy.SyncWg, alloy.Logger)
96+
w, err := worker.New(ctx, facilityCode, replicaCount, stream, alloy.Config, alloy.SyncWg, alloy.Logger)
9497
if err != nil {
9598
alloy.Logger.Fatal(err)
9699
}
@@ -127,6 +130,7 @@ func init() {
127130
cmdOutofband.PersistentFlags().StringVar(&csvFile, "csv-file", "assets.csv", "CSV file containing BMC credentials for assets.")
128131
cmdOutofband.PersistentFlags().StringVar(&facilityCode, "facility-code", "sandbox", "The facility code this Alloy instance is associated with")
129132
cmdOutofband.PersistentFlags().BoolVar(&asWorker, "worker", false, "Run Alloy as a worker listening for conditions on NATS")
133+
cmdOutofband.PersistentFlags().IntVarP(&replicaCount, "replica-count", "r", 3, "The number of replicaCount to use for NATS KV data") // nolint:gomnd // obvious int is obvious
130134

131135
rootCmd.AddCommand(cmdOutofband)
132136
}

internal/app/config.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,6 @@ func (a *App) envVarNatsOverrides() error {
240240
a.Config.NatsOptions.Consumer.Name = a.v.GetString("nats.consumer.name")
241241
}
242242

243-
if a.v.GetInt("nats.kv.replicaCount") != 0 {
244-
if a.Config.NatsOptions.KV == nil {
245-
a.Config.NatsOptions.KV = &events.NatsKVOptions{}
246-
}
247-
248-
a.Config.NatsOptions.KV.ReplicaCount = a.v.GetInt("nats.kv.replicaCount")
249-
}
250-
251243
if len(a.v.GetStringSlice("nats.consumer.subscribeSubjects")) != 0 {
252244
a.Config.NatsOptions.Consumer.SubscribeSubjects = a.v.GetStringSlice("nats.consumer.subscribeSubjects")
253245
}

internal/worker/kv_status.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ var (
2929
}
3030
)
3131

32-
func createOrBindKVBucketWithOpts(s events.Stream, opts *events.NatsKVOptions) (nats.KeyValue, error) {
32+
func createOrBindKVBucketWithOpts(s events.Stream, replicaCount int) (nats.KeyValue, error) {
3333
kvOptions := defaultKVOpts
3434

35-
if opts.ReplicaCount > 1 {
36-
kvOptions = append(kvOptions, kv.WithReplicas(opts.ReplicaCount))
35+
if replicaCount > 1 {
36+
kvOptions = append(kvOptions, kv.WithReplicas(replicaCount))
3737
}
3838

3939
js, ok := s.(*events.NatsJetstream)
@@ -52,8 +52,8 @@ type statusKVPublisher struct {
5252
facility string
5353
}
5454

55-
func newStatusKVPublisher(s events.Stream, log *logrus.Logger, workerID, facility string, opts *events.NatsKVOptions) (*statusKVPublisher, error) {
56-
statusKV, err := createOrBindKVBucketWithOpts(s, opts)
55+
func newStatusKVPublisher(s events.Stream, log *logrus.Logger, workerID, facility string, replicaCount int) (*statusKVPublisher, error) {
56+
statusKV, err := createOrBindKVBucketWithOpts(s, replicaCount)
5757
if err != nil {
5858
return nil, err
5959
}

internal/worker/liveness.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (w *Worker) startWorkerLivenessCheckin(ctx context.Context) {
3434
kv.WithTTL(livenessTTL),
3535
}
3636

37-
// any setting of replicas (even 1) chokes NATS in non-clustered mode
37+
// any setting of replicaCount (even 1) chokes NATS in non-clustered mode
3838
if w.replicaCount != 1 {
3939
opts = append(opts, kv.WithReplicas(w.replicaCount))
4040
}

internal/worker/worker.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,15 @@ type Worker struct {
6464
logger *logrus.Logger
6565
name string
6666
facilityCode string
67-
replicaCount int
6867
concurrency int
68+
replicaCount int
6969
dispatched int32
7070
}
7171

7272
func New(
7373
ctx context.Context,
7474
facilityCode string,
75+
replicaCount int,
7576
stream events.Stream,
7677
cfg *app.Configuration,
7778
syncWG *sync.WaitGroup,
@@ -92,6 +93,7 @@ func New(
9293
return &Worker{
9394
name: id,
9495
facilityCode: facilityCode,
96+
replicaCount: replicaCount,
9597
cfg: cfg,
9698
syncWG: syncWG,
9799
logger: logger,
@@ -121,7 +123,7 @@ func (w *Worker) Run(ctx context.Context) {
121123
// register worker in NATS active-controllers kv bucket
122124
w.startWorkerLivenessCheckin(ctx)
123125

124-
if _, err := createOrBindKVBucketWithOpts(w.stream, w.cfg.NatsOptions.KV); err != nil {
126+
if _, err := createOrBindKVBucketWithOpts(w.stream, w.replicaCount); err != nil {
125127
w.logger.WithError(err).Error("failed to create/bind to status kv" + inventoryStatusKVBucket)
126128
}
127129

@@ -286,7 +288,7 @@ func (w *Worker) doWork(ctx context.Context, condition *rctypes.Condition, e eve
286288

287289
startTS := time.Now()
288290

289-
publisher, err := newStatusKVPublisher(w.stream, w.logger, w.id.String(), w.facilityCode, w.cfg.NatsOptions.KV)
291+
publisher, err := newStatusKVPublisher(w.stream, w.logger, w.id.String(), w.facilityCode, w.replicaCount)
290292
if err != nil {
291293
w.logger.WithError(err).Warn("status KV init - internal error")
292294

0 commit comments

Comments
 (0)