Skip to content

Commit 8645241

Browse files
committed
Graceful stop database heartbeats in ICS
Support stopping an individual database heartbeat in the inventory control stream instead of waiting for multiple keepalives to fail.
1 parent e71690a commit 8645241

File tree

9 files changed

+437
-91
lines changed

9 files changed

+437
-91
lines changed

api/client/inventory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ func (i *downstreamICS) runSendLoop(stream proto.AuthService_InventoryControlStr
343343
oneOf.Msg = &proto.UpstreamInventoryOneOf_Goodbye{
344344
Goodbye: msg,
345345
}
346+
case *proto.UpstreamInventoryStopHeartbeat:
347+
oneOf.Msg = &proto.UpstreamInventoryOneOf_StopHeartbeat{
348+
StopHeartbeat: msg,
349+
}
346350
default:
347351
sendMsg.errC <- trace.BadParameter("cannot send unexpected upstream msg type: %T", msg)
348352
continue
@@ -484,6 +488,8 @@ func (i *upstreamICS) runRecvLoop(stream proto.AuthService_InventoryControlStrea
484488
msg = oneOf.GetAgentMetadata()
485489
case oneOf.GetGoodbye() != nil:
486490
msg = oneOf.GetGoodbye()
491+
case oneOf.GetStopHeartbeat() != nil:
492+
msg = oneOf.GetStopHeartbeat()
487493
default:
488494
slog.WarnContext(stream.Context(), "received unknown upstream message", "message", oneOf)
489495
continue

api/client/proto/inventory.pb.go

Lines changed: 214 additions & 70 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/client/proto/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ func (a *UpstreamInventoryAgentMetadata) sealedUpstreamInventoryMessage() {}
107107

108108
func (h *UpstreamInventoryGoodbye) sealedUpstreamInventoryMessage() {}
109109

110+
func (h *UpstreamInventoryStopHeartbeat) sealedUpstreamInventoryMessage() {}
111+
110112
// DownstreamInventoryMessage is a sealed interface representing the possible
111113
// downstream messages of the inventory controls stream after initial hello.
112114
type DownstreamInventoryMessage interface {

api/proto/teleport/legacy/client/proto/inventory.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ message UpstreamInventoryOneOf {
3939
UpstreamInventoryAgentMetadata AgentMetadata = 4;
4040
// UpstreamInventoryGoodbye advertises that the instance is terminating.
4141
UpstreamInventoryGoodbye Goodbye = 5;
42+
// UpstreamInventoryStopHeartbeat informs the upstream service that a
43+
// heartbeat is stopping.
44+
UpstreamInventoryStopHeartbeat stop_heartbeat = 6;
4245
}
4346
}
4447

@@ -169,6 +172,8 @@ message DownstreamInventoryHello {
169172
bool KubernetesCleanup = 18;
170173
// Indicates that the ICS supports heartbeating relay_server entries as well as deleting them on disconnect if UpstreamInventoryGoodbye.DeleteResources is set.
171174
bool relay_server_heartbeats_cleanup = 19;
175+
// DatabaseHeartbeatGracefulStop indicates the ICS supports stopping an individual database heartbeat.
176+
bool database_heartbeat_graceful_stop = 20;
172177
}
173178

174179
// SupportedCapabilities advertises the supported features of the auth server.
@@ -260,3 +265,20 @@ message InventoryStatusSummary {
260265
// ServiceCounts aggregates the number of services.
261266
map<string, uint32> ServiceCounts = 5;
262267
}
268+
269+
// UpstreamInventoryStopHeartbeat informs the upstream service that the
270+
// heartbeat is stopping.
271+
message UpstreamInventoryStopHeartbeat {
272+
// Kind is the kind of heartbeat to stop.
273+
StopHeartbeatKind kind = 1;
274+
// Name is the name of the heatbeat to stop.
275+
string name = 2;
276+
}
277+
278+
// StopHeartbeatKind is the type of heartbeat to stop.
279+
enum StopHeartbeatKind {
280+
STOP_HEARTBEAT_KIND_UNSPECIFIED = 0;
281+
282+
// STOP_HEARTBEAT_KIND_DATABASE_SERVER means stop a database server heartbeat.
283+
STOP_HEARTBEAT_KIND_DATABASE_SERVER = 1;
284+
}

lib/auth/auth.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4963,14 +4963,15 @@ func (a *Server) RegisterInventoryControlStream(ics client.UpstreamInventoryCont
49634963
Version: teleport.Version,
49644964
ServerID: a.ServerID,
49654965
Capabilities: &proto.DownstreamInventoryHello_SupportedCapabilities{
4966-
NodeHeartbeats: true,
4967-
AppHeartbeats: true,
4968-
AppCleanup: true,
4969-
DatabaseHeartbeats: true,
4970-
DatabaseCleanup: true,
4971-
KubernetesHeartbeats: true,
4972-
KubernetesCleanup: true,
4973-
RelayServerHeartbeatsCleanup: true,
4966+
NodeHeartbeats: true,
4967+
AppHeartbeats: true,
4968+
AppCleanup: true,
4969+
DatabaseHeartbeats: true,
4970+
DatabaseHeartbeatGracefulStop: true,
4971+
DatabaseCleanup: true,
4972+
KubernetesHeartbeats: true,
4973+
KubernetesCleanup: true,
4974+
RelayServerHeartbeatsCleanup: true,
49744975
},
49754976
}
49764977
if err := ics.Send(a.CloseContext(), downstreamHello); err != nil {

lib/inventory/controller.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ const (
101101
dbUpsertRetryOk testEvent = "db-upsert-retry-ok"
102102
dbUpsertRetryErr testEvent = "db-upsert-retry-err"
103103

104+
dbDelOk testEvent = "db-del-ok"
105+
dbDelErr testEvent = "db-del-err"
106+
104107
kubeKeepAliveOk testEvent = "kube-keep-alive-ok"
105108
kubeKeepAliveErr testEvent = "kube-keep-alive-err"
106109
kubeKeepAliveDel testEvent = "kube-keep-alive-del"
@@ -591,6 +594,20 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
591594
c.handlePong(handle, m)
592595
case *proto.UpstreamInventoryGoodbye:
593596
handle.setGoodbye(m)
597+
case *proto.UpstreamInventoryStopHeartbeat:
598+
switch m.Kind {
599+
case proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER:
600+
if err := c.handleStopDatabaseServerHB(handle, m.Name); err != nil {
601+
handle.CloseWithError(err)
602+
return
603+
}
604+
default:
605+
slog.WarnContext(c.closeContext, "Unexpected upstream stop heartbeat kind on control stream",
606+
"server_id", handle.Hello().ServerID,
607+
"kind", logutils.StringerAttr(m.Kind),
608+
)
609+
}
610+
594611
default:
595612
slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream",
596613
"message_type", logutils.TypeAttr(m),
@@ -1440,6 +1457,41 @@ func (c *Controller) keepAliveRelayServer(handle *upstreamHandle, now time.Time)
14401457
return nil
14411458
}
14421459

1460+
func (c *Controller) handleStopDatabaseServerHB(handle *upstreamHandle, name string) error {
1461+
// the auth layer verifies that a stream's hello message matches the identity and capabilities of the
1462+
// client cert. after that point it is our responsibility to ensure that heartbeated information is
1463+
// consistent with the identity and capabilities claimed in the initial hello.
1464+
if !handle.HasService(types.RoleDatabase) {
1465+
return trace.AccessDenied("control stream not configured to support database server heartbeats")
1466+
}
1467+
key := resourceKey{hostID: handle.Hello().ServerID, name: name}
1468+
1469+
if _, ok := handle.databaseServers[key]; ok {
1470+
c.testEvent(dbKeepAliveDel)
1471+
c.onDisconnectFunc(constants.KeepAliveDatabase, 1)
1472+
if c.dbHBVariableDuration != nil {
1473+
c.dbHBVariableDuration.Dec()
1474+
}
1475+
delete(handle.databaseServers, key)
1476+
handle.dbKeepAliveDelay.Remove(key)
1477+
}
1478+
1479+
err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, key.hostID, key.name)
1480+
if err == nil {
1481+
c.testEvent(dbDelOk)
1482+
} else {
1483+
c.testEvent(dbDelErr)
1484+
if !trace.IsNotFound(err) {
1485+
slog.WarnContext(c.closeContext, "Failed to delete database server heartbeat",
1486+
"database_name", key.name,
1487+
"server_id", handle.Hello().ServerID,
1488+
"error", err,
1489+
)
1490+
}
1491+
}
1492+
return nil
1493+
}
1494+
14431495
func (c *Controller) createKeepAliveDelay(variableDuration *interval.VariableDuration) *delay.Delay {
14441496
return delay.New(delay.Params{
14451497
FirstInterval: retryutils.HalfJitter(c.serverKeepAlive),

lib/inventory/controller_test.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ type fakeAuth struct {
5656
mu sync.Mutex
5757
failUpserts int
5858
failKeepAlives int
59+
failDeletes int
5960

6061
upserts int
6162
keepalives int
63+
deletes int
6264
err error
6365

6466
expectAddr string
@@ -126,6 +128,14 @@ func (a *fakeAuth) UpsertDatabaseServer(_ context.Context, server types.Database
126128
}
127129

128130
func (a *fakeAuth) DeleteDatabaseServer(ctx context.Context, namespace, hostID, name string) error {
131+
a.mu.Lock()
132+
defer a.mu.Unlock()
133+
a.deletes++
134+
135+
if a.failDeletes > 0 {
136+
a.failDeletes--
137+
return trace.Errorf("delete failed as test condition")
138+
}
129139
return nil
130140
}
131141

@@ -699,7 +709,7 @@ func TestDatabaseServerBasics(t *testing.T) {
699709
require.Equal(t, int64(1), controller.instanceHBVariableDuration.Count())
700710

701711
// send a fake db server heartbeat
702-
for i := 0; i < dbCount; i++ {
712+
for i := range dbCount {
703713
err := downstream.Send(ctx, &proto.InventoryHeartbeat{
704714
DatabaseServer: &types.DatabaseServerV3{
705715
Metadata: types.Metadata{
@@ -728,6 +738,50 @@ func TestDatabaseServerBasics(t *testing.T) {
728738
deny(dbUpsertErr, dbKeepAliveErr, handlerClose),
729739
)
730740

741+
// set up to induce delete failure
742+
auth.mu.Lock()
743+
auth.failDeletes = 2
744+
auth.mu.Unlock()
745+
746+
// stop a heartbeat
747+
err := downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
748+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
749+
Name: "db-1",
750+
})
751+
require.NoError(t, err)
752+
753+
// verify that keep alive stops, even if the heartbeat couldn't be deleted
754+
awaitEvents(t, events,
755+
expect(dbKeepAliveDel, dbDelErr),
756+
deny(dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
757+
)
758+
require.Equal(t, dbCount-1, rc.count())
759+
760+
// verify heartbeat stop keepalive idempotency
761+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
762+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
763+
Name: "db-1",
764+
})
765+
require.NoError(t, err)
766+
require.Equal(t, dbCount-1, rc.count())
767+
768+
awaitEvents(t, events,
769+
expect(dbDelErr),
770+
deny(dbKeepAliveDel, dbDelOk, dbUpsertErr, dbKeepAliveErr, handlerClose),
771+
)
772+
require.Equal(t, dbCount-1, rc.count())
773+
774+
err = downstream.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
775+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
776+
Name: "db-1",
777+
})
778+
require.NoError(t, err)
779+
awaitEvents(t, events,
780+
expect(dbDelOk),
781+
deny(dbKeepAliveDel, dbDelErr, dbUpsertErr, dbKeepAliveErr, handlerClose),
782+
)
783+
require.Equal(t, dbCount-1, rc.count())
784+
731785
// set up to induce some failures, but not enough to cause the control
732786
// stream to be closed.
733787
auth.mu.Lock()
@@ -789,7 +843,7 @@ func TestDatabaseServerBasics(t *testing.T) {
789843
defer cancel()
790844

791845
// execute ping
792-
_, err := handle.Ping(pingCtx, 1)
846+
_, err = handle.Ping(pingCtx, 1)
793847
require.NoError(t, err)
794848

795849
// ensure that local db keepalive states have reset to healthy by waiting
@@ -1752,6 +1806,7 @@ func deny(events ...testEvent) eventOption {
17521806
}
17531807

17541808
func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) {
1809+
t.Helper()
17551810
options := eventOpts{
17561811
expect: make(map[testEvent]int),
17571812
deny: make(map[testEvent]struct{}),

lib/srv/db/server.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"golang.org/x/sync/errgroup"
3737

3838
"github.com/gravitational/teleport"
39+
"github.com/gravitational/teleport/api/client/proto"
3940
apidefaults "github.com/gravitational/teleport/api/defaults"
4041
"github.com/gravitational/teleport/api/types"
4142
apievents "github.com/gravitational/teleport/api/types/events"
@@ -663,8 +664,8 @@ func (s *Server) stopDynamicLabels(name string) {
663664
func (s *Server) registerDatabase(ctx context.Context, database types.Database) error {
664665
if err := s.startDatabase(ctx, database); err != nil {
665666
// Cleanup in case database was initialized only partially.
666-
if errStop := s.stopDatabase(ctx, database); errStop != nil {
667-
return trace.NewAggregate(err, errStop)
667+
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
668+
return trace.NewAggregate(err, errUnregister)
668669
}
669670
return trace.Wrap(err)
670671
}
@@ -681,10 +682,6 @@ func (s *Server) updateDatabase(ctx context.Context, database types.Database) er
681682
return trace.Wrap(err)
682683
}
683684
if err := s.registerDatabase(ctx, database); err != nil {
684-
// If we failed to re-register, don't keep proxying the old database.
685-
if errUnregister := s.unregisterDatabase(ctx, database); errUnregister != nil {
686-
return trace.NewAggregate(err, errUnregister)
687-
}
688685
return trace.Wrap(err)
689686
}
690687
return nil
@@ -711,10 +708,14 @@ func (s *Server) stopProxyingAndDeleteDatabase(ctx context.Context, database typ
711708
if err := s.stopDatabase(ctx, database); err != nil {
712709
return trace.Wrap(err)
713710
}
714-
// Heartbeat is stopped but if we don't remove this database server,
715-
// it can linger for up to ~10m until its TTL expires.
716-
if err := s.deleteDatabaseServer(ctx, database.GetName()); err != nil {
717-
return trace.Wrap(err)
711+
// stopping the upstream inventory heartbeat may incur a backend write
712+
// to delete the heartbeat, which is unnecessary when updating a DB,
713+
// since we will upsert a new heartbeat
714+
if err := s.stopUpstreamInventoryHeartbeat(ctx, database); err != nil {
715+
s.log.WarnContext(ctx, "Failed to stop upstream inventory database heartbeat",
716+
"db", log.StringerAttr(database),
717+
"error", err,
718+
)
718719
}
719720
s.mu.Lock()
720721
defer s.mu.Unlock()
@@ -771,7 +772,6 @@ func (s *Server) copyDatabaseWithUpdatedLabelsLocked(database types.Database) *t
771772
func (s *Server) startHeartbeat(ctx context.Context, database types.Database) error {
772773
heartbeat, err := srv.NewDatabaseServerHeartbeat(srv.HeartbeatV2Config[*types.DatabaseServerV3]{
773774
InventoryHandle: s.cfg.InventoryHandle,
774-
Announcer: s.cfg.AccessPoint,
775775
GetResource: s.getServerInfoFunc(database),
776776
OnHeartbeat: s.cfg.OnHeartbeat,
777777
})
@@ -797,6 +797,36 @@ func (s *Server) stopHeartbeat(name string) error {
797797
return heartbeat.Close()
798798
}
799799

800+
// stopUpstreamInventoryHeartbeat stops the upstream inventory control stream
801+
// heartbeat for a single database.
802+
// https://github.com/gravitational/teleport/issues/50237
803+
func (s *Server) stopUpstreamInventoryHeartbeat(ctx context.Context, db types.Database) error {
804+
if _, ok := s.cfg.InventoryHandle.GetSender(); ok {
805+
select {
806+
// get latest sender
807+
case sender := <-s.cfg.InventoryHandle.Sender():
808+
if sender.Hello().Capabilities.DatabaseHeartbeatGracefulStop {
809+
err := sender.Send(ctx, &proto.UpstreamInventoryStopHeartbeat{
810+
Kind: proto.StopHeartbeatKind_STOP_HEARTBEAT_KIND_DATABASE_SERVER,
811+
Name: db.GetName(),
812+
})
813+
return trace.Wrap(err)
814+
}
815+
case <-ctx.Done():
816+
return trace.Wrap(ctx.Err())
817+
}
818+
}
819+
// the heartbeat may have been created using a fallback method or the
820+
// upstream doesn't support graceful stop.
821+
// if we don't remove this database server, it can linger for up to ~10m
822+
// until its TTL expires.
823+
// TODO(gavin): DELETE IN 20.0.0
824+
if err := s.deleteDatabaseServer(ctx, db.GetName()); err != nil {
825+
return trace.Wrap(err)
826+
}
827+
return nil
828+
}
829+
800830
// getServerInfoFunc returns function that the heartbeater uses to report the
801831
// provided database to the auth server.
802832
//

0 commit comments

Comments
 (0)