@@ -24,12 +24,14 @@ import (
24
24
"iter"
25
25
"log/slog"
26
26
"slices"
27
+ "time"
27
28
28
29
"filippo.io/age"
29
30
"github.com/gravitational/trace"
30
31
31
32
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
32
33
"github.com/gravitational/teleport/api/types"
34
+ "github.com/gravitational/teleport/api/utils/retryutils"
33
35
"github.com/gravitational/teleport/lib/backend"
34
36
"github.com/gravitational/teleport/lib/cryptosuites"
35
37
"github.com/gravitational/teleport/lib/events"
@@ -301,34 +303,56 @@ type RecordingEncryptionResolver interface {
301
303
ResolveRecordingEncryption (ctx context.Context ) (* recordingencryptionv1.RecordingEncryption , error )
302
304
}
303
305
304
- // WatchConfig captures required dependencies for building a RecordingEncyprtion watcher that
306
+ // WatchConfig captures required dependencies for building a RecordingEncryption watcher that
305
307
// automatically resolves state.
306
308
type WatchConfig struct {
307
309
Events types.Events
308
310
Resolver RecordingEncryptionResolver
309
311
ClusterConfig services.ClusterConfiguration
310
312
Logger * slog.Logger
311
- LockConfig backend.RunWhileLockedConfig
313
+ LockConfig * backend.RunWhileLockedConfig
312
314
}
313
315
314
- // Watch creates a watcher responsible for responding to changes in the RecordingEncryption
315
- // resource. This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique
316
- // keystore configuration in a cluster.
317
- func Watch (ctx context.Context , cfg WatchConfig ) error {
316
+ // A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
317
+ // auth server.
318
+ type Watcher struct {
319
+ events types.Events
320
+ resolver RecordingEncryptionResolver
321
+ clusterConfig services.ClusterConfiguration
322
+ logger * slog.Logger
323
+ lockConfig * backend.RunWhileLockedConfig
324
+ }
325
+
326
+ // NewWatcher returns a new Watcher.
327
+ func NewWatcher (cfg WatchConfig ) (* Watcher , error ) {
318
328
switch {
319
329
case cfg .Events == nil :
320
- return trace .BadParameter ("events is required" )
330
+ return nil , trace .BadParameter ("events is required" )
321
331
case cfg .Resolver == nil :
322
- return trace .BadParameter ("recording encryption resolver is required" )
332
+ return nil , trace .BadParameter ("recording encryption resolver is required" )
323
333
case cfg .ClusterConfig == nil :
324
- return trace .BadParameter ("cluster config backend is required" )
334
+ return nil , trace .BadParameter ("cluster config backend is required" )
335
+ case cfg .LockConfig == nil :
336
+ return nil , trace .BadParameter ("lock config is required" )
325
337
}
326
338
if cfg .Logger == nil {
327
339
cfg .Logger = slog .Default ()
328
340
}
329
341
330
- cfg .Logger .DebugContext (ctx , "creating recording_encryption watcher" )
331
- w , err := cfg .Events .NewWatcher (ctx , types.Watch {
342
+ return & Watcher {
343
+ events : cfg .Events ,
344
+ resolver : cfg .Resolver ,
345
+ clusterConfig : cfg .ClusterConfig ,
346
+ logger : cfg .Logger ,
347
+ lockConfig : cfg .LockConfig ,
348
+ }, nil
349
+ }
350
+
351
+ // Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
352
+ // This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
353
+ // configuration in a cluster.
354
+ func (w * Watcher ) Run (ctx context.Context ) error {
355
+ watch , err := w .events .NewWatcher (ctx , types.Watch {
332
356
Name : "recording_encryption_watcher" ,
333
357
Kinds : []types.WatchKind {
334
358
{
@@ -340,55 +364,51 @@ func Watch(ctx context.Context, cfg WatchConfig) error {
340
364
return trace .Wrap (err )
341
365
}
342
366
343
- go func () {
344
- for {
345
- select {
346
- case ev := <- w .Events ():
347
- if ev .Type != types .OpPut {
348
- continue
349
- }
350
- const retries = 3
351
- for tries := range retries {
352
- err := handleRecordingEncryptionChange (ctx , cfg )
353
- if err == nil {
354
- break
355
- }
356
-
357
- cfg .Logger .ErrorContext (ctx , "failed to handle session recording config change" , "error" , err , "remaining_tries" , retries - tries - 1 )
367
+ for {
368
+ select {
369
+ case ev := <- watch .Events ():
370
+ if ev .Type != types .OpPut {
371
+ continue
372
+ }
373
+ const retries = 3
374
+ for tries := range retries {
375
+ err := w .handleRecordingEncryptionChange (ctx )
376
+ if err == nil {
377
+ break
358
378
}
359
379
360
- case <- w .Done ():
361
- cfg .Logger .DebugContext (ctx , "no longer watching recording_encryption" )
362
- return
380
+ w .logger .ErrorContext (ctx , "failed to handle session recording config change" , "error" , err , "remaining_tries" , retries - tries - 1 )
381
+ <- time .After (retryutils .SeventhJitter (time .Second * 10 ))
363
382
}
364
- }
365
- }()
366
383
367
- return nil
384
+ case <- watch .Done ():
385
+ return trace .Wrap (watch .Error ())
386
+ }
387
+ }
368
388
}
369
389
370
390
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
371
391
// SessionRecordingConfig with the results, if necessary
372
- func handleRecordingEncryptionChange (ctx context.Context , cfg WatchConfig ) error {
373
- return trace .Wrap (backend .RunWhileLocked (ctx , cfg . LockConfig , func (ctx context.Context ) error {
374
- recConfig , err := cfg . ClusterConfig .GetSessionRecordingConfig (ctx )
392
+ func ( w * Watcher ) handleRecordingEncryptionChange (ctx context.Context ) error {
393
+ return trace .Wrap (backend .RunWhileLocked (ctx , * w . lockConfig , func (ctx context.Context ) error {
394
+ recConfig , err := w . clusterConfig .GetSessionRecordingConfig (ctx )
375
395
if err != nil {
376
396
return trace .Wrap (err , "fetching recording config" )
377
397
}
378
398
379
399
if ! recConfig .GetEncrypted () {
380
- cfg . Logger .DebugContext (ctx , "session recording encryption disabled, skip resolving keys" )
400
+ w . logger .DebugContext (ctx , "session recording encryption disabled, skip resolving keys" )
381
401
return nil
382
402
}
383
403
384
- encryption , err := cfg . Resolver .ResolveRecordingEncryption (ctx )
404
+ encryption , err := w . resolver .ResolveRecordingEncryption (ctx )
385
405
if err != nil {
386
- cfg . Logger .ErrorContext (ctx , "failed to resolve recording encryption state" , "error" , err )
406
+ w . logger .ErrorContext (ctx , "failed to resolve recording encryption state" , "error" , err )
387
407
return trace .Wrap (err , "resolving recording encryption" )
388
408
}
389
409
390
410
if recConfig .SetEncryptionKeys (GetAgeEncryptionKeys (encryption .GetSpec ().ActiveKeys )) {
391
- _ , err = cfg . ClusterConfig .UpdateSessionRecordingConfig (ctx , recConfig )
411
+ _ , err = w . clusterConfig .UpdateSessionRecordingConfig (ctx , recConfig )
392
412
return trace .Wrap (err , "updating encryption keys" )
393
413
}
394
414
0 commit comments