Skip to content

Commit 2aff1c8

Browse files
committed
switching watcher to a sync run function rather than spawning a new goroutine so we can execute as a service within the auth server
1 parent 127801a commit 2aff1c8

File tree

3 files changed

+98
-57
lines changed

3 files changed

+98
-57
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
cloud.google.com/go/storage v1.52.0
1616
code.dny.dev/ssrf v0.2.0
1717
connectrpc.com/connect v1.18.1
18+
filippo.io/age v1.2.1
1819
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0
1920
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.9.0
2021
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/authorization/armauthorization/v2 v2.2.0
@@ -272,7 +273,6 @@ require (
272273
cloud.google.com/go/monitoring v1.24.1 // indirect
273274
cloud.google.com/go/pubsub v1.47.0 // indirect
274275
dario.cat/mergo v1.0.1 // indirect
275-
filippo.io/age v1.2.1
276276
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
277277
github.com/99designs/keyring v1.2.2 // indirect
278278
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect

lib/auth/recordingencryption/manager.go

Lines changed: 87 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ import (
2424
"iter"
2525
"log/slog"
2626
"slices"
27+
"time"
2728

2829
"filippo.io/age"
2930
"github.com/gravitational/trace"
3031

32+
"github.com/gravitational/teleport"
3133
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
3234
"github.com/gravitational/teleport/api/types"
35+
"github.com/gravitational/teleport/api/utils/retryutils"
3336
"github.com/gravitational/teleport/lib/backend"
3437
"github.com/gravitational/teleport/lib/cryptosuites"
35-
"github.com/gravitational/teleport/lib/events"
3638
"github.com/gravitational/teleport/lib/services"
3739
)
3840

@@ -51,10 +53,6 @@ type ManagerConfig struct {
5153

5254
// NewManager returns a new Manager using the given ManagerConfig.
5355
func NewManager(cfg ManagerConfig) (*Manager, error) {
54-
if cfg.Logger == nil {
55-
cfg.Logger = slog.Default()
56-
}
57-
5856
if cfg.Backend == nil {
5957
return nil, trace.BadParameter("backend is required")
6058
}
@@ -63,6 +61,10 @@ func NewManager(cfg ManagerConfig) (*Manager, error) {
6361
return nil, trace.BadParameter("key store is required")
6462
}
6563

64+
if cfg.Logger == nil {
65+
cfg.Logger = slog.With(teleport.ComponentKey, "encryption-manager")
66+
}
67+
6668
return &Manager{
6769
RecordingEncryption: cfg.Backend,
6870
keyStore: cfg.KeyStore,
@@ -78,7 +80,6 @@ type Manager struct {
7880

7981
logger *slog.Logger
8082
keyStore EncryptionKeyStore
81-
uploader events.MultipartUploader
8283
}
8384

8485
// ensureActiveRecordingEncryption returns the configured RecordingEncryption resource if it exists with active keys. If it does not,
@@ -301,94 +302,130 @@ type RecordingEncryptionResolver interface {
301302
ResolveRecordingEncryption(ctx context.Context) (*recordingencryptionv1.RecordingEncryption, error)
302303
}
303304

304-
// WatchConfig captures required dependencies for building a RecordingEncyprtion watcher that
305+
// WatchConfig captures required dependencies for building a RecordingEncryption watcher that
305306
// automatically resolves state.
306307
type WatchConfig struct {
307308
Events types.Events
308309
Resolver RecordingEncryptionResolver
309310
ClusterConfig services.ClusterConfiguration
310311
Logger *slog.Logger
311-
LockConfig backend.RunWhileLockedConfig
312+
LockConfig *backend.RunWhileLockedConfig
313+
}
314+
315+
// A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
316+
// auth server.
317+
type Watcher struct {
318+
events types.Events
319+
resolver RecordingEncryptionResolver
320+
clusterConfig services.ClusterConfiguration
321+
logger *slog.Logger
322+
lockConfig *backend.RunWhileLockedConfig
312323
}
313324

314-
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption
315-
// resource. This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique
316-
// keystore configuration in a cluster.
317-
func Watch(ctx context.Context, cfg WatchConfig) error {
325+
// NewWatcher returns a new Watcher.
326+
func NewWatcher(cfg WatchConfig) (*Watcher, error) {
318327
switch {
319328
case cfg.Events == nil:
320-
return trace.BadParameter("events is required")
329+
return nil, trace.BadParameter("events is required")
321330
case cfg.Resolver == nil:
322-
return trace.BadParameter("recording encryption resolver is required")
331+
return nil, trace.BadParameter("recording encryption resolver is required")
323332
case cfg.ClusterConfig == nil:
324-
return trace.BadParameter("cluster config backend is required")
333+
return nil, trace.BadParameter("cluster config backend is required")
334+
case cfg.LockConfig == nil:
335+
return nil, trace.BadParameter("lock config is required")
325336
}
326337
if cfg.Logger == nil {
327-
cfg.Logger = slog.Default()
338+
cfg.Logger = slog.With(teleport.ComponentKey, "encryption-watcher")
328339
}
329340

330-
cfg.Logger.DebugContext(ctx, "creating recording_encryption watcher")
331-
w, err := cfg.Events.NewWatcher(ctx, types.Watch{
332-
Name: "recording_encryption_watcher",
333-
Kinds: []types.WatchKind{
334-
{
335-
Kind: types.KindRecordingEncryption,
336-
},
337-
},
338-
})
339-
if err != nil {
340-
return trace.Wrap(err)
341+
return &Watcher{
342+
events: cfg.Events,
343+
resolver: cfg.Resolver,
344+
clusterConfig: cfg.ClusterConfig,
345+
logger: cfg.Logger,
346+
lockConfig: cfg.LockConfig,
347+
}, nil
348+
}
349+
350+
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
351+
// This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
352+
// configuration in a cluster.
353+
func (w *Watcher) Run(ctx context.Context) (err error) {
354+
jitter := func() {
355+
<-time.After(retryutils.SeventhJitter(time.Second * 5))
341356
}
342357

343-
go func() {
358+
defer func() {
359+
w.logger.InfoContext(ctx, "stopping encryption watcher", "error", err)
360+
}()
361+
362+
for {
363+
watch, err := w.events.NewWatcher(ctx, types.Watch{
364+
Name: "recording_encryption_watcher",
365+
Kinds: []types.WatchKind{
366+
{
367+
Kind: types.KindRecordingEncryption,
368+
},
369+
},
370+
})
371+
if err != nil {
372+
w.logger.ErrorContext(ctx, "failed to create watcher, retrying", "error", err)
373+
jitter()
374+
}
375+
376+
HandleEvents:
344377
for {
378+
err := w.handleRecordingEncryptionChange(ctx)
379+
if err != nil {
380+
w.logger.ErrorContext(ctx, "failed to handle session recording config change", "error", err)
381+
jitter()
382+
continue
383+
384+
}
385+
345386
select {
346-
case ev := <-w.Events():
387+
case ev := <-watch.Events():
347388
if ev.Type != types.OpPut {
348389
continue
349390
}
350-
const retries = 3
351-
for tries := range retries {
352-
err := handleRecordingEncryptionChange(ctx, cfg)
353-
if err == nil {
354-
break
355-
}
356-
357-
cfg.Logger.ErrorContext(ctx, "failed to handle session recording config change", "error", err, "remaining_tries", retries-tries-1)
391+
case <-watch.Done():
392+
if err := watch.Error(); err == nil {
393+
return nil
358394
}
359395

360-
case <-w.Done():
361-
cfg.Logger.DebugContext(ctx, "no longer watching recording_encryption")
362-
return
396+
w.logger.ErrorContext(ctx, "watcher failed, retrying", "error", err)
397+
jitter()
398+
break HandleEvents
399+
case <-ctx.Done():
400+
watch.Close()
401+
return ctx.Err()
363402
}
364403
}
365-
}()
366-
367-
return nil
404+
}
368405
}
369406

370407
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
371408
// SessionRecordingConfig with the results, if necessary
372-
func handleRecordingEncryptionChange(ctx context.Context, cfg WatchConfig) error {
373-
return trace.Wrap(backend.RunWhileLocked(ctx, cfg.LockConfig, func(ctx context.Context) error {
374-
recConfig, err := cfg.ClusterConfig.GetSessionRecordingConfig(ctx)
409+
func (w *Watcher) handleRecordingEncryptionChange(ctx context.Context) error {
410+
return trace.Wrap(backend.RunWhileLocked(ctx, *w.lockConfig, func(ctx context.Context) error {
411+
recConfig, err := w.clusterConfig.GetSessionRecordingConfig(ctx)
375412
if err != nil {
376413
return trace.Wrap(err, "fetching recording config")
377414
}
378415

379416
if !recConfig.GetEncrypted() {
380-
cfg.Logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
417+
w.logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
381418
return nil
382419
}
383420

384-
encryption, err := cfg.Resolver.ResolveRecordingEncryption(ctx)
421+
encryption, err := w.resolver.ResolveRecordingEncryption(ctx)
385422
if err != nil {
386-
cfg.Logger.ErrorContext(ctx, "failed to resolve recording encryption state", "error", err)
423+
w.logger.ErrorContext(ctx, "failed to resolve recording encryption state", "error", err)
387424
return trace.Wrap(err, "resolving recording encryption")
388425
}
389426

390427
if recConfig.SetEncryptionKeys(GetAgeEncryptionKeys(encryption.GetSpec().ActiveKeys)) {
391-
_, err = cfg.ClusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
428+
_, err = w.clusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
392429
return trace.Wrap(err, "updating encryption keys")
393430
}
394431

lib/auth/recordingencryption/manager_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package recordingencryption_test
1919
import (
2020
"context"
2121
"crypto"
22-
"crypto/rand"
2322
"crypto/rsa"
2423
"crypto/x509"
2524
"encoding/pem"
@@ -58,11 +57,16 @@ type fakeEncryptionKeyStore struct {
5857
}
5958

6059
func (f *fakeEncryptionKeyStore) NewEncryptionKeyPair(ctx context.Context, purpose cryptosuites.KeyPurpose) (*types.EncryptionKeyPair, error) {
61-
private, err := rsa.GenerateKey(rand.Reader, 2048)
60+
decrypter, err := cryptosuites.GenerateDecrypterWithAlgorithm(cryptosuites.RSA2048)
6261
if err != nil {
6362
return nil, err
6463
}
6564

65+
private, ok := decrypter.(*rsa.PrivateKey)
66+
if !ok {
67+
return nil, errors.New("expected RSA private key")
68+
}
69+
6670
privatePEM := pem.EncodeToMemory(&pem.Block{
6771
Type: keys.PKCS1PrivateKeyType,
6872
Bytes: x509.MarshalPKCS1PrivateKey(private),
@@ -139,7 +143,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
139143
require.NoError(t, err)
140144
activeKeys := encryption.GetSpec().GetActiveKeys()
141145

142-
require.Equal(t, 1, len(activeKeys))
146+
require.Len(t, activeKeys, 1)
143147
firstKey := activeKeys[0]
144148

145149
// should generate a wrapped key with the initial recording encryption pair
@@ -151,7 +155,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
151155
require.NoError(t, err)
152156

153157
activeKeys = encryption.GetSpec().ActiveKeys
154-
require.Equal(t, 2, len(activeKeys))
158+
require.Len(t, activeKeys, 2)
155159
for _, key := range activeKeys {
156160
require.NotNil(t, key.KeyEncryptionPair)
157161
if key.KeyEncryptionPair.PrivateKeyType == serviceAType {
@@ -165,7 +169,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
165169
encryption, err = serviceB.ResolveRecordingEncryption(ctx)
166170
require.NoError(t, err)
167171
activeKeys = encryption.GetSpec().ActiveKeys
168-
require.Equal(t, 2, len(activeKeys))
172+
require.Len(t, activeKeys, 2)
169173
for _, key := range activeKeys {
170174
require.NotNil(t, key.KeyEncryptionPair)
171175
if key.KeyEncryptionPair.PrivateKeyType == serviceAType {
@@ -179,7 +183,7 @@ func TestResolveRecordingEncryption(t *testing.T) {
179183
encryption, err = serviceA.ResolveRecordingEncryption(ctx)
180184
require.NoError(t, err)
181185
activeKeys = encryption.GetSpec().ActiveKeys
182-
require.Equal(t, 2, len(activeKeys))
186+
require.Len(t, activeKeys, 2)
183187
for _, key := range activeKeys {
184188
require.NotNil(t, key.KeyEncryptionPair)
185189
require.NotNil(t, key.RecordingEncryptionPair)

0 commit comments

Comments
 (0)