Skip to content

Commit d969dbb

Browse files
committed
Graceful stop database heartbeats in ICS
Support stopping an individual database heartbeat in the inventory control stream instead of waiting for multiple keepalives to fail.
1 parent a4f1a0e commit d969dbb

File tree

9 files changed

+459
-90
lines changed

9 files changed

+459
-90
lines changed

api/client/inventory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ func (i *downstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStr
343343
oneOf.Msg = &proto.UpstreamInventoryOneOf_Goodbye{
344344
Goodbye: msg,
345345
}
346+
case *proto.UpstreamInventoryStopHeartbeat:
347+
oneOf.Msg = &proto.UpstreamInventoryOneOf_StopHeartbeat{
348+
StopHeartbeat: msg,
349+
}
346350
default:
347351
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
348352
continue
@@ -484,6 +488,8 @@ func (i *upstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStrea
484488
msg = oneOf.GetAgentMetadata()
485489
case oneOf.GetGoodbye() != nil:
486490
msg = oneOf.GetGoodbye()
491+
case oneOf.GetStopHeartbeat() != nil:
492+
msg = oneOf.GetStopHeartbeat()
487493
default:
488494
slog.WarnContext(stream.Context(), "received unknown upstream message", "message", oneOf)
489495
continue

api/client/proto/inventory.pb.go

Lines changed: 222 additions & 68 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/client/proto/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ func (a *UpstreamInventoryAgentMetadata) sealedUpstreamInventoryMessage() {}
107107

108108
func (h *UpstreamInventoryGoodbye) sealedUpstreamInventoryMessage() {}
109109

110+
func (h *UpstreamInventoryStopHeartbeat) sealedUpstreamInventoryMessage() {}
111+
110112
// DownstreamInventoryMessage is a sealed interface representing the possible
111113
// downstream messages of the inventory controls stream after initial hello.
112114
type DownstreamInventoryMessage interface {

api/proto/teleport/legacy/client/proto/inventory.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ message UpstreamInventoryOneOf {
3838
UpstreamInventoryAgentMetadata AgentMetadata = 4;
3939
// UpstreamInventoryGoodbye advertises that the instance is terminating.
4040
UpstreamInventoryGoodbye Goodbye = 5;
41+
// UpstreamInventoryStopHeartbeat informs the upstream service that the
42+
// heartbeat is stopping.
43+
UpstreamInventoryStopHeartbeat StopHeartbeat = 6;
4144
}
4245
}
4346

@@ -166,6 +169,8 @@ message DownstreamInventoryHello {
166169
bool KubernetesHeartbeats = 17;
167170
// KubernetesCleanup indicates the ICS supports deleting kubernetes clusters when UpstreamInventoryGoodbye.DeleteResources is set.
168171
bool KubernetesCleanup = 18;
172+
// DatabaseHeartbeatGracefulStop indicates the ICS supports stopping an individual database heartbeat.
173+
bool DatabaseHeartbeatGracefulStop = 19;
169174
}
170175

171176
// SupportedCapabilities advertises the supported features of the auth server.
@@ -255,3 +260,22 @@ message InventoryStatusSummary {
255260
// ServiceCounts aggregates the number of services.
256261
map<string, uint32> ServiceCounts = 5;
257262
}
263+
264+
// UpstreamInventoryStopHeartbeat informs the upstream service that the
265+
// heartbeat is stopping.
266+
message UpstreamInventoryStopHeartbeat {
267+
// Kind is the kind of heartbeat to stop.
268+
StopHeartbeatKind Kind = 1;
269+
// HostID is the heartbeat host ID.
270+
string HostID = 2;
271+
// Name is the name of the heatbeat to stop.
272+
string Name = 3;
273+
}
274+
275+
// StopHeartbeatKind is the type of heartbeat to stop.
276+
enum StopHeartbeatKind {
277+
STOP_HEARTBEAT_KIND_UNSPECIFIED = 0;
278+
279+
// STOP_HEARTBEAT_KIND_DATABASE_SERVER means stop a database server heartbeat.
280+
STOP_HEARTBEAT_KIND_DATABASE_SERVER = 1;
281+
}

lib/auth/auth.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4958,13 +4958,14 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
49584958
Version: teleport.Version,
49594959
ServerID: a.ServerID,
49604960
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
4961-
NodeHeartbeats: true,
4962-
AppHeartbeats: true,
4963-
AppCleanup: true,
4964-
DatabaseHeartbeats: true,
4965-
DatabaseCleanup: true,
4966-
KubernetesHeartbeats: true,
4967-
KubernetesCleanup: true,
4961+
NodeHeartbeats: true,
4962+
AppHeartbeats: true,
4963+
AppCleanup: true,
4964+
DatabaseHeartbeats: true,
4965+
DatabaseHeartbeatGracefulStop: true,
4966+
DatabaseCleanup: true,
4967+
KubernetesHeartbeats: true,
4968+
KubernetesCleanup: true,
49684969
},
49694970
}
49704971
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {

lib/inventory/controller.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ const (
9494
dbUpsertRetryOk testEvent = "db-upsert-retry-ok"
9595
dbUpsertRetryErr testEvent = "db-upsert-retry-err"
9696

97+
dbDelOk testEvent = "db-del-ok"
98+
dbDelErr testEvent = "db-del-err"
99+
97100
kubeKeepAliveOk testEvent = "kube-keep-alive-ok"
98101
kubeKeepAliveErr testEvent = "kube-keep-alive-err"
99102
kubeKeepAliveDel testEvent = "kube-keep-alive-del"
@@ -554,6 +557,21 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
554557
c.handlePong(handle, m)
555558
case *proto.UpstreamInventoryGoodbye:
556559
handle.setGoodbye(m)
560+
case *proto.UpstreamInventoryStopHeartbeat:
561+
key := resourceKey{hostID: m.HostID, name: m.Name}
562+
switch m.Kind {
563+
case proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER:
564+
if err := c.handleStopDatabaseServerHB(handle, key); err != nil {
565+
handle.CloseWithError(err)
566+
return
567+
}
568+
default:
569+
slog.WarnContext(c.closeContext, "Unexpected upstream stop heartbeat kind on control stream",
570+
"server_id", handle.Hello().ServerID,
571+
"kind", logutils.StringerAttr(m.Kind),
572+
)
573+
}
574+
557575
default:
558576
slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream",
559577
"message_type", logutils.TypeAttr(m),
@@ -1294,6 +1312,43 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
12941312
return nil
12951313
}
12961314

1315+
func (c *Controller) handleStopDatabaseServerHB(handle *upstreamHandle, key resourceKey) error {
1316+
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
1317+
// client cert. after that point it is our responsibility to ensure that heartbeated information is
1318+
// consistent with the identity and capabilities claimed in the initial hello.
1319+
if !handle.HasService(types.RoleDatabase) {
1320+
return trace.AccessDenied("control stream not configured to support database server heartbeats")
1321+
}
1322+
if key.hostID != handle.Hello().ServerID {
1323+
return trace.AccessDenied("incorrect database server ID (expected %q, got %q)", handle.Hello().ServerID, key.hostID)
1324+
}
1325+
1326+
if _, ok := handle.databaseServers[key]; ok {
1327+
c.testEvent(dbKeepAliveDel)
1328+
c.onDisconnectFunc(constants.KeepAliveDatabase, 1)
1329+
if c.dbHBVariableDuration != nil {
1330+
c.dbHBVariableDuration.Dec()
1331+
}
1332+
delete(handle.databaseServers, key)
1333+
handle.dbKeepAliveDelay.Remove(key)
1334+
}
1335+
1336+
err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, key.hostID, key.name)
1337+
if err == nil {
1338+
c.testEvent(dbDelOk)
1339+
} else {
1340+
c.testEvent(dbDelErr)
1341+
if !trace.IsNotFound(err) {
1342+
slog.WarnContext(c.closeContext, "Failed to delete database server heartbeat",
1343+
"database_name", key.name,
1344+
"server_id", handle.Hello().ServerID,
1345+
"error", err,
1346+
)
1347+
}
1348+
}
1349+
return nil
1350+
}
1351+
12971352
func (c *Controller) createKeepAliveDelay(variableDuration *interval.VariableDuration) *delay.Delay {
12981353
return delay.New(delay.Params{
12991354
FirstInterval: retryutils.HalfJitter(c.serverKeepAlive),

lib/inventory/controller_test.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@ type fakeAuth struct {
5555
mu sync.Mutex
5656
failUpserts int
5757
failKeepAlives int
58+
failDeletes int
5859

5960
upserts int
6061
keepalives int
62+
deletes int
6163
err error
6264

6365
expectAddr string
@@ -125,6 +127,14 @@ func (a *fakeAuth) UpsertDatabaseServer(_ context.Context, server types.Database
125127
}
126128

127129
func (a *fakeAuth) DeleteDatabaseServer(ctx context.Context, namespace, hostID, name string) error {
130+
a.mu.Lock()
131+
defer a.mu.Unlock()
132+
a.deletes++
133+
134+
if a.failDeletes > 0 {
135+
a.failDeletes--
136+
return trace.Errorf("delete failed as test condition")
137+
}
128138
return nil
129139
}
130140

@@ -688,7 +698,7 @@ func TestDatabaseServerBasics(t *testing.T) {
688698
require.Equal(t, int64(1), controller.instanceHBVariableDuration.Count())
689699

690700
// send a fake db server heartbeat
691-
for i := 0; i < dbCount; i++ {
701+
for i := range dbCount {
692702
err := downstream.Send(ctx, &proto.InventoryHeartbeat{
693703
DatabaseServer: &types.DatabaseServerV3{
694704
Metadata: types.Metadata{
@@ -717,6 +727,53 @@ func TestDatabaseServerBasics(t *testing.T) {
717727
deny(dbUpsertErr, dbKeepAliveErr, handlerClose),
718728
)
719729

730+
// set up to induce delete failure
731+
auth.mu.Lock()
732+
auth.failDeletes = 2
733+
auth.mu.Unlock()
734+
735+
// stop a heartbeat
736+
err := downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
737+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
738+
HostID: serverID,
739+
Name: "db-1",
740+
})
741+
require.NoError(t, err)
742+
743+
// verify that keep alive stops, even if the heartbeat couldn't be deleted
744+
awaitEvents(t, events,
745+
expect(dbKeepAliveDel, dbDelErr),
746+
deny(dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
747+
)
748+
require.Equal(t, dbCount-1, rc.count())
749+
750+
// verify heartbeat stop keepalive idempotency
751+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
752+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
753+
HostID: serverID,
754+
Name: "db-1",
755+
})
756+
require.NoError(t, err)
757+
require.Equal(t, dbCount-1, rc.count())
758+
759+
awaitEvents(t, events,
760+
expect(dbDelErr),
761+
deny(dbKeepAliveDel, dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
762+
)
763+
require.Equal(t, dbCount-1, rc.count())
764+
765+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
766+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
767+
HostID: serverID,
768+
Name: "db-1",
769+
})
770+
require.NoError(t, err)
771+
awaitEvents(t, events,
772+
expect(dbDelOk),
773+
deny(dbKeepAliveDel, dbDelErr, dbUpsertErr, dbKeepAliveErr, handlerClose),
774+
)
775+
require.Equal(t, dbCount-1, rc.count())
776+
720777
// set up to induce some failures, but not enough to cause the control
721778
// stream to be closed.
722779
auth.mu.Lock()
@@ -778,7 +835,7 @@ func TestDatabaseServerBasics(t *testing.T) {
778835
defer cancel()
779836

780837
// execute ping
781-
_, err := handle.Ping(pingCtx, 1)
838+
_, err = handle.Ping(pingCtx, 1)
782839
require.NoError(t, err)
783840

784841
// ensure that local db keepalive states have reset to healthy by waiting
@@ -1741,6 +1798,7 @@ func deny(events ...testEvent) eventOption {
17411798
}
17421799

17431800
func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) {
1801+
t.Helper()
17441802
options := eventOpts{
17451803
expect: make(map[testEvent]int),
17461804
deny: make(map[testEvent]struct{}),

lib/srv/db/server.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"golang.org/x/sync/errgroup"
3636

3737
"github.com/gravitational/teleport"
38+
"github.com/gravitational/teleport/api/client/proto"
3839
apidefaults "github.com/gravitational/teleport/api/defaults"
3940
"github.com/gravitational/teleport/api/types"
4041
apievents "github.com/gravitational/teleport/api/types/events"
@@ -673,8 +674,8 @@ func (s *Server) stopDynamicLabels(name string) {
673674
func (s *Server) registerDatabase(ctx context.Context, database types.Database) error {
674675
if err := s.startDatabase(ctx, database); err != nil {
675676
// Cleanup in case database was initialized only partially.
676-
if errStop := s.stopDatabase(ctx, database); errStop != nil {
677-
return trace.NewAggregate(err, errStop)
677+
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
678+
return trace.NewAggregate(err, errUnregister)
678679
}
679680
return trace.Wrap(err)
680681
}
@@ -691,10 +692,6 @@ func (s *Server) updateDatabase(ctx context.Context, database types.Database) er
691692
return trace.Wrap(err)
692693
}
693694
if err := s.registerDatabase(ctx, database); err != nil {
694-
// If we failed to re-register, don't keep proxying the old database.
695-
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
696-
return trace.NewAggregate(err, errUnregister)
697-
}
698695
return trace.Wrap(err)
699696
}
700697
return nil
@@ -721,10 +718,14 @@ func (s *Server) stopProxyingAndDeleteDatabase(ctx context.Context, database typ
721718
if err := s.stopDatabase(ctx, database); err != nil {
722719
return trace.Wrap(err)
723720
}
724-
// Heartbeat is stopped but if we don't remove this database server,
725-
// it can linger for up to ~10m until its TTL expires.
726-
if err := s.deleteDatabaseServer(ctx, database.GetName()); err != nil {
727-
return trace.Wrap(err)
721+
// stopping the upstream inventory heartbeat may incur a backend write
722+
// to delete the heartbeat, which is unnecessary when updating a DB,
723+
// since we will upsert a new heartbeat
724+
if err := s.stopUpstreamInventoryHeartbeat(ctx, database); err != nil {
725+
s.log.WarnContext(ctx, "Failed to stop upstream inventory database heartbeat",
726+
"db", log.StringerAttr(database),
727+
"error", err,
728+
)
728729
}
729730
s.mu.Lock()
730731
defer s.mu.Unlock()
@@ -781,9 +782,12 @@ func (s *Server) copyDatabaseWithUpdatedLabelsLocked(database types.Database) *t
781782
func (s *Server) startHeartbeat(ctx context.Context, database types.Database) error {
782783
heartbeat, err := srv.NewDatabaseServerHeartbeat(srv.HeartbeatV2Config[*types.DatabaseServerV3]{
783784
InventoryHandle: s.cfg.InventoryHandle,
784-
Announcer: s.cfg.AccessPoint,
785-
GetResource: s.getServerInfoFunc(database),
786-
OnHeartbeat: s.cfg.OnHeartbeat,
785+
// Announcer is provided to allow falling back to non-ICS heartbeats if
786+
// the Auth server is older than the db service.
787+
// TODO(gavin): DELETE IN 19.0.0
788+
Announcer: s.cfg.AccessPoint,
789+
GetResource: s.getServerInfoFunc(database),
790+
OnHeartbeat: s.cfg.OnHeartbeat,
787791
})
788792
if err != nil {
789793
return trace.Wrap(err)
@@ -807,6 +811,37 @@ func (s *Server) stopHeartbeat(name string) error {
807811
return heartbeat.Close()
808812
}
809813

814+
// stopUpstreamInventoryHeartbeat stops the upstream inventory control stream
815+
// heartbeat for a single database.
816+
// https://github.com/gravitational/teleport/issues/50237
817+
func (s *Server) stopUpstreamInventoryHeartbeat(ctx context.Context, db types.Database) error {
818+
if _, ok := s.cfg.InventoryHandle.GetSender(); ok {
819+
select {
820+
// get latest sender
821+
case sender := <-s.cfg.InventoryHandle.Sender():
822+
if sender.Hello().Capabilities.DatabaseHeartbeatGracefulStop {
823+
err := sender.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
824+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
825+
HostID: s.cfg.HostID,
826+
Name: db.GetName(),
827+
})
828+
return trace.Wrap(err)
829+
}
830+
case <-ctx.Done():
831+
return trace.Wrap(ctx.Err())
832+
}
833+
}
834+
// the heartbeat may have been created using a fallback method or the
835+
// upstream doesn't support graceful stop.
836+
// if we don't remove this database server, it can linger for up to ~10m
837+
// until its TTL expires.
838+
// TODO(gavin): DELETE IN 20.0.0
839+
if err := s.deleteDatabaseServer(ctx, db.GetName()); err != nil {
840+
return trace.Wrap(err)
841+
}
842+
return nil
843+
}
844+
810845
// getServerInfoFunc returns function that the heartbeater uses to report the
811846
// provided database to the auth server.
812847
//

0 commit comments

Comments
 (0)