
Commit c3960df

arschles authored and piovesanaEsteco committed
Allowing cluster-global operation (kedacore#269)
* Allowing cluster-global operation
* passing less unnecessary data in the operator
* setting namespace
* updating tests
* Updating deployment cache interfaces to accommodate multi-namespace gets and watches
* starting on deployment cache informer
* switching over to informer-based deployment cache
* fixing test compile errors and merging fake and in-memory deployment caches
* fixing tests
* fixing more compile errs
* improving logging, and several other small changes
* go mod tidy
* Adding config validation
* improving ns and svc name var names to indicate interceptor
* running go mod tidy
* removing unused code
* passing a function to transform target to in-cluster URL
* not requiring namespace for operator
* splitting namespace config for operator into watch and current
* removing appInfo, passing current namespace everywhere in its place
* using proper namespace when creating scaled object
* allowing xkcd chart to set ingress namespace
* printing namespace in error
* using proper fully-qualified hostname of external scaler in scaledobject
* adding note on cluster-global vs. namespaced mode
* adding note about installing the xkcd chart in cluster-global mode
* fixing hostname test
* merging scaler queue counts with routing table hosts; removing merge functionality from interceptors
* fix assumption in tests that queue has all hosts from routing table
* adding test for MergeCountsWithRoutingTable
* sleep for longer to wait for server to start
* adding handler test for merging hosts
* adding test to GetMetrics test cases for host not in queue pinger

Signed-off-by: Aaron Schlesinger <[email protected]>
Signed-off-by: Marco Piovesana <[email protected]>
1 parent ad5ce56 commit c3960df


46 files changed: +913 -1555 lines changed (not all files are shown below)

docs/install.md (+10 -2)

@@ -6,8 +6,13 @@ The HTTP Add On is highly modular and, as expected, builds on top of KEDA core.
 - **Scaler** - communicates scaling-related metrics to KEDA. By default, the operator will install this for you as necessary.
 - **Interceptor** - a cluster-internal proxy that proxies incoming HTTP requests, communicating HTTP queue size metrics to the scaler, and holding requests in a temporary request queue when there are not yet any available app `Pod`s ready to serve. By default, the operator will install this for you as necessary.
 
->There is [pending work in KEDA](https://github.com/kedacore/keda/issues/615) that will eventually make this component optional. See [issue #6 in this repository](https://github.com/kedacore/http-add-on/issues/6) for even more background
+>There is [pending work](https://github.com/kedacore/http-add-on/issues/354) that may eventually make this component optional.
 
+## Before You Start: Cluster-global vs. Namespaced installation
+
+Both KEDA and the HTTP Addon can be installed in either cluster-global or namespaced mode. In the former case, your `ScaledObject`s and `HTTPScaledObject`s (respectively) can be installed in any namespace, and one installation will detect and process them. In the latter case, you must install your `ScaledObject`s and `HTTPScaledObject`s in a specific namespace.
+
+You can install KEDA and the HTTP Addon in either mode, but if you install one as cluster-global, the other must also be cluster-global. Similarly, if you install one as namespaced, the other must also be namespaced, in the same namespace.
 
 ## Installing KEDA
 
 Before you install any of these components, you need to install KEDA. Below are simplified instructions for doing so with [Helm](https://helm.sh), but if you need anything more customized, please see the [official KEDA deployment documentation](https://keda.sh/docs/2.0/deploy/). If you need to install Helm, refer to the [installation guide](https://helm.sh/docs/intro/install/).

@@ -17,16 +22,19 @@ Before you install any of these components, you need to install KEDA. Below are
 ```console
 helm repo add kedacore https://kedacore.github.io/charts
 helm repo update
-helm install keda kedacore/keda --namespace ${NAMESPACE} --set watchNamespace=${NAMESPACE} --create-namespace
+helm install keda kedacore/keda --namespace ${NAMESPACE} --create-namespace
 ```
 
+>The above command installs KEDA in cluster-global mode. Add `--set watchNamespace=<target namespace>` to install KEDA in namespaced mode.
+
 ## Install via Helm Chart
 
 The Helm chart for this project is within KEDA's default helm repository at [kedacore/charts](http://github.com/kedacore/charts), you can install it by running:
 
 ```console
 helm install http-add-on kedacore/keda-add-ons-http --namespace ${NAMESPACE}
 ```
+
+>The above command installs the HTTP Addon in cluster-global mode. Add `--set operator.watchNamespace=<target namespace>` to install the HTTP Addon in namespaced mode. If you do this, you must also install KEDA in namespaced mode and use the same target namespace.
 
 >Installing the HTTP add on won't affect any running workloads in your cluster. You'll need to install an `HTTPScaledObject` for each individual `Deployment` you want to scale. For more on how to do that, please see the [walkthrough](./walkthrough.md).

docs/walkthrough.md (+2)

@@ -16,6 +16,8 @@ helm install xkcd ./examples/xkcd -n ${NAMESPACE}
 
 You'll need to clone the repository to get access to this chart. If you have your own `Deployment` and `Service` installed, you can go right to creating an `HTTPScaledObject` in the next section.
 
+>If you are running KEDA and the HTTP Addon in cluster-global mode, you can install the XKCD chart in any namespace you choose. If you do so, make sure you add `--set ingressNamespace=${NAMESPACE}` to the above installation command.
+
 >To remove the app, run `helm delete xkcd -n ${NAMESPACE}`
 
 ## Creating an `HTTPScaledObject`

examples/xkcd/templates/ingress.yaml (+1)

@@ -2,6 +2,7 @@ apiVersion: networking.k8s.io/v1
 kind: Ingress
 metadata:
   name: {{ include "xkcd.fullname" . }}
+  namespace: {{ .Values.ingressNamespace | default .Release.Namespace }}
   annotations:
     nginx.ingress.kubernetes.io/rewrite-target: /
     kubernetes.io/ingress.class: nginx

examples/xkcd/values.yaml (+5)

@@ -1,5 +1,10 @@
 replicaCount: 1
 host: myhost.com
+# This is the namespace that the ingress should be installed
+# into. It should be set to the same namespace as the
+# KEDA HTTP componentry is installed in. Defaults to the Helm
+# chart release namespace
+ingressNamespace:
 image:
   repository: arschles/xkcd
   pullPolicy: Always

go.mod (-1)

@@ -8,7 +8,6 @@ require (
 	github.com/golang/protobuf v1.5.2
 	github.com/kelseyhightower/envconfig v1.4.0
 	github.com/magefile/mage v1.12.1
-	github.com/mitchellh/hashstructure/v2 v2.0.2
 	github.com/onsi/ginkgo v1.16.5
 	github.com/onsi/gomega v1.17.0
 	github.com/pkg/errors v0.9.1

go.sum (-2)

@@ -286,8 +286,6 @@ github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
 github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
-github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
-github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
 github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=

interceptor/config/timeouts.go (+1 -1)

@@ -48,7 +48,7 @@ func (t *Timeouts) Backoff(factor, jitter float64, steps int) wait.Backoff {
 
 // DefaultBackoff calls t.Backoff with reasonable defaults and returns
 // the result
-func (t *Timeouts) DefaultBackoff() wait.Backoff {
+func (t Timeouts) DefaultBackoff() wait.Backoff {
 	return t.Backoff(2, 0.5, 5)
 }
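For context, `DefaultBackoff` delegates to `Backoff(2, 0.5, 5)`, which builds a `wait.Backoff` from apimachinery's `wait` package. A minimal sketch of consuming such a value with `wait.ExponentialBackoff`; the 100ms seed duration and the retry condition are made up for illustration:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Mirrors t.Backoff(2, 0.5, 5) above; the 100ms seed Duration is
	// an assumed value for this sketch, not the add-on's default.
	backoff := wait.Backoff{
		Duration: 100 * time.Millisecond,
		Factor:   2,
		Jitter:   0.5,
		Steps:    5,
	}
	attempt := 0
	// wait.ExponentialBackoff sleeps between attempts, growing the
	// delay by Factor (plus Jitter) each step, for at most Steps tries.
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempt++
		fmt.Println("attempt", attempt)
		return attempt >= 3, nil // pretend we succeed on the third try
	})
	fmt.Println("err:", err) // nil: succeeded before the steps ran out
}
```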

interceptor/config/validate.go (+19, new file)

@@ -0,0 +1,19 @@
+package config
+
+import (
+	"fmt"
+	"time"
+)
+
+func Validate(srvCfg Serving, timeoutsCfg Timeouts) error {
+	deplCachePollInterval := time.Duration(srvCfg.DeploymentCachePollIntervalMS) * time.Millisecond
+	if timeoutsCfg.DeploymentReplicas < deplCachePollInterval {
+		return fmt.Errorf(
+			"deployment replicas timeout (%s) should not be less than the Deployment Cache Poll Interval (%s)",
+			timeoutsCfg.DeploymentReplicas,
+			deplCachePollInterval,
+		)
+	}
+	return nil
+}
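The new `Validate` rejects configurations where the deployment-replicas timeout is shorter than the deployment cache poll interval, since such a timeout could expire before the cache refreshes even once. A standalone sketch of the check, with `Serving` and `Timeouts` trimmed to the fields visible in this commit (their full definitions live elsewhere in the package):

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-ins for the config package's structs, keeping only
// the fields the Validate check reads.
type Serving struct {
	DeploymentCachePollIntervalMS int
}

type Timeouts struct {
	DeploymentReplicas time.Duration
}

func Validate(srvCfg Serving, timeoutsCfg Timeouts) error {
	pollInterval := time.Duration(srvCfg.DeploymentCachePollIntervalMS) * time.Millisecond
	if timeoutsCfg.DeploymentReplicas < pollInterval {
		return fmt.Errorf(
			"deployment replicas timeout (%s) should not be less than the deployment cache poll interval (%s)",
			timeoutsCfg.DeploymentReplicas, pollInterval,
		)
	}
	return nil
}

func main() {
	// A 250ms timeout with a 1000ms poll interval fails validation...
	fmt.Println(Validate(Serving{1000}, Timeouts{250 * time.Millisecond}))
	// ...while a 2s timeout passes.
	fmt.Println(Validate(Serving{1000}, Timeouts{2 * time.Second}))
}
```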

interceptor/forward_wait_func.go (+29 -14)

@@ -3,41 +3,56 @@ package main
 import (
 	"context"
 	"fmt"
-	"log"
 
+	"github.com/go-logr/logr"
 	"github.com/kedacore/http-add-on/pkg/k8s"
 	appsv1 "k8s.io/api/apps/v1"
 )
 
-type forwardWaitFunc func(context.Context, string) error
+// forwardWaitFunc is a function that waits for a condition
+// before proceeding to serve the request.
+type forwardWaitFunc func(context.Context, string, string) error
+
+func deploymentCanServe(depl appsv1.Deployment) bool {
+	return depl.Status.ReadyReplicas > 0
+}
 
 func newDeployReplicasForwardWaitFunc(
+	lggr logr.Logger,
 	deployCache k8s.DeploymentCache,
 ) forwardWaitFunc {
-	return func(ctx context.Context, deployName string) error {
-		deployment, err := deployCache.Get(deployName)
+	return func(ctx context.Context, deployNS, deployName string) error {
+		// get a watcher & its result channel before querying the
+		// deployment cache, to ensure we don't miss events
+		watcher := deployCache.Watch(deployNS, deployName)
+		eventCh := watcher.ResultChan()
+		defer watcher.Stop()
+
+		deployment, err := deployCache.Get(deployNS, deployName)
 		if err != nil {
 			// if we didn't get the initial deployment state, bail out
-			return fmt.Errorf("error getting state for deployment %s (%s)", deployName, err)
+			return fmt.Errorf(
+				"error getting state for deployment %s/%s (%s)",
+				deployNS,
+				deployName,
+				err,
+			)
 		}
 		// if there is 1 or more replica, we're done waiting
-		if deployment.Status.ReadyReplicas > 0 {
+		if deploymentCanServe(deployment) {
 			return nil
 		}
-		watcher := deployCache.Watch(deployName)
-		if err != nil {
-			return fmt.Errorf("error getting the stream of deployment changes")
-		}
-		defer watcher.Stop()
-		eventCh := watcher.ResultChan()
+
 		for {
 			select {
 			case event := <-eventCh:
 				deployment, ok := event.Object.(*appsv1.Deployment)
 				if !ok {
-					log.Println("Didn't get a deployment back in event")
+					lggr.Info(
						"Didn't get a deployment back in event",
					)
 				}
-				if deployment.Status.ReadyReplicas > 0 {
+				if deploymentCanServe(*deployment) {
 					return nil
 				}
 			case <-ctx.Done():
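Note the ordering above: the watcher is acquired before the cache is queried. If the snapshot were taken first, a deployment could become ready between the `Get` and the `Watch`, and that event would be missed, stalling the request until timeout. A minimal sketch of the subscribe-then-check pattern with a hypothetical watchable store (the types here are illustrative, not the add-on's API):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// store is a hypothetical watchable value: Get returns the current
// value, Watch streams subsequent updates.
type store struct {
	ch  chan int
	val int
}

func (s *store) Get() int          { return s.val }
func (s *store) Watch() <-chan int { return s.ch }

// waitForPositive subscribes first, then checks the snapshot, so an
// update delivered between the two steps is never lost.
func waitForPositive(ctx context.Context, s *store) error {
	events := s.Watch() // subscribe before reading the snapshot
	if s.Get() > 0 {
		return nil
	}
	for {
		select {
		case v := <-events:
			if v > 0 {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	s := &store{ch: make(chan int, 1)}
	go func() { time.Sleep(10 * time.Millisecond); s.ch <- 1 }()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitForPositive(ctx, s)) // <nil>
}
```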

interceptor/forward_wait_func_test.go (+29 -25)

@@ -5,10 +5,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/go-logr/logr"
 	"github.com/kedacore/http-add-on/pkg/k8s"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
-	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/watch"
 )

@@ -22,28 +22,28 @@ func TestForwardWaitFuncOneReplica(t *testing.T) {
 	r := require.New(t)
 	const ns = "testNS"
 	const deployName = "TestForwardingHandlerDeploy"
-	cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
-		deployName: *newDeployment(
-			ns,
-			deployName,
-			"myimage",
-			[]int32{123},
-			nil,
-			map[string]string{},
-			corev1.PullAlways,
-		),
-	})
+	cache := k8s.NewFakeDeploymentCache()
+	cache.AddDeployment(*newDeployment(
+		ns,
+		deployName,
+		"myimage",
+		[]int32{123},
+		nil,
+		map[string]string{},
+		corev1.PullAlways,
+	))
 
 	ctx, done := context.WithTimeout(ctx, waitFuncWait)
 	defer done()
 	group, ctx := errgroup.WithContext(ctx)
 
 	waitFunc := newDeployReplicasForwardWaitFunc(
+		logr.Discard(),
 		cache,
 	)
 
 	group.Go(func() error {
-		return waitFunc(ctx, deployName)
+		return waitFunc(ctx, ns, deployName)
 	})
 	r.NoError(group.Wait(), "wait function failed, but it shouldn't have")
 }

@@ -66,17 +66,17 @@ func TestForwardWaitFuncNoReplicas(t *testing.T) {
 		corev1.PullAlways,
 	)
 	deployment.Status.ReadyReplicas = 0
-	cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
-		deployName: *deployment,
-	})
+	cache := k8s.NewFakeDeploymentCache()
+	cache.AddDeployment(*deployment)
 
 	ctx, done := context.WithTimeout(ctx, waitFuncWait)
 	defer done()
 	waitFunc := newDeployReplicasForwardWaitFunc(
+		logr.Discard(),
 		cache,
 	)
 
-	err := waitFunc(ctx, deployName)
+	err := waitFunc(ctx, ns, deployName)
 	r.Error(err)
 }

@@ -97,25 +97,29 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
 		corev1.PullAlways,
 	)
 	deployment.Spec.Replicas = k8s.Int32P(0)
-	cache := k8s.NewMemoryDeploymentCache(map[string]appsv1.Deployment{
-		deployName: *deployment,
-	})
+	cache := k8s.NewFakeDeploymentCache()
+	cache.AddDeployment(*deployment)
+	// create a watcher first so that the goroutine
+	// can later fetch it and send a message on it
+	cache.Watch(ns, deployName)
+
 	ctx, done := context.WithTimeout(ctx, totalWaitDur)
-	defer done()
 	waitFunc := newDeployReplicasForwardWaitFunc(
+		logr.Discard(),
 		cache,
 	)
+
 	// this channel will be closed immediately after the replicas were increased
 	replicasIncreasedCh := make(chan struct{})
 	go func() {
 		time.Sleep(totalWaitDur / 2)
-		cache.RWM.RLock()
-		defer cache.RWM.RUnlock()
-		watcher := cache.Watchers[deployName]
+		watcher := cache.GetWatcher(ns, deployName)
+		r.NotNil(watcher, "watcher was not found")
 		modifiedDeployment := deployment.DeepCopy()
 		modifiedDeployment.Spec.Replicas = k8s.Int32P(1)
 		watcher.Action(watch.Modified, modifiedDeployment)
 		close(replicasIncreasedCh)
 	}()
-	r.NoError(waitFunc(ctx, deployName))
+	r.NoError(waitFunc(ctx, ns, deployName))
+	done()
 }
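The test goroutine drives the wait function by calling `watcher.Action(watch.Modified, …)` on the cached watcher. That matches the behavior of apimachinery's `watch.FakeWatcher`, which the fake cache plausibly wraps (an assumption; only the `Action`/`ResultChan`/`Stop` calls are visible in this diff). A small sketch of that primitive on its own:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	w := watch.NewFake()
	go func() {
		// Action injects an event into the watcher's channel, which
		// is what the test goroutine above does with the cached watcher.
		w.Action(watch.Modified, &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: "example"},
		})
		w.Stop() // closes the channel, ending the range below
	}()
	for ev := range w.ResultChan() {
		fmt.Printf("%s %T\n", ev.Type, ev.Object) // MODIFIED *v1.Pod
	}
}
```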

interceptor/main.go (+17 -13)

@@ -34,9 +34,20 @@ func main() {
 	}
 	timeoutCfg := config.MustParseTimeouts()
 	servingCfg := config.MustParseServing()
+	if err := config.Validate(*servingCfg, *timeoutCfg); err != nil {
+		lggr.Error(err, "invalid configuration")
+		os.Exit(1)
+	}
 	ctx, ctxDone := context.WithCancel(
 		context.Background(),
 	)
+	lggr.Info(
+		"starting interceptor",
+		"timeoutConfig",
+		timeoutCfg,
+		"servingConfig",
+		servingCfg,
+	)
 
 	proxyPort := servingCfg.ProxyPort
 	adminPort := servingCfg.AdminPort

@@ -51,13 +62,10 @@ func main() {
 		lggr.Error(err, "creating new Kubernetes ClientSet")
 		os.Exit(1)
 	}
-	deployInterface := cl.AppsV1().Deployments(
-		servingCfg.CurrentNamespace,
-	)
-	deployCache, err := k8s.NewK8sDeploymentCache(
-		ctx,
+	deployCache := k8s.NewInformerBackedDeploymentCache(
 		lggr,
-		deployInterface,
+		cl,
+		time.Millisecond*time.Duration(servingCfg.DeploymentCachePollIntervalMS),
 	)
 	if err != nil {
 		lggr.Error(err, "creating new deployment cache")

@@ -66,7 +74,7 @@ func main() {
 
 	configMapsInterface := cl.CoreV1().ConfigMaps(servingCfg.CurrentNamespace)
 
-	waitFunc := newDeployReplicasForwardWaitFunc(deployCache)
+	waitFunc := newDeployReplicasForwardWaitFunc(lggr, deployCache)
 
 	lggr.Info("Interceptor starting")

@@ -101,11 +109,7 @@ func main() {
 	// start the deployment cache updater
 	errGrp.Go(func() error {
 		defer ctxDone()
-		err := deployCache.StartWatcher(
-			ctx,
-			lggr,
-			time.Duration(servingCfg.DeploymentCachePollIntervalMS)*time.Millisecond,
-		)
+		err := deployCache.Start(ctx)
 		lggr.Error(err, "deployment cache watcher failed")
 		return err
 	})

@@ -121,7 +125,6 @@ func main() {
 		configMapInformer,
 		servingCfg.CurrentNamespace,
 		routingTable,
-		q,
 		nil,
 	)
 	lggr.Error(err, "config map routing table updater failed")

@@ -246,6 +249,7 @@ func runProxyServer(
 		routingTable,
 		dialContextFunc,
 		waitFunc,
+		routing.ServiceURL,
 		newForwardingConfigFromTimeouts(timeouts),
 	),
 )
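The informer-backed deployment cache now runs under the interceptor's error group: `deployCache.Start(ctx)` blocks for the life of the process, and the `defer ctxDone()` ensures that if the cache fails, the shared context is cancelled and the sibling goroutines shut down. A minimal sketch of that supervision pattern; `startCache` is a stand-in for the real cache's `Start`, and everything else here is illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// startCache stands in for deployCache.Start(ctx): it blocks until the
// context is cancelled or the watch fails.
func startCache(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond):
		return errors.New("watch connection dropped")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, ctxDone := context.WithCancel(context.Background())
	errGrp, ctx := errgroup.WithContext(ctx)

	// start the deployment cache updater; cancelling the shared
	// context on exit tears down the sibling goroutines too
	errGrp.Go(func() error {
		defer ctxDone()
		return startCache(ctx)
	})

	// a stand-in for the proxy/admin servers, which exit when the
	// shared context is cancelled
	errGrp.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	fmt.Println("exited:", errGrp.Wait())
}
```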
