Skip to content

Commit f0f55a6

Browse files
committed
adding async recording encryption with gRPC multipart uploader
1 parent 9e5a6c6 commit f0f55a6

File tree

17 files changed

+423
-39
lines changed

17 files changed

+423
-39
lines changed

api/client/client.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"errors"
2525
"fmt"
2626
"io"
27+
"iter"
2728
"log/slog"
2829
"net"
2930
"slices"
@@ -88,6 +89,7 @@ import (
8889
oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1"
8990
pluginspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/plugins/v1"
9091
presencepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/presence/v1"
92+
recordingencryptionv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
9193
resourceusagepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/resourceusage/v1"
9294
samlidppb "github.com/gravitational/teleport/api/gen/proto/go/teleport/samlidp/v1"
9395
secreportsv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/secreports/v1"
@@ -125,6 +127,7 @@ type AuthServiceClient struct {
125127
auditlogpb.AuditLogServiceClient
126128
userpreferencespb.UserPreferencesServiceClient
127129
notificationsv1pb.NotificationServiceClient
130+
recordingencryptionv1pb.RecordingEncryptionServiceClient
128131
}
129132

130133
// Client is a gRPC Client that connects to a Teleport Auth server either
@@ -531,10 +534,11 @@ func (c *Client) dialGRPC(ctx context.Context, addr string) error {
531534

532535
c.conn = conn
533536
c.grpc = AuthServiceClient{
534-
AuthServiceClient: proto.NewAuthServiceClient(c.conn),
535-
AuditLogServiceClient: auditlogpb.NewAuditLogServiceClient(c.conn),
536-
UserPreferencesServiceClient: userpreferencespb.NewUserPreferencesServiceClient(c.conn),
537-
NotificationServiceClient: notificationsv1pb.NewNotificationServiceClient(c.conn),
537+
AuthServiceClient: proto.NewAuthServiceClient(c.conn),
538+
AuditLogServiceClient: auditlogpb.NewAuditLogServiceClient(c.conn),
539+
UserPreferencesServiceClient: userpreferencespb.NewUserPreferencesServiceClient(c.conn),
540+
NotificationServiceClient: notificationsv1pb.NewNotificationServiceClient(c.conn),
541+
RecordingEncryptionServiceClient: recordingencryptionv1pb.NewRecordingEncryptionServiceClient(c.conn),
538542
}
539543
c.JoinServiceClient = NewJoinServiceClient(proto.NewJoinServiceClient(c.conn))
540544

@@ -2454,6 +2458,47 @@ func (c *Client) StreamSessionEvents(ctx context.Context, sessionID string, star
24542458
return ch, e
24552459
}
24562460

2461+
// UploadEncryptedRecording streams encrypted recording parts to the auth
2462+
// server to be saved in long term storage.
2463+
func (c *Client) UploadEncryptedRecording(ctx context.Context, sessionID string, parts iter.Seq2[[]byte, error]) error {
2464+
createRes, err := c.grpc.CreateUpload(ctx, &recordingencryptionv1pb.CreateUploadRequest{
2465+
SessionId: sessionID,
2466+
})
2467+
if err != nil {
2468+
return trace.Wrap(err)
2469+
}
2470+
2471+
var uploadedParts []*recordingencryptionv1pb.UploadPartResponse
2472+
var partNumber int64
2473+
for part, err := range parts {
2474+
defer func() {
2475+
partNumber++
2476+
}()
2477+
if err != nil {
2478+
return trace.Wrap(err)
2479+
}
2480+
2481+
uploadRes, err := c.grpc.UploadPart(ctx, &recordingencryptionv1pb.UploadPartRequest{
2482+
Upload: createRes.Upload,
2483+
PartNumber: partNumber,
2484+
Part: part,
2485+
})
2486+
if err != nil {
2487+
return trace.Wrap(err)
2488+
}
2489+
uploadedParts = append(uploadedParts, uploadRes)
2490+
}
2491+
2492+
if _, err := c.grpc.CompleteUpload(ctx, &recordingencryptionv1pb.CompleteUploadRequest{
2493+
Upload: createRes.Upload,
2494+
Parts: uploadedParts,
2495+
}); err != nil {
2496+
return trace.Wrap(err)
2497+
}
2498+
2499+
return nil
2500+
}
2501+
24572502
// SearchEvents allows searching for events with a full pagination support.
24582503
func (c *Client) SearchEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]events.AuditEvent, string, error) {
24592504
request := &proto.GetEventsRequest{

constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ const (
304304
// ComponentMCP represents the MCP server handler.
305305
ComponentMCP = "mcp"
306306

307+
// ComponentRecordingEncryption represents recording encryption
308+
ComponentRecordingEncryption = "recording-encryption"
309+
307310
// VerboseLogsEnvVar forces all logs to be verbose (down to DEBUG level)
308311
VerboseLogsEnvVar = "TELEPORT_DEBUG"
309312

lib/auth/authclient/clt.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,10 @@ func (c *Client) ListWindowsDesktopServices(ctx context.Context, req types.ListW
406406
return nil, trace.NotImplemented(notImplementedMessage)
407407
}
408408

409+
func (c *Client) GetMultipartUploader() events.MultipartUploader {
410+
return nil
411+
}
412+
409413
const (
410414
// UserTokenTypeResetPasswordInvite is a token type used for the UI invite flow that
411415
// allows users to change their password and set second factor (if enabled).

lib/auth/grpcserver.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import (
6868
mfav1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/mfa/v1"
6969
notificationsv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/notifications/v1"
7070
presencev1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/presence/v1"
71+
recordingencryptionv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
7172
scopedaccessv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/scopes/access/v1"
7273
scopedjoiningv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/scopes/joining/v1"
7374
secreportsv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/secreports/v1"
@@ -106,6 +107,7 @@ import (
106107
"github.com/gravitational/teleport/lib/auth/machineid/workloadidentityv1"
107108
"github.com/gravitational/teleport/lib/auth/notifications/notificationsv1"
108109
"github.com/gravitational/teleport/lib/auth/presence/presencev1"
110+
"github.com/gravitational/teleport/lib/auth/recordingencryption/recordingencryptionv1"
109111
scopedaccess "github.com/gravitational/teleport/lib/auth/scopes/access"
110112
scopedjoining "github.com/gravitational/teleport/lib/auth/scopes/joining"
111113
"github.com/gravitational/teleport/lib/auth/secreports/secreportsv1"
@@ -5603,6 +5605,12 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) {
56035605
}
56045606
userloginstatev1pb.RegisterUserLoginStateServiceServer(server, userLoginStateServer)
56055607

5608+
recordingEncryptionService, err := recordingencryptionv1.NewService(recordingencryptionv1.ServiceConfig{
5609+
Uploader: cfg.AuditLog.GetMultipartUploader(),
5610+
Logger: cfg.AuthServer.logger.With(teleport.ComponentKey, teleport.ComponentRecordingEncryption),
5611+
})
5612+
recordingencryptionv1pb.RegisterRecordingEncryptionServiceServer(server, recordingEncryptionService)
5613+
56065614
clusterConfigService, err := clusterconfigv1.NewService(clusterconfigv1.ServiceConfig{
56075615
Cache: cfg.AuthServer.Cache,
56085616
Backend: cfg.AuthServer.Services,

lib/auth/init.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,9 @@ type InitConfig struct {
390390

391391
// VnetConfigService manages the VNet config resource.
392392
VnetConfigService services.VnetConfigService
393+
394+
// MultipartUploader handles multipart uploads.
395+
MultipartUploader events.MultipartUploader
393396
}
394397

395398
// Init instantiates and configures an instance of AuthServer
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package recordingencryptionv1
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"log/slog"
7+
8+
"github.com/gravitational/trace"
9+
"google.golang.org/protobuf/types/known/timestamppb"
10+
11+
"github.com/gravitational/teleport"
12+
13+
"github.com/gravitational/teleport/lib/events"
14+
"github.com/gravitational/teleport/lib/session"
15+
16+
recordingencryptionv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/recordingencryption/v1"
17+
)
18+
19+
// ServiceConfig captures everything a [Service] requires to fulfill requests.
20+
type ServiceConfig struct {
21+
Logger *slog.Logger
22+
Uploader events.MultipartUploader
23+
}
24+
25+
// NewService returns a new [Service] based on the given [ServiceConfig].
26+
func NewService(cfg ServiceConfig) (*Service, error) {
27+
if cfg.Logger == nil {
28+
cfg.Logger = slog.Default()
29+
}
30+
31+
return &Service{
32+
logger: cfg.Logger.With("component", teleport.ComponentRecordingEncryption),
33+
uploader: cfg.Uploader,
34+
}, nil
35+
}
36+
37+
// Service implements a gRPC server for interacting with encrypted recordings.
38+
type Service struct {
39+
recordingencryptionv1.UnimplementedRecordingEncryptionServiceServer
40+
41+
logger *slog.Logger
42+
uploader events.MultipartUploader
43+
}
44+
45+
func streamUploadAsProto(upload events.StreamUpload) *recordingencryptionv1.Upload {
46+
return &recordingencryptionv1.Upload{
47+
UploadId: upload.ID,
48+
SessionId: upload.SessionID.String(),
49+
InitiatedAt: timestamppb.New(upload.Initiated),
50+
}
51+
}
52+
53+
func protoAsStreamUpload(upload *recordingencryptionv1.Upload) (events.StreamUpload, error) {
54+
sessionID, err := session.ParseID(upload.SessionId)
55+
if err != nil {
56+
return events.StreamUpload{}, trace.BadParameter("invalid session ID", err)
57+
}
58+
59+
return events.StreamUpload{
60+
ID: upload.UploadId,
61+
SessionID: *sessionID,
62+
Initiated: upload.InitiatedAt.AsTime(),
63+
}, nil
64+
}
65+
66+
func protoAsStreamPart(part *recordingencryptionv1.UploadPartResponse) events.StreamPart {
67+
return events.StreamPart{
68+
Number: part.PartNumber,
69+
ETag: part.ETag,
70+
LastModified: part.LastModified.AsTime(),
71+
}
72+
}
73+
74+
// CreateUpload begins a multipart upload for an encrypted session recording.
75+
func (s *Service) CreateUpload(ctx context.Context, req *recordingencryptionv1.CreateUploadRequest) (*recordingencryptionv1.CreateUploadResponse, error) {
76+
sessionID, err := session.ParseID(req.SessionId)
77+
if err != nil {
78+
return nil, trace.BadParameter("invalid session ID", err)
79+
}
80+
81+
upload, err := s.uploader.CreateUpload(ctx, *sessionID)
82+
if err != nil {
83+
return nil, trace.Wrap(err, "creating encrypted recording upload")
84+
}
85+
86+
return &recordingencryptionv1.CreateUploadResponse{
87+
Upload: streamUploadAsProto(*upload),
88+
}, nil
89+
}
90+
91+
// UploadPart uploads an encrypted session recording part to the given upload ID.
92+
func (s *Service) UploadPart(ctx context.Context, req *recordingencryptionv1.UploadPartRequest) (*recordingencryptionv1.UploadPartResponse, error) {
93+
upload, err := protoAsStreamUpload(req.Upload)
94+
if err != nil {
95+
return nil, trace.Wrap(err)
96+
}
97+
98+
if err := s.uploader.ReserveUploadPart(ctx, upload, req.PartNumber); err != nil {
99+
return nil, trace.Wrap(err)
100+
}
101+
102+
part := bytes.NewReader(req.Part)
103+
streamPart, err := s.uploader.UploadPart(ctx, upload, req.PartNumber, part)
104+
if err != nil {
105+
return nil, trace.Wrap(err, "uploading encrypted recording part")
106+
}
107+
108+
return &recordingencryptionv1.UploadPartResponse{
109+
PartNumber: streamPart.Number,
110+
ETag: streamPart.ETag,
111+
}, nil
112+
}
113+
114+
// CompleteUpload marks a given encrypted session upload as complete.
115+
func (s *Service) CompleteUpload(ctx context.Context, req *recordingencryptionv1.CompleteUploadRequest) (*recordingencryptionv1.CompleteUploadResponse, error) {
116+
upload, err := protoAsStreamUpload(req.Upload)
117+
if err != nil {
118+
return nil, trace.Wrap(err)
119+
}
120+
121+
parts := make([]events.StreamPart, len(req.Parts))
122+
for idx, part := range req.Parts {
123+
parts[idx] = protoAsStreamPart(part)
124+
}
125+
126+
if err := s.uploader.CompleteUpload(ctx, upload, parts); err != nil {
127+
return nil, trace.Wrap(err)
128+
}
129+
130+
return &recordingencryptionv1.CompleteUploadResponse{}, nil
131+
}

lib/events/api.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"fmt"
2424
"io"
25+
"iter"
2526
"math"
2627
"time"
2728

@@ -1107,6 +1108,8 @@ type StreamEmitter interface {
11071108
type AuditLogSessionStreamer interface {
11081109
AuditLogger
11091110
SessionStreamer
1111+
EncryptedRecordingUploader
1112+
GetMultipartUploader() MultipartUploader
11101113
}
11111114

11121115
// SessionStreamer supports streaming session chunks or events.
@@ -1121,6 +1124,12 @@ type SessionStreamer interface {
11211124
StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error)
11221125
}
11231126

1127+
// EncryptedRecordingUploader takes a session ID and a sequence of encrypted
1128+
// recording parts and uploads an encrypted session recording.
1129+
type EncryptedRecordingUploader interface {
1130+
UploadEncryptedRecording(ctx context.Context, sessionID string, parts iter.Seq2[[]byte, error]) error
1131+
}
1132+
11241133
type SearchEventsRequest struct {
11251134
// From is oldest date of returned events, can be zero.
11261135
From time.Time

lib/events/auditlog.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
package events
2020

2121
import (
22+
"bytes"
2223
"context"
2324
"errors"
2425
"io"
2526
"io/fs"
27+
"iter"
2628
"log/slog"
2729
"os"
2830
"path/filepath"
@@ -635,6 +637,55 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
635637
return c, e
636638
}
637639

640+
// UploadEncryptedRecording uploads recording parts provided over a channel using the
641+
// configured MultipartUploader.
642+
func UploadEncryptedRecording(ctx context.Context, uploader MultipartUploader, sessionID string, parts iter.Seq2[[]byte, error]) error {
643+
sessID, err := session.ParseID(sessionID)
644+
if err != nil {
645+
return trace.BadParameter("invalid session ID", err)
646+
}
647+
upload, err := uploader.CreateUpload(ctx, *sessID)
648+
if err != nil {
649+
return trace.Wrap(err, "creating upload")
650+
}
651+
652+
var streamParts []StreamPart
653+
var partNumber int64
654+
for part, err := range parts {
655+
defer func() {
656+
partNumber++
657+
}()
658+
659+
if err != nil {
660+
return trace.Wrap(err)
661+
}
662+
663+
if err := uploader.ReserveUploadPart(ctx, *upload, partNumber); err != nil {
664+
return trace.Wrap(err)
665+
}
666+
667+
streamPart, err := uploader.UploadPart(ctx, *upload, partNumber, bytes.NewReader(part))
668+
if err != nil {
669+
return trace.Wrap(err)
670+
}
671+
streamParts = append(streamParts, *streamPart)
672+
}
673+
674+
return trace.Wrap(uploader.CompleteUpload(ctx, *upload, streamParts))
675+
}
676+
677+
// UploadEncryptedRecording uploads encrypted recordings using the AuditLog's configured
678+
// UploadHandler.
679+
func (l *AuditLog) UploadEncryptedRecording(ctx context.Context, sessionID string, parts iter.Seq2[[]byte, error]) error {
680+
return UploadEncryptedRecording(ctx, l.UploadHandler, sessionID, parts)
681+
}
682+
683+
// GetMultipartUploader returns the upload handler used by the AuditLog for persisting
684+
// session recordings.
685+
func (l *AuditLog) GetMultipartUploader() MultipartUploader {
686+
return l.UploadHandler
687+
}
688+
638689
// getLocalLog returns the local (file based) AuditLogger.
639690
func (l *AuditLog) getLocalLog() AuditLogger {
640691
l.RLock()

0 commit comments

Comments
 (0)