Skip to content

Commit 9fc96d4

Browse files
committed
Graceful stop database heartbeats in ICS
Support stopping an individual database heartbeat in the inventory control stream instead of waiting for multiple keepalives to fail.
1 parent a4f1a0e commit 9fc96d4

File tree

9 files changed

+434
-88
lines changed

9 files changed

+434
-88
lines changed

api/client/inventory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ func (i *downstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStr
343343
oneOf.Msg = &proto.UpstreamInventoryOneOf_Goodbye{
344344
Goodbye: msg,
345345
}
346+
case *proto.UpstreamInventoryStopHeartbeat:
347+
oneOf.Msg = &proto.UpstreamInventoryOneOf_StopHeartbeat{
348+
StopHeartbeat: msg,
349+
}
346350
default:
347351
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
348352
continue
@@ -484,6 +488,8 @@ func (i *upstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStrea
484488
msg = oneOf.GetAgentMetadata()
485489
case oneOf.GetGoodbye() != nil:
486490
msg = oneOf.GetGoodbye()
491+
case oneOf.GetStopHeartbeat() != nil:
492+
msg = oneOf.GetStopHeartbeat()
487493
default:
488494
slog.WarnContext(stream.Context(), "received unknown upstream message", "message", oneOf)
489495
continue

api/client/proto/inventory.pb.go

Lines changed: 212 additions & 68 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/client/proto/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ func (a *UpstreamInventoryAgentMetadata) sealedUpstreamInventoryMessage() {}
107107

108108
func (h *UpstreamInventoryGoodbye) sealedUpstreamInventoryMessage() {}
109109

110+
func (h *UpstreamInventoryStopHeartbeat) sealedUpstreamInventoryMessage() {}
111+
110112
// DownstreamInventoryMessage is a sealed interface representing the possible
111113
// downstream messages of the inventory controls stream after initial hello.
112114
type DownstreamInventoryMessage interface {

api/proto/teleport/legacy/client/proto/inventory.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ message UpstreamInventoryOneOf {
3838
UpstreamInventoryAgentMetadata AgentMetadata = 4;
3939
// UpstreamInventoryGoodbye advertises that the instance is terminating.
4040
UpstreamInventoryGoodbye Goodbye = 5;
41+
// UpstreamInventoryStopHeartbeat informs the upstream service that a
42+
// heartbeat is stopping.
43+
UpstreamInventoryStopHeartbeat stop_heartbeat = 6;
4144
}
4245
}
4346

@@ -166,6 +169,8 @@ message DownstreamInventoryHello {
166169
bool KubernetesHeartbeats = 17;
167170
// KubernetesCleanup indicates the ICS supports deleting kubernetes clusters when UpstreamInventoryGoodbye.DeleteResources is set.
168171
bool KubernetesCleanup = 18;
172+
// DatabaseHeartbeatGracefulStop indicates the ICS supports stopping an individual database heartbeat.
173+
bool database_heartbeat_graceful_stop = 19;
169174
}
170175

171176
// SupportedCapabilities advertises the supported features of the auth server.
@@ -255,3 +260,20 @@ message InventoryStatusSummary {
255260
// ServiceCounts aggregates the number of services.
256261
map<string, uint32> ServiceCounts = 5;
257262
}
263+
264+
// UpstreamInventoryStopHeartbeat informs the upstream service that the
265+
// heartbeat is stopping.
266+
message UpstreamInventoryStopHeartbeat {
267+
// Kind is the kind of heartbeat to stop.
268+
StopHeartbeatKind kind = 1;
269+
// Name is the name of the heartbeat to stop.
270+
string name = 2;
271+
}
272+
273+
// StopHeartbeatKind is the type of heartbeat to stop.
274+
enum StopHeartbeatKind {
275+
STOP_HEARTBEAT_KIND_UNSPECIFIED = 0;
276+
277+
// STOP_HEARTBEAT_KIND_DATABASE_SERVER means stop a database server heartbeat.
278+
STOP_HEARTBEAT_KIND_DATABASE_SERVER = 1;
279+
}

lib/auth/auth.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4958,13 +4958,14 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
49584958
Version: teleport.Version,
49594959
ServerID: a.ServerID,
49604960
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
4961-
NodeHeartbeats: true,
4962-
AppHeartbeats: true,
4963-
AppCleanup: true,
4964-
DatabaseHeartbeats: true,
4965-
DatabaseCleanup: true,
4966-
KubernetesHeartbeats: true,
4967-
KubernetesCleanup: true,
4961+
NodeHeartbeats: true,
4962+
AppHeartbeats: true,
4963+
AppCleanup: true,
4964+
DatabaseHeartbeats: true,
4965+
DatabaseHeartbeatGracefulStop: true,
4966+
DatabaseCleanup: true,
4967+
KubernetesHeartbeats: true,
4968+
KubernetesCleanup: true,
49684969
},
49694970
}
49704971
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {

lib/inventory/controller.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ const (
9494
dbUpsertRetryOk testEvent = "db-upsert-retry-ok"
9595
dbUpsertRetryErr testEvent = "db-upsert-retry-err"
9696

97+
dbDelOk testEvent = "db-del-ok"
98+
dbDelErr testEvent = "db-del-err"
99+
97100
kubeKeepAliveOk testEvent = "kube-keep-alive-ok"
98101
kubeKeepAliveErr testEvent = "kube-keep-alive-err"
99102
kubeKeepAliveDel testEvent = "kube-keep-alive-del"
@@ -554,6 +557,20 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
554557
c.handlePong(handle, m)
555558
case *proto.UpstreamInventoryGoodbye:
556559
handle.setGoodbye(m)
560+
case *proto.UpstreamInventoryStopHeartbeat:
561+
switch m.Kind {
562+
case proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER:
563+
if err := c.handleStopDatabaseServerHB(handle, m.Name); err != nil {
564+
handle.CloseWithError(err)
565+
return
566+
}
567+
default:
568+
slog.WarnContext(c.closeContext, "Unexpected upstream stop heartbeat kind on control stream",
569+
"server_id", handle.Hello().ServerID,
570+
"kind", logutils.StringerAttr(m.Kind),
571+
)
572+
}
573+
557574
default:
558575
slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream",
559576
"message_type", logutils.TypeAttr(m),
@@ -1294,6 +1311,41 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
12941311
return nil
12951312
}
12961313

1314+
func (c *Controller) handleStopDatabaseServerHB(handle *upstreamHandle, name string) error {
1315+
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
1316+
// client cert. after that point it is our responsibility to ensure that heartbeated information is
1317+
// consistent with the identity and capabilities claimed in the initial hello.
1318+
if !handle.HasService(types.RoleDatabase) {
1319+
return trace.AccessDenied("control stream not configured to support database server heartbeats")
1320+
}
1321+
key := resourceKey{hostID: handle.Hello().ServerID, name: name}
1322+
1323+
if _, ok := handle.databaseServers[key]; ok {
1324+
c.testEvent(dbKeepAliveDel)
1325+
c.onDisconnectFunc(constants.KeepAliveDatabase, 1)
1326+
if c.dbHBVariableDuration != nil {
1327+
c.dbHBVariableDuration.Dec()
1328+
}
1329+
delete(handle.databaseServers, key)
1330+
handle.dbKeepAliveDelay.Remove(key)
1331+
}
1332+
1333+
err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, key.hostID, key.name)
1334+
if err == nil {
1335+
c.testEvent(dbDelOk)
1336+
} else {
1337+
c.testEvent(dbDelErr)
1338+
if !trace.IsNotFound(err) {
1339+
slog.WarnContext(c.closeContext, "Failed to delete database server heartbeat",
1340+
"database_name", key.name,
1341+
"server_id", handle.Hello().ServerID,
1342+
"error", err,
1343+
)
1344+
}
1345+
}
1346+
return nil
1347+
}
1348+
12971349
func (c *Controller) createKeepAliveDelay(variableDuration *interval.VariableDuration) *delay.Delay {
12981350
return delay.New(delay.Params{
12991351
FirstInterval: retryutils.HalfJitter(c.serverKeepAlive),

lib/inventory/controller_test.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@ type fakeAuth struct {
5555
mu sync.Mutex
5656
failUpserts int
5757
failKeepAlives int
58+
failDeletes int
5859

5960
upserts int
6061
keepalives int
62+
deletes int
6163
err error
6264

6365
expectAddr string
@@ -125,6 +127,14 @@ func (a *fakeAuth) UpsertDatabaseServer(_ context.Context, server types.Database
125127
}
126128

127129
func (a *fakeAuth) DeleteDatabaseServer(ctx context.Context, namespace, hostID, name string) error {
130+
a.mu.Lock()
131+
defer a.mu.Unlock()
132+
a.deletes++
133+
134+
if a.failDeletes > 0 {
135+
a.failDeletes--
136+
return trace.Errorf("delete failed as test condition")
137+
}
128138
return nil
129139
}
130140

@@ -688,7 +698,7 @@ func TestDatabaseServerBasics(t *testing.T) {
688698
require.Equal(t, int64(1), controller.instanceHBVariableDuration.Count())
689699

690700
// send a fake db server heartbeat
691-
for i := 0; i < dbCount; i++ {
701+
for i := range dbCount {
692702
err := downstream.Send(ctx, &proto.InventoryHeartbeat{
693703
DatabaseServer: &types.DatabaseServerV3{
694704
Metadata: types.Metadata{
@@ -717,6 +727,50 @@ func TestDatabaseServerBasics(t *testing.T) {
717727
deny(dbUpsertErr, dbKeepAliveErr, handlerClose),
718728
)
719729

730+
// set up to induce delete failure
731+
auth.mu.Lock()
732+
auth.failDeletes = 2
733+
auth.mu.Unlock()
734+
735+
// stop a heartbeat
736+
err := downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
737+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
738+
Name: "db-1",
739+
})
740+
require.NoError(t, err)
741+
742+
// verify that keep alive stops, even if the heartbeat couldn't be deleted
743+
awaitEvents(t, events,
744+
expect(dbKeepAliveDel, dbDelErr),
745+
deny(dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
746+
)
747+
require.Equal(t, dbCount-1, rc.count())
748+
749+
// verify that a repeated stop is idempotent and does not remove the keepalive again
750+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
751+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
752+
Name: "db-1",
753+
})
754+
require.NoError(t, err)
755+
require.Equal(t, dbCount-1, rc.count())
756+
757+
awaitEvents(t, events,
758+
expect(dbDelErr),
759+
deny(dbKeepAliveDel, dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
760+
)
761+
require.Equal(t, dbCount-1, rc.count())
762+
763+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
764+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
765+
Name: "db-1",
766+
})
767+
require.NoError(t, err)
768+
awaitEvents(t, events,
769+
expect(dbDelOk),
770+
deny(dbKeepAliveDel, dbDelErr, dbUpsertErr, dbKeepAliveErr, handlerClose),
771+
)
772+
require.Equal(t, dbCount-1, rc.count())
773+
720774
// set up to induce some failures, but not enough to cause the control
721775
// stream to be closed.
722776
auth.mu.Lock()
@@ -778,7 +832,7 @@ func TestDatabaseServerBasics(t *testing.T) {
778832
defer cancel()
779833

780834
// execute ping
781-
_, err := handle.Ping(pingCtx, 1)
835+
_, err = handle.Ping(pingCtx, 1)
782836
require.NoError(t, err)
783837

784838
// ensure that local db keepalive states have reset to healthy by waiting
@@ -1741,6 +1795,7 @@ func deny(events ...testEvent) eventOption {
17411795
}
17421796

17431797
func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) {
1798+
t.Helper()
17441799
options := eventOpts{
17451800
expect: make(map[testEvent]int),
17461801
deny: make(map[testEvent]struct{}),

lib/srv/db/server.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"golang.org/x/sync/errgroup"
3636

3737
"github.com/gravitational/teleport"
38+
"github.com/gravitational/teleport/api/client/proto"
3839
apidefaults "github.com/gravitational/teleport/api/defaults"
3940
"github.com/gravitational/teleport/api/types"
4041
apievents "github.com/gravitational/teleport/api/types/events"
@@ -673,8 +674,8 @@ func (s *Server) stopDynamicLabels(name string) {
673674
func (s *Server) registerDatabase(ctx context.Context, database types.Database) error {
674675
if err := s.startDatabase(ctx, database); err != nil {
675676
// Cleanup in case database was initialized only partially.
676-
if errStop := s.stopDatabase(ctx, database); errStop != nil {
677-
return trace.NewAggregate(err, errStop)
677+
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
678+
return trace.NewAggregate(err, errUnregister)
678679
}
679680
return trace.Wrap(err)
680681
}
@@ -691,10 +692,6 @@ func (s *Server) updateDatabase(ctx context.Context, database types.Database) er
691692
return trace.Wrap(err)
692693
}
693694
if err := s.registerDatabase(ctx, database); err != nil {
694-
// If we failed to re-register, don't keep proxying the old database.
695-
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
696-
return trace.NewAggregate(err, errUnregister)
697-
}
698695
return trace.Wrap(err)
699696
}
700697
return nil
@@ -721,10 +718,14 @@ func (s *Server) stopProxyingAndDeleteDatabase(ctx context.Context, database typ
721718
if err := s.stopDatabase(ctx, database); err != nil {
722719
return trace.Wrap(err)
723720
}
724-
// Heartbeat is stopped but if we don't remove this database server,
725-
// it can linger for up to ~10m until its TTL expires.
726-
if err := s.deleteDatabaseServer(ctx, database.GetName()); err != nil {
727-
return trace.Wrap(err)
721+
// stopping the upstream inventory heartbeat may incur a backend write
722+
// to delete the heartbeat, which is unnecessary when updating a DB,
723+
// since we will upsert a new heartbeat
724+
if err := s.stopUpstreamInventoryHeartbeat(ctx, database); err != nil {
725+
s.log.WarnContext(ctx, "Failed to stop upstream inventory database heartbeat",
726+
"db", log.StringerAttr(database),
727+
"error", err,
728+
)
728729
}
729730
s.mu.Lock()
730731
defer s.mu.Unlock()
@@ -781,7 +782,6 @@ func (s *Server) copyDatabaseWithUpdatedLabelsLocked(database types.Database) *t
781782
func (s *Server) startHeartbeat(ctx context.Context, database types.Database) error {
782783
heartbeat, err := srv.NewDatabaseServerHeartbeat(srv.HeartbeatV2Config[*types.DatabaseServerV3]{
783784
InventoryHandle: s.cfg.InventoryHandle,
784-
Announcer: s.cfg.AccessPoint,
785785
GetResource: s.getServerInfoFunc(database),
786786
OnHeartbeat: s.cfg.OnHeartbeat,
787787
})
@@ -807,6 +807,36 @@ func (s *Server) stopHeartbeat(name string) error {
807807
return heartbeat.Close()
808808
}
809809

810+
// stopUpstreamInventoryHeartbeat stops the upstream inventory control stream
811+
// heartbeat for a single database.
812+
// https://github.com/gravitational/teleport/issues/50237
813+
func (s *Server) stopUpstreamInventoryHeartbeat(ctx context.Context, db types.Database) error {
814+
if _, ok := s.cfg.InventoryHandle.GetSender(); ok {
815+
select {
816+
// get latest sender
817+
case sender := <-s.cfg.InventoryHandle.Sender():
818+
if sender.Hello().Capabilities.DatabaseHeartbeatGracefulStop {
819+
err := sender.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
820+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
821+
Name: db.GetName(),
822+
})
823+
return trace.Wrap(err)
824+
}
825+
case <-ctx.Done():
826+
return trace.Wrap(ctx.Err())
827+
}
828+
}
829+
// the heartbeat may have been created using a fallback method or the
830+
// upstream doesn't support graceful stop.
831+
// if we don't remove this database server, it can linger for up to ~10m
832+
// until its TTL expires.
833+
// TODO(gavin): DELETE IN 20.0.0
834+
if err := s.deleteDatabaseServer(ctx, db.GetName()); err != nil {
835+
return trace.Wrap(err)
836+
}
837+
return nil
838+
}
839+
810840
// getServerInfoFunc returns function that the heartbeater uses to report the
811841
// provided database to the auth server.
812842
//

0 commit comments

Comments
 (0)