Skip to content

Commit d02c350

Browse files
committed
adding watcher directly to manager and listening for SRC events
1 parent f956674 commit d02c350

File tree

2 files changed

+107
-141
lines changed

2 files changed

+107
-141
lines changed

lib/auth/recordingencryption/manager.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@ import (
2626
"iter"
2727
"log/slog"
2828
"slices"
29+
"time"
2930

3031
"filippo.io/age"
3132
"github.com/gravitational/trace"
3233

3334
"github.com/gravitational/teleport"
3435
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
3536
"github.com/gravitational/teleport/api/types"
37+
"github.com/gravitational/teleport/api/utils/retryutils"
3638
"github.com/gravitational/teleport/lib/backend"
3739
"github.com/gravitational/teleport/lib/cryptosuites"
3840
"github.com/gravitational/teleport/lib/services"
@@ -335,6 +337,111 @@ func (m *Manager) FindDecryptionKey(ctx context.Context, publicKeys ...[]byte) (
335337
return nil, trace.NotFound("no accessible decryption key found")
336338
}
337339

340+
func (m *Manager) Watch(ctx context.Context, events types.Events) (err error) {
341+
// shouldRetryAfterJitterFn waits at most 5 seconds and returns a bool specifying whether or not
342+
// execution should continue
343+
shouldRetryAfterJitterFn := func() bool {
344+
select {
345+
case <-time.After(retryutils.SeventhJitter(time.Second * 5)):
346+
return true
347+
case <-ctx.Done():
348+
return false
349+
}
350+
}
351+
352+
defer func() {
353+
m.logger.InfoContext(ctx, "stopping encryption watcher", "error", err)
354+
}()
355+
356+
for {
357+
watch, err := events.NewWatcher(ctx, types.Watch{
358+
Name: "recording_encryption_watcher",
359+
Kinds: []types.WatchKind{
360+
{
361+
Kind: types.KindRecordingEncryption,
362+
},
363+
{
364+
Kind: types.KindSessionRecordingConfig,
365+
},
366+
},
367+
})
368+
if err != nil {
369+
m.logger.ErrorContext(ctx, "failed to create watcher, retrying", "error", err)
370+
if !shouldRetryAfterJitterFn() {
371+
return nil
372+
}
373+
continue
374+
}
375+
defer watch.Close()
376+
377+
HandleEvents:
378+
for {
379+
select {
380+
case ev := <-watch.Events():
381+
if err := m.handleEvent(ctx, ev, shouldRetryAfterJitterFn); err != nil {
382+
m.logger.ErrorContext(ctx, "failure handling recording encryption event", "kind", ev.Resource.GetKind(), "error", err)
383+
}
384+
case <-watch.Done():
385+
if err := watch.Error(); err == nil {
386+
return nil
387+
}
388+
389+
m.logger.ErrorContext(ctx, "watcher failed, retrying", "error", err)
390+
if !shouldRetryAfterJitterFn() {
391+
return nil
392+
}
393+
break HandleEvents
394+
case <-ctx.Done():
395+
return nil
396+
}
397+
398+
}
399+
}
400+
}
401+
402+
func (m *Manager) handleEvent(ctx context.Context, ev types.Event, shouldRetryFn func() bool) error {
403+
if ev.Type != types.OpPut {
404+
return nil
405+
}
406+
407+
kind := ev.Resource.GetKind()
408+
for {
409+
switch kind {
410+
case types.KindRecordingEncryption:
411+
if _, err := m.ResolveRecordingEncryption(ctx); err != nil {
412+
m.logger.ErrorContext(ctx, "failed to resolve recording encryption keys, retrying", "error", err)
413+
if shouldRetryFn() {
414+
continue
415+
}
416+
417+
return trace.Wrap(err)
418+
}
419+
case types.KindSessionRecordingConfig:
420+
previousConfig := m.sessionRecordingConfig
421+
var err error
422+
m.sessionRecordingConfig, err = m.GetSessionRecordingConfig(ctx)
423+
if err != nil {
424+
m.logger.ErrorContext(ctx, "failed to fetch updated session_recording_config", "error", err)
425+
if shouldRetryFn() {
426+
continue
427+
}
428+
429+
return trace.Wrap(err)
430+
}
431+
432+
if m.sessionRecordingConfig.GetEncrypted() && (previousConfig == nil || !previousConfig.GetEncrypted()) {
433+
434+
// restart the loop and resolve recording encryption if
435+
// encryption was just enabled
436+
kind = types.KindRecordingEncryption
437+
continue
438+
}
439+
}
440+
441+
return nil
442+
}
443+
}
444+
338445
// getAgeEncryptionKeys returns an iterator of AgeEncryptionKeys from a list of WrappedKeys. This is for use in
339446
// populating the EncryptionKeys field of SessionRecordingConfigStatus.
340447
func getAgeEncryptionKeys(keys []*recordingencryptionv1.WrappedKey) iter.Seq[*types.AgeEncryptionKey] {

lib/auth/recordingencryption/watcher.go

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
 (0)