Skip to content

Commit c9cb026

Browse files
committed
splitting recording encryption watcher into its own file and fixing some comments and naming
1 parent 621135c commit c9cb026

File tree

2 files changed

+193
-163
lines changed

2 files changed

+193
-163
lines changed

lib/auth/recordingencryption/manager.go

Lines changed: 1 addition & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,15 @@ import (
2323
"crypto/sha256"
2424
"encoding/hex"
2525
"errors"
26-
"iter"
2726
"log/slog"
2827
"slices"
29-
"time"
3028

3129
"filippo.io/age"
3230
"github.com/gravitational/trace"
3331

3432
"github.com/gravitational/teleport"
3533
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
3634
"github.com/gravitational/teleport/api/types"
37-
"github.com/gravitational/teleport/api/utils/retryutils"
3835
"github.com/gravitational/teleport/lib/backend"
3936
"github.com/gravitational/teleport/lib/cryptosuites"
4037
"github.com/gravitational/teleport/lib/services"
@@ -65,7 +62,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) {
6562
}
6663

6764
if cfg.Logger == nil {
68-
cfg.Logger = slog.With(teleport.ComponentKey, "encryption-manager")
65+
cfg.Logger = slog.With(teleport.ComponentKey, "recording-encryption-manager")
6966
}
7067

7168
return &Manager{
@@ -314,162 +311,3 @@ func (m *Manager) FindDecryptionKey(ctx context.Context, publicKeys ...[]byte) (
314311

315312
return nil, trace.NotFound("no accessible decryption key found")
316313
}
317-
318-
// GetAgeEncryptionKeys returns an iterator of AgeEncryptionKeys from a list of WrappedKeys. This is for use in
319-
// populating the EncryptionKeys field of SessionRecordingConfigStatus.
320-
func GetAgeEncryptionKeys(keys []*recordingencryptionv1.WrappedKey) iter.Seq[*types.AgeEncryptionKey] {
321-
return func(yield func(*types.AgeEncryptionKey) bool) {
322-
for _, key := range keys {
323-
if !yield(&types.AgeEncryptionKey{
324-
PublicKey: key.RecordingEncryptionPair.PublicKey,
325-
}) {
326-
return
327-
}
328-
}
329-
}
330-
}
331-
332-
// Resolver resolves RecordingEncryption state and passes the result to a postProcessFn callback to be called
333-
// before any locks are released.
334-
type Resolver interface {
335-
ResolveRecordingEncryption(ctx context.Context, postProcessFn func(context.Context, *recordingencryptionv1.RecordingEncryption) error) (*recordingencryptionv1.RecordingEncryption, error)
336-
}
337-
338-
// WatchConfig captures required dependencies for building a RecordingEncryption watcher that
339-
// automatically resolves state.
340-
type WatchConfig struct {
341-
Events types.Events
342-
Resolver Resolver
343-
ClusterConfig services.ClusterConfiguration
344-
Logger *slog.Logger
345-
}
346-
347-
// A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
348-
// auth server.
349-
type Watcher struct {
350-
events types.Events
351-
resolver Resolver
352-
clusterConfig services.ClusterConfiguration
353-
logger *slog.Logger
354-
}
355-
356-
// NewWatcher returns a new Watcher.
357-
func NewWatcher(cfg WatchConfig) (*Watcher, error) {
358-
switch {
359-
case cfg.Events == nil:
360-
return nil, trace.BadParameter("events is required")
361-
case cfg.Resolver == nil:
362-
return nil, trace.BadParameter("recording encryption resolver is required")
363-
case cfg.ClusterConfig == nil:
364-
return nil, trace.BadParameter("cluster config backend is required")
365-
}
366-
if cfg.Logger == nil {
367-
cfg.Logger = slog.With(teleport.ComponentKey, "encryption-watcher")
368-
}
369-
370-
return &Watcher{
371-
events: cfg.Events,
372-
resolver: cfg.Resolver,
373-
clusterConfig: cfg.ClusterConfig,
374-
logger: cfg.Logger,
375-
}, nil
376-
}
377-
378-
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
379-
// This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
380-
// configuration in a cluster.
381-
func (w *Watcher) Run(ctx context.Context) (err error) {
382-
// shouldRetryAfterJitterFn returns a bool specifiying whether or not execution should continue
383-
shouldRetryAfterJitterFn := func() bool {
384-
select {
385-
case <-time.After(retryutils.SeventhJitter(time.Second * 5)):
386-
return true
387-
case <-ctx.Done():
388-
return false
389-
}
390-
}
391-
392-
defer func() {
393-
w.logger.InfoContext(ctx, "stopping encryption watcher", "error", err)
394-
}()
395-
396-
for {
397-
watch, err := w.events.NewWatcher(ctx, types.Watch{
398-
Name: "recording_encryption_watcher",
399-
Kinds: []types.WatchKind{
400-
{
401-
Kind: types.KindRecordingEncryption,
402-
},
403-
},
404-
})
405-
if err != nil {
406-
w.logger.ErrorContext(ctx, "failed to create watcher, retrying", "error", err)
407-
if !shouldRetryAfterJitterFn() {
408-
return nil
409-
}
410-
continue
411-
}
412-
defer watch.Close()
413-
414-
HandleEvents:
415-
for {
416-
err := w.handleRecordingEncryptionChange(ctx)
417-
if err != nil {
418-
w.logger.ErrorContext(ctx, "failure while resolving recording encryption state", "error", err)
419-
if !shouldRetryAfterJitterFn() {
420-
return nil
421-
}
422-
continue
423-
424-
}
425-
426-
select {
427-
case ev := <-watch.Events():
428-
if ev.Type != types.OpPut {
429-
continue
430-
}
431-
case <-watch.Done():
432-
if err := watch.Error(); err == nil {
433-
return nil
434-
}
435-
436-
w.logger.ErrorContext(ctx, "watcher failed, retrying", "error", err)
437-
if !shouldRetryAfterJitterFn() {
438-
return nil
439-
}
440-
break HandleEvents
441-
case <-ctx.Done():
442-
return nil
443-
}
444-
}
445-
}
446-
}
447-
448-
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
449-
// SessionRecordingConfig with the results, if necessary
450-
func (w *Watcher) handleRecordingEncryptionChange(ctx context.Context) error {
451-
recConfig, err := w.clusterConfig.GetSessionRecordingConfig(ctx)
452-
if err != nil {
453-
return trace.Wrap(err, "fetching recording config")
454-
}
455-
456-
if !recConfig.GetEncrypted() {
457-
w.logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
458-
return nil
459-
}
460-
461-
_, err = w.resolver.ResolveRecordingEncryption(ctx, func(ctx context.Context, encryption *recordingencryptionv1.RecordingEncryption) error {
462-
if !recConfig.SetEncryptionKeys(GetAgeEncryptionKeys(encryption.GetSpec().ActiveKeys)) {
463-
return nil
464-
}
465-
466-
_, err = w.clusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
467-
return trace.Wrap(err, "updating encryption keys")
468-
})
469-
470-
if err != nil {
471-
return trace.Wrap(err, "resolving recording encryption")
472-
}
473-
474-
return nil
475-
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Teleport
2+
// Copyright (C) 2025 Gravitational, Inc.
3+
//
4+
// This program is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// This program is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package recordingencryption
18+
19+
import (
20+
"context"
21+
"iter"
22+
"log/slog"
23+
"time"
24+
25+
"github.com/gravitational/trace"
26+
27+
"github.com/gravitational/teleport"
28+
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
29+
"github.com/gravitational/teleport/api/types"
30+
"github.com/gravitational/teleport/api/utils/retryutils"
31+
"github.com/gravitational/teleport/lib/services"
32+
)
33+
34+
// Resolver resolves RecordingEncryption state and passes the result to a postProcessFn callback to be called
35+
// before any locks are released.
36+
type Resolver interface {
37+
ResolveRecordingEncryption(ctx context.Context, postProcessFn func(context.Context, *recordingencryptionv1.RecordingEncryption) error) (*recordingencryptionv1.RecordingEncryption, error)
38+
}
39+
40+
// WatchConfig captures required dependencies for building a RecordingEncryption watcher that
41+
// automatically resolves state.
42+
type WatchConfig struct {
43+
Events types.Events
44+
Resolver Resolver
45+
ClusterConfig services.ClusterConfiguration
46+
Logger *slog.Logger
47+
}
48+
49+
// A Watcher watches for changes to the RecordingEncryption resource and resolves the state for the calling
50+
// auth server.
51+
type Watcher struct {
52+
events types.Events
53+
resolver Resolver
54+
clusterConfig services.ClusterConfiguration
55+
logger *slog.Logger
56+
}
57+
58+
// NewWatcher returns a new Watcher.
59+
func NewWatcher(cfg WatchConfig) (*Watcher, error) {
60+
switch {
61+
case cfg.Events == nil:
62+
return nil, trace.BadParameter("events is required")
63+
case cfg.Resolver == nil:
64+
return nil, trace.BadParameter("recording encryption resolver is required")
65+
case cfg.ClusterConfig == nil:
66+
return nil, trace.BadParameter("cluster config backend is required")
67+
}
68+
if cfg.Logger == nil {
69+
cfg.Logger = slog.With(teleport.ComponentKey, "encryption-watcher")
70+
}
71+
72+
return &Watcher{
73+
events: cfg.Events,
74+
resolver: cfg.Resolver,
75+
clusterConfig: cfg.ClusterConfig,
76+
logger: cfg.Logger,
77+
}, nil
78+
}
79+
80+
// Watch creates a watcher responsible for responding to changes in the RecordingEncryption resource.
81+
// This is how auth servers cooperate and ensure there are accessible wrapped keys for each unique keystore
82+
// configuration in a cluster.
83+
func (w *Watcher) Run(ctx context.Context) (err error) {
84+
// shouldRetryAfterJitterFn waits at most 5 seconds and returns a bool specifying whether or not
85+
// execution should continue
86+
shouldRetryAfterJitterFn := func() bool {
87+
select {
88+
case <-time.After(retryutils.SeventhJitter(time.Second * 5)):
89+
return true
90+
case <-ctx.Done():
91+
return false
92+
}
93+
}
94+
95+
defer func() {
96+
w.logger.InfoContext(ctx, "stopping encryption watcher", "error", err)
97+
}()
98+
99+
for {
100+
watch, err := w.events.NewWatcher(ctx, types.Watch{
101+
Name: "recording_encryption_watcher",
102+
Kinds: []types.WatchKind{
103+
{
104+
Kind: types.KindRecordingEncryption,
105+
},
106+
},
107+
})
108+
if err != nil {
109+
w.logger.ErrorContext(ctx, "failed to create watcher, retrying", "error", err)
110+
if !shouldRetryAfterJitterFn() {
111+
return nil
112+
}
113+
continue
114+
}
115+
defer watch.Close()
116+
117+
HandleEvents:
118+
for {
119+
err := w.handleRecordingEncryptionChange(ctx)
120+
if err != nil {
121+
w.logger.ErrorContext(ctx, "failure while resolving recording encryption state", "error", err)
122+
if !shouldRetryAfterJitterFn() {
123+
return nil
124+
}
125+
continue
126+
127+
}
128+
129+
select {
130+
case ev := <-watch.Events():
131+
if ev.Type != types.OpPut {
132+
continue
133+
}
134+
case <-watch.Done():
135+
if err := watch.Error(); err == nil {
136+
return nil
137+
}
138+
139+
w.logger.ErrorContext(ctx, "watcher failed, retrying", "error", err)
140+
if !shouldRetryAfterJitterFn() {
141+
return nil
142+
}
143+
break HandleEvents
144+
case <-ctx.Done():
145+
return nil
146+
}
147+
}
148+
}
149+
}
150+
151+
// this helper handles reacting to individual Put events on the RecordingEncryption resource and updates the
152+
// SessionRecordingConfig with the results, if necessary
153+
func (w *Watcher) handleRecordingEncryptionChange(ctx context.Context) error {
154+
recConfig, err := w.clusterConfig.GetSessionRecordingConfig(ctx)
155+
if err != nil {
156+
return trace.Wrap(err, "fetching recording config")
157+
}
158+
159+
if !recConfig.GetEncrypted() {
160+
w.logger.DebugContext(ctx, "session recording encryption disabled, skip resolving keys")
161+
return nil
162+
}
163+
164+
_, err = w.resolver.ResolveRecordingEncryption(ctx, func(ctx context.Context, encryption *recordingencryptionv1.RecordingEncryption) error {
165+
if !recConfig.SetEncryptionKeys(getAgeEncryptionKeys(encryption.GetSpec().ActiveKeys)) {
166+
return nil
167+
}
168+
169+
_, err = w.clusterConfig.UpdateSessionRecordingConfig(ctx, recConfig)
170+
return trace.Wrap(err, "updating encryption keys")
171+
})
172+
173+
if err != nil {
174+
return trace.Wrap(err, "resolving recording encryption")
175+
}
176+
177+
return nil
178+
}
179+
180+
// getAgeEncryptionKeys returns an iterator of AgeEncryptionKeys from a list of WrappedKeys. This is for use in
181+
// populating the EncryptionKeys field of SessionRecordingConfigStatus.
182+
func getAgeEncryptionKeys(keys []*recordingencryptionv1.WrappedKey) iter.Seq[*types.AgeEncryptionKey] {
183+
return func(yield func(*types.AgeEncryptionKey) bool) {
184+
for _, key := range keys {
185+
if !yield(&types.AgeEncryptionKey{
186+
PublicKey: key.RecordingEncryptionPair.PublicKey,
187+
}) {
188+
return
189+
}
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)