Skip to content

Commit df2ca7c

Browse files
committed
switching watcher to a sync run function rather than spawning a new goroutine so we can execute as a service within the auth server
1 parent 0bed97e commit df2ca7c

File tree

2 files changed

+69
-47
lines changed

2 files changed

+69
-47
lines changed

lib/auth/recordingencryption/manager.go

Lines changed: 59 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,16 @@ import (
2424
"iter"
2525
"log/slog"
2626
"slices"
27+
"time"
2728

2829
"filippo.io/age"
2930
"github.com/gravitational/trace"
3031

3132
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
3233
"github.com/gravitational/teleport/api/types"
34+
"github.com/gravitational/teleport/api/utils/retryutils"
3335
"github.com/gravitational/teleport/lib/backend"
3436
"github.com/gravitational/teleport/lib/cryptosuites"
35-
"github.com/gravitational/teleport/lib/events"
3637
"github.com/gravitational/teleport/lib/services"
3738
)
3839

@@ -78,7 +79,6 @@ type Manager struct {
7879

7980
logger *slog.Logger
8081
keyStore EncryptionKeyStore
81-
uploader events.MultipartUploader
8282
}
8383

8484
// ensureActiveRecordingEncryption returns the configured RecordingEncryption resource if it exists with active keys. If it does not,
@@ -301,34 +301,56 @@ type RecordingEncryptionResolver interface {
301301
ResolveRecordingEncryption(ctx context.Context) (*recordingencryptionv1.RecordingEncryption, error)
302302
}
303303

304-
// WatchConfig captures required dependencies for building a RecordingEncyprtion watcher that
304+
// WatchConfig captures required dependencies for building a RecordingEncryption watcher that
305305
// automatically resolves state.
306306
type WatchConfig struct {
307307
Events types.Events
308308
Resolver RecordingEncryptionResolver
309309
ClusterConfig services.ClusterConfiguration
310310
Logger *slog.Logger
311-
LockConfig backend.RunWhileLockedConfig
311+
LockConfig *backend.RunWhileLockedConfig
312312
}
313313

314-
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption
315-
// resource. This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique
316-
// keystore configuration in a cluster.
317-
func Watch(ctx context.Context, cfg WatchConfig) error {
314+
// A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
315+
// auth server.
316+
type Watcher struct {
317+
events types.Events
318+
resolver RecordingEncryptionResolver
319+
clusterConfig services.ClusterConfiguration
320+
logger *slog.Logger
321+
lockConfig *backend.RunWhileLockedConfig
322+
}
323+
324+
// NewWatcher returns a new Watcher.
325+
func NewWatcher(cfg WatchConfig) (*Watcher, error) {
318326
switch {
319327
case cfg.Events == nil:
320-
return trace.BadParameter("events is required")
328+
return nil, trace.BadParameter("events is required")
321329
case cfg.Resolver == nil:
322-
return trace.BadParameter("recording encryption resolver is required")
330+
return nil, trace.BadParameter("recording encryption resolver is required")
323331
case cfg.ClusterConfig == nil:
324-
return trace.BadParameter("cluster config backend is required")
332+
return nil, trace.BadParameter("cluster config backend is required")
333+
case cfg.LockConfig == nil:
334+
return nil, trace.BadParameter("lock config is required")
325335
}
326336
if cfg.Logger == nil {
327337
cfg.Logger = slog.Default()
328338
}
329339

330-
cfg.Logger.DebugContext(ctx, "creating recording_encryption watcher")
331-
w, err := cfg.Events.NewWatcher(ctx, types.Watch{
340+
return &Watcher{
341+
events: cfg.Events,
342+
resolver: cfg.Resolver,
343+
clusterConfig: cfg.ClusterConfig,
344+
logger: cfg.Logger,
345+
lockConfig: cfg.LockConfig,
346+
}, nil
347+
}
348+
349+
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
350+
// This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
351+
// configuration in a cluster.
352+
func (w *Watcher) Run(ctx context.Context) error {
353+
watch, err := w.events.NewWatcher(ctx, types.Watch{
332354
Name: "recording_encryption_watcher",
333355
Kinds: []types.WatchKind{
334356
{
@@ -340,55 +362,51 @@ func Watch(ctx context.Context, cfg WatchConfig) error {
340362
return trace.Wrap(err)
341363
}
342364

343-
go func() {
344-
for {
345-
select {
346-
case ev := <-w.Events():
347-
if ev.Type != types.OpPut {
348-
continue
349-
}
350-
const retries = 3
351-
for tries := range retries {
352-
err := handleRecordingEncryptionChange(ctx, cfg)
353-
if err == nil {
354-
break
355-
}
356-
357-
cfg.Logger.ErrorContext(ctx, "failed to handle session recording config change", "error", err, "remaining_tries", retries-tries-1)
365+
for {
366+
select {
367+
case ev := <-watch.Events():
368+
if ev.Type != types.OpPut {
369+
continue
370+
}
371+
const retries = 3
372+
for tries := range retries {
373+
err := w.handleRecordingEncryptionChange(ctx)
374+
if err == nil {
375+
break
358376
}
359377

360-
case <-w.Done():
361-
cfg.Logger.DebugContext(ctx, "no longer watching recording_encryption")
362-
return
378+
w.logger.ErrorContext(ctx, "failed to handle session recording config change", "error", err, "remaining_tries", retries-tries-1)
379+
<-time.After(retryutils.SeventhJitter(time.Second * 10))
363380
}
364-
}
365-
}()
366381

367-
return nil
382+
case <-watch.Done():
383+
return trace.Wrap(watch.Error())
384+
}
385+
}
368386
}
369387

370388
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
371389
// SessionRecordingConfig with the results, if necessary
372-
func handleRecordingEncryptionChange(ctx context.Context, cfg WatchConfig) error {
373-
return trace.Wrap(backend.RunWhileLocked(ctx, cfg.LockConfig, func(ctx context.Context) error {
374-
recConfig, err := cfg.ClusterConfig.GetSessionRecordingConfig(ctx)
390+
func (w *Watcher) handleRecordingEncryptionChange(ctx context.Context) error {
391+
return trace.Wrap(backend.RunWhileLocked(ctx, *w.lockConfig, func(ctx context.Context) error {
392+
recConfig, err := w.clusterConfig.GetSessionRecordingConfig(ctx)
375393
if err != nil {
376394
return trace.Wrap(err, "fetching recording config")
377395
}
378396

379397
if !recConfig.GetEncrypted() {
380-
cfg.Logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
398+
w.logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
381399
return nil
382400
}
383401

384-
encryption, err := cfg.Resolver.ResolveRecordingEncryption(ctx)
402+
encryption, err := w.resolver.ResolveRecordingEncryption(ctx)
385403
if err != nil {
386-
cfg.Logger.ErrorContext(ctx, "failed to resolve recording encryption state", "error", err)
404+
w.logger.ErrorContext(ctx, "failed to resolve recording encryption state", "error", err)
387405
return trace.Wrap(err, "resolving recording encryption")
388406
}
389407

390408
if recConfig.SetEncryptionKeys(GetAgeEncryptionKeys(encryption.GetSpec().ActiveKeys)) {
391-
_, err = cfg.ClusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
409+
_, err = w.clusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
392410
return trace.Wrap(err, "updating encryption keys")
393411
}
394412

lib/auth/recordingencryption/manager_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package recordingencryption_test
1919
import (
2020
"context"
2121
"crypto"
22-
"crypto/rand"
2322
"crypto/rsa"
2423
"crypto/x509"
2524
"encoding/pem"
@@ -58,11 +57,16 @@ type fakeEncryptionKeyStore struct {
5857
}
5958

6059
func (f *fakeEncryptionKeyStore) NewEncryptionKeyPair(ctx context.Context, purpose cryptosuites.KeyPurpose) (*types.EncryptionKeyPair, error) {
61-
private, err := rsa.GenerateKey(rand.Reader, 2048)
60+
decrypter, err := cryptosuites.GenerateDecrypterWithAlgorithm(cryptosuites.RSA2048)
6261
if err != nil {
6362
return nil, err
6463
}
6564

65+
private, ok := decrypter.(*rsa.PrivateKey)
66+
if !ok {
67+
return nil, errors.New("expected RSA private key")
68+
}
69+
6670
privatePEM := pem.EncodeToMemory(&pem.Block{
6771
Type: keys.PKCS1PrivateKeyType,
6872
Bytes: x509.MarshalPKCS1PrivateKey(private),
@@ -139,7 +143,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
139143
require.NoError(t, err)
140144
activeKeys := encryption.GetSpec().GetActiveKeys()
141145

142-
require.Equal(t, 1, len(activeKeys))
146+
require.Len(t, activeKeys, 1)
143147
firstKey := activeKeys[0]
144148

145149
// should generate a wrapped key with the initial recording encryption pair
@@ -151,7 +155,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
151155
require.NoError(t, err)
152156

153157
activeKeys = encryption.GetSpec().ActiveKeys
154-
require.Equal(t, 2, len(activeKeys))
158+
require.Len(t, activeKeys, 2)
155159
for _, key := range activeKeys {
156160
require.NotNil(t, key.KeyEncryptionPair)
157161
if key.KeyEncryptionPair.PrivateKeyType == serviceAType {
@@ -165,7 +169,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
165169
encryption, err = serviceB.ResolveRecordingEncryption(ctx)
166170
require.NoError(t, err)
167171
activeKeys = encryption.GetSpec().ActiveKeys
168-
require.Equal(t, 2, len(activeKeys))
172+
require.Len(t, activeKeys, 2)
169173
for _, key := range activeKeys {
170174
require.NotNil(t, key.KeyEncryptionPair)
171175
if key.KeyEncryptionPair.PrivateKeyType == serviceAType {
@@ -179,7 +183,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
179183
encryption, err = serviceA.ResolveRecordingEncryption(ctx)
180184
require.NoError(t, err)
181185
activeKeys = encryption.GetSpec().ActiveKeys
182-
require.Equal(t, 2, len(activeKeys))
186+
require.Len(t, activeKeys, 2)
183187
for _, key := range activeKeys {
184188
require.NotNil(t, key.KeyEncryptionPair)
185189
require.NotNil(t, key.RecordingEncryptionPair)

0 commit comments

Comments
 (0)