Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage #10155

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft

storage #10155

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion guardian/cmd/guardian/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package main

import (
"context"
"flag"
"os"
"os/signal"
"syscall"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -44,5 +47,20 @@ func main() {
}

logrus.Infof("Starting Calico Guardian %s", cfg.String())
daemon.Run(cfg.Config, cfg.Targets())
daemon.Run(GetShutdownContext(), cfg.Config, cfg.Targets())
}

// GetShutdownContext returns a context that is cancelled when the process receives either syscall.SIGINT or
// syscall.SIGTERM.
//
// Only the first signal is intercepted: once it arrives the handler is unregistered, so a repeated SIGINT/SIGTERM
// falls back to the default signal behaviour and terminates the process even if graceful shutdown has stalled.
func GetShutdownContext() context.Context {
	// Buffered so the notification is not dropped if it arrives before the goroutine below is receiving.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-signalChan
		// Stop relaying signals to signalChan so that a repeated signal is no longer silently swallowed.
		signal.Stop(signalChan)
		logrus.Debug("Shutdown signal received, shutting down.")
		cancel()
	}()

	return ctx
}
59 changes: 30 additions & 29 deletions guardian/pkg/asyncutil/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type CommandExecutor[C any, R any] interface {
type ExecutionController interface {
DrainAndBacklog() Signaler
Resume()
ShutdownSignaler() Signaler
WaitForShutdown() <-chan struct{}
}

// executorCoordinator is a set of ExecutionControllers that are managed together; waiting for shutdown on the
// coordinator waits for every member in turn.
type executorCoordinator []ExecutionController
Expand Down Expand Up @@ -65,35 +65,31 @@ func (coordinator executorCoordinator) Resume() {
}
}

func (coordinator executorCoordinator) ShutdownSignaler() Signaler {
var signalers []Signaler
for _, executor := range coordinator {
signalers = append(signalers, executor.ShutdownSignaler())
}

signal := NewSignaler()
func (coordinator executorCoordinator) WaitForShutdown() <-chan struct{} {
signal := make(chan struct{})
go func() {
defer signal.Send()
for _, signaler := range signalers {
<-signaler.Receive()
defer close(signal)
for _, executor := range coordinator {
<-executor.WaitForShutdown()
}
}()

return signal
}

type commandExecutor[C any, R any] struct {
command func(context.Context, C) (R, error)
drainAndBacklogSig chan Signaler
resumeBackloggedSig Signaler
cmdChan chan Command[C, R]
command func(context.Context, C) (R, error)
cmdChan chan Command[C, R]
// backlogChan contains all the commands that failed with EOF, waiting to be retried.
backlogChan chan Command[C, R]
// inflightCmds keeps track of the number of commands that are currently being executed.
inflightCmds sync.WaitGroup
executeSig Signaler
shutdownCompleteSig Signaler
errBuff ErrorBuffer
inflightCmds sync.WaitGroup

resumeBackloggedSig Signaler
drainAndBacklogSig chan Signaler
shutdownCompleteSig chan struct{}

errBuff ErrorBuffer

backLogCommands bool
backlog []Command[C, R]
Expand All @@ -119,9 +115,7 @@ func NewCommandExecutor[C any, R any](ctx context.Context, errBuff ErrorBuffer,
backlogChan: make(chan Command[C, R], 100),
drainAndBacklogSig: make(chan Signaler, 100),
resumeBackloggedSig: NewSignaler(),

executeSig: NewSignaler(),
shutdownCompleteSig: NewSignaler(),
shutdownCompleteSig: make(chan struct{}),
}

go executor.loop(ctx)
Expand All @@ -135,7 +129,7 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
// doesn't wait on the channel provided.
defer func() {
defer stopCommands()
defer executor.shutdownCompleteSig.Close()
defer close(executor.shutdownCompleteSig)

// close the cmdChan in case anything tries to write to it. This will ensure a panic occurs while trying to
// clean up any outstanding cmd.
Expand All @@ -150,13 +144,16 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
executor.drainBacklogChannel()
executor.backlog = append(executor.backlog, ReadAll(executor.cmdChan)...)

logrus.Debug("Returning errors for outstanding requests due to shutdown.")
for _, cmd := range executor.backlog {
cmd.ReturnError(context.Canceled)
if len(executor.backlog) > 0 {
logrus.Debug("Returning errors for outstanding commands due to shutdown...")
for _, cmd := range executor.backlog {
cmd.ReturnError(context.Canceled)
}
logrus.Debug("Finished returning errors for outstanding commands.")
} else {
logrus.Debug("No outstanding commands, shutting down..")
}
logrus.Debug("Finished returning errors for outstanding requests due to shutdown.")

executor.executeSig.Close()
close(executor.drainAndBacklogSig)
executor.resumeBackloggedSig.Close()
}()
Expand All @@ -179,6 +176,9 @@ func (executor *commandExecutor[C, R]) loop(shutdownCtx context.Context) {
}
case cmd := <-executor.backlogChan:
logrus.Debugf("Received backlog command (current backlog size: %d).", len(executor.backlog))
if len(executor.backlog) > 50 {
logrus.Warningf("Backlog size exceeded has exceed 50.")
}
executor.backlog = append(executor.backlog, cmd)
case signal := <-executor.drainAndBacklogSig:
logrus.Debugf("Received requeue signal.")
Expand Down Expand Up @@ -250,6 +250,7 @@ func (executor *commandExecutor[C, R]) executeCommand(ctx context.Context, req C
defer executor.inflightCmds.Done()
result, err := executor.command(ctx, req.Get())
if err != nil {
logrus.Debugf("Error executing command: %v", err)
executor.errBuff.Write(err)
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
executor.backlogChan <- req
Expand Down Expand Up @@ -287,6 +288,6 @@ func (executor *commandExecutor[Req, Resp]) Resume() {
executor.resumeBackloggedSig.Send()
}

func (executor *commandExecutor[C, R]) ShutdownSignaler() Signaler {
func (executor *commandExecutor[C, R]) WaitForShutdown() <-chan struct{} {
return executor.shutdownCompleteSig
}
6 changes: 4 additions & 2 deletions guardian/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ type Config struct {

K8sEndpoint string `default:"https://kubernetes.default" split_words:"true"`

TunnelDialRetryAttempts int `default:"20" split_words:"true"`
// TunnelDialRetryAttempts is the number of times the tunnel dialer should retry before failing.
// -1 means dial indefinitely.
TunnelDialRetryAttempts int `default:"-1" split_words:"true"`
TunnelDialRetryInterval time.Duration `default:"5s" split_words:"true"`
TunnelDialTimeout time.Duration `default:"60s" split_words:"true"`
TunnelDialTimeout time.Duration `default:"10s" split_words:"true"`

TunnelDialRecreateOnTunnelClose bool `default:"true" split_words:"true"`
ConnectionRetryAttempts int `default:"25" split_words:"true"`
Expand Down
27 changes: 4 additions & 23 deletions guardian/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ package daemon

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -30,10 +27,11 @@ import (
)

// Run starts the daemon, which configures and starts the services needed for guardian to run.
func Run(cfg config.Config, proxyTargets []server.Target) {
func Run(ctx context.Context, cfg config.Config, proxyTargets []server.Target) {
tunnelDialOpts := []tunnel.DialerOption{
tunnel.WithDialerRetryInterval(cfg.TunnelDialRetryInterval),
tunnel.WithDialerTimeout(cfg.TunnelDialTimeout),
tunnel.WithDialerRetryAttempts(cfg.TunnelDialRetryAttempts),
tunnel.WithDialerKeepAliveSettings(cfg.KeepAliveEnable, time.Duration(cfg.KeepAliveInterval)*time.Millisecond),
}

Expand All @@ -57,8 +55,6 @@ func Run(cfg config.Config, proxyTargets []server.Target) {

logrus.Infof("Using server name %s", tlsConfig.ServerName)

ctx := GetShutdownContext()

dialer, err := tunnel.NewTLSSessionDialer(cfg.VoltronURL, tlsConfig, tunnelDialOpts...)
if err != nil {
logrus.WithError(err).Fatal("Failed to create session dialer.")
Expand Down Expand Up @@ -87,7 +83,7 @@ func Run(cfg config.Config, proxyTargets []server.Target) {
defer wg.Done()
// Allow requests to come down from the management cluster.
if err := srv.ListenAndServeManagementCluster(); err != nil {
logrus.WithError(err).Fatal("Serving the tunnel exited.")
logrus.WithError(err).Info("Serving the tunnel exited.")
}
}()

Expand All @@ -98,7 +94,7 @@ func Run(cfg config.Config, proxyTargets []server.Target) {
defer wg.Done()

if err := srv.ListenAndServeCluster(); err != nil {
logrus.WithError(err).Fatal("proxy tunnel exited with an error")
logrus.WithError(err).Info("proxy tunnel exited with an error")
}
}()
}
Expand All @@ -107,18 +103,3 @@ func Run(cfg config.Config, proxyTargets []server.Target) {
logrus.WithError(err).Fatal("proxy tunnel exited with an error")
}
}

// GetShutdownContext returns a context that is cancelled when the process receives either syscall.SIGINT or
// syscall.SIGTERM.
//
// Only the first signal is intercepted: once it arrives the handler is unregistered, so a repeated SIGINT/SIGTERM
// falls back to the default signal behaviour and terminates the process even if graceful shutdown has stalled.
func GetShutdownContext() context.Context {
	// Buffered so the notification is not dropped if it arrives before the goroutine below is receiving.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-signalChan
		// Stop relaying signals to signalChan so that a repeated signal is no longer silently swallowed.
		signal.Stop(signalChan)
		logrus.Debug("Shutdown signal received, shutting down.")
		cancel()
	}()

	return ctx
}
34 changes: 24 additions & 10 deletions guardian/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
calicotls "github.com/projectcalico/calico/crypto/pkg/tls"
"github.com/projectcalico/calico/guardian/pkg/conn"
"github.com/projectcalico/calico/guardian/pkg/tunnel"
"github.com/projectcalico/calico/lib/std/chanutil"
)

// Server represents a server interface with methods for cluster and management cluster operations and graceful shutdown.
Expand All @@ -51,18 +52,20 @@ type server struct {
listenPort string
listenHost string

shutdownCtx context.Context
shutdownCtx context.Context
preemptiveShutdown chan struct{}
}

func New(shutdownCtx context.Context, tunnelCert *tls.Certificate, dialer tunnel.SessionDialer, opts ...Option) (Server, error) {
var err error
srv := &server{
http: new(http.Server),
shutdownCtx: shutdownCtx,
connRetryAttempts: 5,
connRetryInterval: 2 * time.Second,
listenPort: "8080",
tunnelCert: tunnelCert,
http: new(http.Server),
shutdownCtx: shutdownCtx,
connRetryAttempts: 5,
connRetryInterval: 2 * time.Second,
listenPort: "8080",
tunnelCert: tunnelCert,
preemptiveShutdown: make(chan struct{}),
}

for _, o := range opts {
Expand Down Expand Up @@ -93,6 +96,7 @@ func New(shutdownCtx context.Context, tunnelCert *tls.Certificate, dialer tunnel
}

func (srv *server) ListenAndServeManagementCluster() error {
defer close(srv.preemptiveShutdown)
if err := srv.tunnel.Connect(srv.shutdownCtx); err != nil {
return fmt.Errorf("failed to connect to tunnel: %w", err)
}
Expand All @@ -117,6 +121,7 @@ func (srv *server) ListenAndServeManagementCluster() error {
}

func (srv *server) ListenAndServeCluster() error {
defer close(srv.preemptiveShutdown)
logrus.Infof("Listening on %s:%s for connections to proxy to voltron", srv.listenHost, srv.listenPort)
if err := srv.tunnel.Connect(srv.shutdownCtx); err != nil {
return fmt.Errorf("failed to connect to tunnel: %w", err)
Expand Down Expand Up @@ -153,14 +158,23 @@ func (srv *server) ListenAndServeCluster() error {
}

func (srv *server) WaitForShutdown() error {
<-srv.shutdownCtx.Done()
logrus.Info("Received shutdown signal, shutting server down.")
// TODO might just want to wrap the context we have.
select {
case <-srv.preemptiveShutdown:
logrus.Info("Received preemptive shutdown signal, shutting server down.")
case <-srv.shutdownCtx.Done():
logrus.Info("Received shutdown signal, shutting server down.")
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

logrus.Info("Waiting for tunnel to close...")
chanutil.Read(ctx, srv.tunnel.WaitForClose())
logrus.Info("Tunnel is closed.")

err := srv.http.Shutdown(ctx)
logrus.Info("Server shutdown complete.")

return err
}

Expand Down
2 changes: 1 addition & 1 deletion guardian/pkg/thirdpartymocks/net/Conn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading