Skip to content

add aeraki health check #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/aeraki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func main() {
"Generate Envoy Filters in the service namespace")
flag.StringVar(&args.KubeDomainSuffix, "domain", defaultKubernetesDomain, "Kubernetes DNS domain suffix")
flag.StringVar(&args.HTTPSAddr, "httpsAddr", ":15017", "validation service HTTPS address")
flag.StringVar(&args.HTTPAddr, "httpAddr", ":8080", "Aeraki readiness service HTTP address")

flag.Parse()
if args.ServerID == "" {
Expand Down
3 changes: 3 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ ARG AERAKI_ROOT_BIN_DIR
ARG ARCH
ARG OS

RUN apk update && \
apk add curl

COPY ${AERAKI_ROOT_BIN_DIR}/${ARCH}/${OS}/aeraki /usr/local/bin/
ENTRYPOINT /usr/local/bin/aeraki
17 changes: 17 additions & 0 deletions k8s/aeraki.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ spec:
image: ${AERAKI_IMAGE}:${AERAKI_TAG}
# imagePullPolicy should be set to Never so Minikube can use local image for e2e testing
imagePullPolicy: ${AERAKI_IMG_PULL_POLICY}
ports:
- containerPort: 8080
protocol: TCP
- containerPort: 15010
protocol: TCP
- containerPort: 15017
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /ready
port: 8080
scheme: HTTP
initialDelaySeconds: 1
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 5
resources:
requests:
memory: "1Gi"
Expand Down
30 changes: 16 additions & 14 deletions manifests/charts/aeraki/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ spec:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- containerPort: 8080
protocol: TCP
- containerPort: 15010
protocol: TCP
- containerPort: 15017
protocol: TCP
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 1
periodSeconds: 3
timeoutSeconds: 5
resources:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still keep the resource field now.

{ { - toYaml .Values.resources | nindent 12 } }
env:
- name: AERAKI_IS_MASTER
value: {{ .Values.AERAKI_ENV.AERAKI_IS_MASTER }}
Expand Down Expand Up @@ -71,20 +87,6 @@ spec:
- name: istiod-ca-cert
mountPath: /var/run/secrets/istio
readOnly: true
resources:
{{- toYaml .Values.resources | nindent 12 }}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I see that it not being used.

# ports:
# - name: http
# containerPort: 80
# protocol: TCP
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
volumes:
- name: istiod-ca-cert
configMap:
Expand Down
16 changes: 8 additions & 8 deletions pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (

// AerakiArgs provides all of the configuration parameters for the Aeraki service.
type AerakiArgs struct {
Master bool
IstiodAddr string
AerakiXdsAddr string
AerakiXdsPort string
PodName string
IstioConfigMapName string
// The listening address for HTTPS (webhooks).
HTTPSAddr string
Master bool
IstiodAddr string
AerakiXdsAddr string
AerakiXdsPort string
PodName string
IstioConfigMapName string
HTTPSAddr string // The listening address for HTTPS (webhooks).
HTTPAddr string // The listening address for HTTP (health).
RootNamespace string
ClusterID string
ConfigStoreSecret string
Expand Down
96 changes: 82 additions & 14 deletions pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"

//nolint: gosec
_ "net/http/pprof" // pprof
Expand All @@ -48,6 +50,7 @@ import (
"github.com/aeraki-mesh/aeraki/pkg/envoyfilter"
"github.com/aeraki-mesh/aeraki/pkg/leaderelection"
"github.com/aeraki-mesh/aeraki/pkg/model/protocol"
"github.com/aeraki-mesh/aeraki/pkg/util"
"github.com/aeraki-mesh/aeraki/pkg/xds"
"github.com/aeraki-mesh/aeraki/plugin/dubbo"
"github.com/aeraki-mesh/aeraki/plugin/redis"
Expand All @@ -57,6 +60,9 @@ var (
aerakiLog = log.RegisterScope("aeraki-server", "aeraki-server debugging", 0)
)

// readinessProbe defines a function that will be used indicate whether a server is ready.
type readinessProbe func() bool

// Server contains the runtime configuration for the Aeraki service.
type Server struct {
args *AerakiArgs
Expand All @@ -75,6 +81,13 @@ type Server struct {
istiodCert *tls.Certificate
CABundle *bytes.Buffer
stopControllers func()
// serverReady indicates server is ready to process requests.
serverReady atomic.Bool
readinessProbes map[string]readinessProbe
// httpMux listens on the httpAddr (8080).
// monitoring and readiness Server.
httpServer *http.Server
httpMux *http.ServeMux

// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in
// favor of AddStartFunc. This is only required if we *must* start something outside of this process.
Expand Down Expand Up @@ -115,7 +128,6 @@ func NewServer(args *AerakiArgs) (*Server, error) {
})
// xdsServer is the RDS server for metaProtocol proxy
xdsServer := xds.NewServer(args.AerakiXdsPort, routeCacheMgr)

// crdCtrlMgr watches Aeraki CRDs, such as MetaRouter, ApplicationProtocol, etc.
scalableCtrlMgr, err := createScalableControllers(args, kubeConfig, envoyFilterController, routeCacheMgr)
if err != nil {
Expand All @@ -125,12 +137,10 @@ func NewServer(args *AerakiArgs) (*Server, error) {
routeCacheMgr.MetaRouterControllerClient = scalableCtrlMgr.GetClient()
// envoyFilterController uses controller manager client to get the rate limit configuration in MetaRouters
envoyFilterController.MetaRouterControllerClient = scalableCtrlMgr.GetClient()

// todo replace config with cached client
cfg := scalableCtrlMgr.GetConfig()
args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(scalableCtrlMgr.GetConfig())
args.Protocols[protocol.Redis] = redis.New(cfg, configController.Store)

// singletonCtrlMgr
singletonCtrlMgr, err := createSingletonControllers(args, kubeConfig)
if err != nil {
Expand All @@ -145,6 +155,7 @@ func NewServer(args *AerakiArgs) (*Server, error) {
xdsCacheMgr: routeCacheMgr,
xdsServer: xdsServer,
internalStop: make(chan struct{}),
readinessProbes: make(map[string]readinessProbe),
}
if err := server.initKubeClient(); err != nil {
return nil, fmt.Errorf("error initializing kube client: %v", err)
Expand All @@ -162,9 +173,59 @@ func NewServer(args *AerakiArgs) (*Server, error) {
envoyFilterController.ConfigUpdated(model.EventUpdate)
})
envoyFilterController.InitMeshConfig(server.configMapWatcher)
server.initAerakiServer(args)
return server, err
}

func (s *Server) initAerakiServer(args *AerakiArgs) {
// make sure we have a readiness probe before serving HTTP to avoid marking ready too soon
s.initReadinessProbes()
s.initServers(args)
// Readiness Handler.
s.httpMux.HandleFunc("/ready", s.aerakiReadyHandler)
}

// aerakiReadyHandler handler readiness event
func (s *Server) aerakiReadyHandler(w http.ResponseWriter, _ *http.Request) {
for name, fn := range s.readinessProbes {
if ready := fn(); !ready {
log.Warnf("%s is not ready", name)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
}
w.WriteHeader(http.StatusOK)
}

func (s *Server) initReadinessProbes() {
probes := map[string]readinessProbe{
"aeraki": func() bool {
return s.serverReady.Load()
},
}
for name, probe := range probes {
s.addReadinessProbe(name, probe)
}
}

// adds a readiness probe for Aeraki Server.
func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
s.readinessProbes[name] = fn
}

// initHttpServer init servers
func (s *Server) initServers(args *AerakiArgs) {
aerakiLog.Info("initializing HTTP server for aeraki")
s.httpMux = http.NewServeMux()
s.httpServer = &http.Server{
Addr: args.HTTPAddr,
Handler: s.httpMux,
WriteTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 30 * time.Second,
}
}

// These controllers are horizontally scalable, multiple instances can be deployed to share the load
func createScalableControllers(args *AerakiArgs, kubeConfig *rest.Config,
envoyFilterController *envoyfilter.Controller, xdsCacheMgr *xds.CacheMgr) (manager.Manager, error) {
Expand Down Expand Up @@ -299,25 +360,32 @@ func (s *Server) Start(stop <-chan struct{}) {
}
go func() {
log.Infof("starting webhook service at %s", httpsListener.Addr())
if err := s.httpsServer.ServeTLS(httpsListener, "", ""); isUnexpectedListenerError(err) {
if err := s.httpsServer.ServeTLS(httpsListener, "", ""); util.IsUnexpectedListenerError(err) {
log.Errorf("error serving https server: %v", err)
}
}()

if err = s.serveHTTP(); err != nil {
aerakiLog.Errorf("failed to http server: %v", err)
}
s.serverReady.Store(true)
s.waitForShutdown(stop)
}

func isUnexpectedListenerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, net.ErrClosed) {
return false
}
if errors.Is(err, http.ErrServerClosed) {
return false
// serveHTTP starts Http Listener so that it can respond to readiness events.
func (s *Server) serveHTTP() error {
log.Infof("starting HTTP service at %s", s.httpServer.Addr)
httpListener, err := net.Listen("tcp", s.httpServer.Addr)
if err != nil {
return err
}
return true
go func() {
log.Infof("starting HTTP service at %s", httpListener.Addr())
if err := s.httpServer.Serve(httpListener); util.IsUnexpectedListenerError(err) {
log.Errorf("error serving http server: %v", err)
}
}()
return nil
}

// Wait for the stop, and do cleanups
Expand Down
35 changes: 35 additions & 0 deletions pkg/util/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Aeraki Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"errors"
"net"
"net/http"
)

// IsUnexpectedListenerError handles the error returned by the listener
func IsUnexpectedListenerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, net.ErrClosed) {
return false
}
if errors.Is(err, http.ErrServerClosed) {
return false
}
return true
}