@@ -24,15 +24,17 @@ import (
24
24
"iter"
25
25
"log/slog"
26
26
"slices"
27
+ "time"
27
28
28
29
"filippo.io/age"
29
30
"github.com/gravitational/trace"
30
31
32
+ "github.com/gravitational/teleport"
31
33
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
32
34
"github.com/gravitational/teleport/api/types"
35
+ "github.com/gravitational/teleport/api/utils/retryutils"
33
36
"github.com/gravitational/teleport/lib/backend"
34
37
"github.com/gravitational/teleport/lib/cryptosuites"
35
- "github.com/gravitational/teleport/lib/events"
36
38
"github.com/gravitational/teleport/lib/services"
37
39
)
38
40
@@ -51,10 +53,6 @@ type ManagerConfig struct {
51
53
52
54
// NewManager returns a new Manager using the given ManagerConfig.
53
55
func NewManager (cfg ManagerConfig ) (* Manager , error ) {
54
- if cfg .Logger == nil {
55
- cfg .Logger = slog .Default ()
56
- }
57
-
58
56
if cfg .Backend == nil {
59
57
return nil , trace .BadParameter ("backend is required" )
60
58
}
@@ -63,6 +61,10 @@ func NewManager(cfg ManagerConfig) (*Manager, error) {
63
61
return nil , trace .BadParameter ("key store is required" )
64
62
}
65
63
64
+ if cfg .Logger == nil {
65
+ cfg .Logger = slog .With (teleport .ComponentKey , "encryption-manager" )
66
+ }
67
+
66
68
return & Manager {
67
69
RecordingEncryption : cfg .Backend ,
68
70
keyStore : cfg .KeyStore ,
@@ -78,7 +80,6 @@ type Manager struct {
78
80
79
81
logger * slog.Logger
80
82
keyStore EncryptionKeyStore
81
- uploader events.MultipartUploader
82
83
}
83
84
84
85
// ensureActiveRecordingEncryption returns the configured RecordingEncryption resource if it exists with active keys. If it does not,
@@ -301,94 +302,127 @@ type RecordingEncryptionResolver interface {
301
302
ResolveRecordingEncryption (ctx context.Context ) (* recordingencryptionv1.RecordingEncryption , error )
302
303
}
303
304
304
- // WatchConfig captures required dependencies for building a RecordingEncyprtion watcher that
305
+ // WatchConfig captures required dependencies for building a RecordingEncryption watcher that
305
306
// automatically resolves state.
306
307
type WatchConfig struct {
307
308
Events types.Events
308
309
Resolver RecordingEncryptionResolver
309
310
ClusterConfig services.ClusterConfiguration
310
311
Logger * slog.Logger
311
- LockConfig backend.RunWhileLockedConfig
312
+ LockConfig * backend.RunWhileLockedConfig
312
313
}
313
314
314
- // Watch creates a watcher responsible for responding to changes in the RecordingEncryption
315
- // resource. This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique
316
- // keystore configuration in a cluster.
317
- func Watch (ctx context.Context , cfg WatchConfig ) error {
315
+ // A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
316
+ // auth server.
317
+ type Watcher struct {
318
+ events types.Events
319
+ resolver RecordingEncryptionResolver
320
+ clusterConfig services.ClusterConfiguration
321
+ logger * slog.Logger
322
+ lockConfig * backend.RunWhileLockedConfig
323
+ }
324
+
325
+ // NewWatcher returns a new Watcher.
326
+ func NewWatcher (cfg WatchConfig ) (* Watcher , error ) {
318
327
switch {
319
328
case cfg .Events == nil :
320
- return trace .BadParameter ("events is required" )
329
+ return nil , trace .BadParameter ("events is required" )
321
330
case cfg .Resolver == nil :
322
- return trace .BadParameter ("recording encryption resolver is required" )
331
+ return nil , trace .BadParameter ("recording encryption resolver is required" )
323
332
case cfg .ClusterConfig == nil :
324
- return trace .BadParameter ("cluster config backend is required" )
333
+ return nil , trace .BadParameter ("cluster config backend is required" )
334
+ case cfg .LockConfig == nil :
335
+ return nil , trace .BadParameter ("lock config is required" )
325
336
}
326
337
if cfg .Logger == nil {
327
- cfg .Logger = slog .Default ( )
338
+ cfg .Logger = slog .With ( teleport . ComponentKey , "encryption-watcher" )
328
339
}
329
340
330
- cfg .Logger .DebugContext (ctx , "creating recording_encryption watcher" )
331
- w , err := cfg .Events .NewWatcher (ctx , types.Watch {
332
- Name : "recording_encryption_watcher" ,
333
- Kinds : []types.WatchKind {
334
- {
335
- Kind : types .KindRecordingEncryption ,
336
- },
337
- },
338
- })
339
- if err != nil {
340
- return trace .Wrap (err )
341
+ return & Watcher {
342
+ events : cfg .Events ,
343
+ resolver : cfg .Resolver ,
344
+ clusterConfig : cfg .ClusterConfig ,
345
+ logger : cfg .Logger ,
346
+ lockConfig : cfg .LockConfig ,
347
+ }, nil
348
+ }
349
+
350
+ // Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
351
+ // This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
352
+ // configuration in a cluster.
353
+ func (w * Watcher ) Run (ctx context.Context ) error {
354
+ wait := func () {
355
+ <- time .After (retryutils .SeventhJitter (time .Second * 5 ))
341
356
}
342
357
343
- go func () {
358
+ for {
359
+ watch , err := w .events .NewWatcher (ctx , types.Watch {
360
+ Name : "recording_encryption_watcher" ,
361
+ Kinds : []types.WatchKind {
362
+ {
363
+ Kind : types .KindRecordingEncryption ,
364
+ },
365
+ },
366
+ })
367
+ if err != nil {
368
+ w .logger .ErrorContext (ctx , "failed to create watcher, retrying" , "error" , err )
369
+ wait ()
370
+ }
371
+
372
+ HandleEvents:
344
373
for {
374
+ err := w .handleRecordingEncryptionChange (ctx )
375
+ if err != nil {
376
+ w .logger .ErrorContext (ctx , "failed to handle session recording config change" , "error" , err )
377
+ wait ()
378
+ continue
379
+
380
+ }
381
+
345
382
select {
346
- case ev := <- w .Events ():
383
+ case ev := <- watch .Events ():
347
384
if ev .Type != types .OpPut {
348
385
continue
349
386
}
350
- const retries = 3
351
- for tries := range retries {
352
- err := handleRecordingEncryptionChange (ctx , cfg )
353
- if err == nil {
354
- break
355
- }
356
-
357
- cfg .Logger .ErrorContext (ctx , "failed to handle session recording config change" , "error" , err , "remaining_tries" , retries - tries - 1 )
387
+ case <- watch .Done ():
388
+ if err := watch .Error (); err == nil || errors .Is (err , context .Canceled ) {
389
+ return nil
358
390
}
359
391
360
- case <- w .Done ():
361
- cfg .Logger .DebugContext (ctx , "no longer watching recording_encryption" )
362
- return
392
+ w .logger .ErrorContext (ctx , "watcher failed, retrying" , "error" , err )
393
+ wait ()
394
+ break HandleEvents
395
+ case <- ctx .Done ():
396
+ w .logger .InfoContext (ctx , "stopping encryption watcher" , "error" , err )
397
+ watch .Close ()
398
+ return nil
363
399
}
364
400
}
365
- }()
366
-
367
- return nil
401
+ }
368
402
}
369
403
370
404
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
371
405
// SessionRecordingConfig with the results, if necessary
372
- func handleRecordingEncryptionChange (ctx context.Context , cfg WatchConfig ) error {
373
- return trace .Wrap (backend .RunWhileLocked (ctx , cfg . LockConfig , func (ctx context.Context ) error {
374
- recConfig , err := cfg . ClusterConfig .GetSessionRecordingConfig (ctx )
406
+ func ( w * Watcher ) handleRecordingEncryptionChange (ctx context.Context ) error {
407
+ return trace .Wrap (backend .RunWhileLocked (ctx , * w . lockConfig , func (ctx context.Context ) error {
408
+ recConfig , err := w . clusterConfig .GetSessionRecordingConfig (ctx )
375
409
if err != nil {
376
410
return trace .Wrap (err , "fetching recording config" )
377
411
}
378
412
379
413
if ! recConfig .GetEncrypted () {
380
- cfg . Logger .DebugContext (ctx , "session recording encryption disabled, skip resolving keys" )
414
+ w . logger .DebugContext (ctx , "session recording encryption disabled, skip resolving keys" )
381
415
return nil
382
416
}
383
417
384
- encryption , err := cfg . Resolver .ResolveRecordingEncryption (ctx )
418
+ encryption , err := w . resolver .ResolveRecordingEncryption (ctx )
385
419
if err != nil {
386
- cfg . Logger .ErrorContext (ctx , "failed to resolve recording encryption state" , "error" , err )
420
+ w . logger .ErrorContext (ctx , "failed to resolve recording encryption state" , "error" , err )
387
421
return trace .Wrap (err , "resolving recording encryption" )
388
422
}
389
423
390
424
if recConfig .SetEncryptionKeys (GetAgeEncryptionKeys (encryption .GetSpec ().ActiveKeys )) {
391
- _ , err = cfg . ClusterConfig .UpdateSessionRecordingConfig (ctx , recConfig )
425
+ _ , err = w . clusterConfig .UpdateSessionRecordingConfig (ctx , recConfig )
392
426
return trace .Wrap (err , "updating encryption keys" )
393
427
}
394
428
0 commit comments