Skip to content

Commit fc98060

Browse files
asw101arschles
authored andcommitted
Return custom header when request was returned from a cold start (kedacore#366)
* Add X-KEDA-HTTP-Cold-Start header - Add X-KEDA-HTTP-Cold-Start header - Change signature of forwardWaitFunc to include deployment.Status.ReadyReplicas Signed-off-by: Aaron Wislang <[email protected]> * Update tests for X-KEDA-HTTP-Cold-Start header Signed-off-by: Aaron Wislang <[email protected]> * Update interceptor/main_test.go Co-authored-by: Aaron Schlesinger <[email protected]> Signed-off-by: Aaron Wislang <[email protected]> * add bracket Signed-off-by: Aaron Wislang <[email protected]> Co-authored-by: Aaron Schlesinger <[email protected]> Signed-off-by: Marco Piovesana <[email protected]>
1 parent c3960df commit fc98060

File tree

5 files changed

+47
-28
lines changed

5 files changed

+47
-28
lines changed

interceptor/forward_wait_func.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
// forwardWaitFunc is a function that waits for a condition
1313
// before proceeding to serve the request.
14-
type forwardWaitFunc func(context.Context, string, string) error
14+
type forwardWaitFunc func(context.Context, string, string) (int, error)
1515

1616
func deploymentCanServe(depl appsv1.Deployment) bool {
1717
return depl.Status.ReadyReplicas > 0
@@ -21,7 +21,7 @@ func newDeployReplicasForwardWaitFunc(
2121
lggr logr.Logger,
2222
deployCache k8s.DeploymentCache,
2323
) forwardWaitFunc {
24-
return func(ctx context.Context, deployNS, deployName string) error {
24+
return func(ctx context.Context, deployNS, deployName string) (int, error) {
2525
// get a watcher & its result channel before querying the
2626
// deployment cache, to ensure we don't miss events
2727
watcher := deployCache.Watch(deployNS, deployName)
@@ -31,7 +31,7 @@ func newDeployReplicasForwardWaitFunc(
3131
deployment, err := deployCache.Get(deployNS, deployName)
3232
if err != nil {
3333
// if we didn't get the initial deployment state, bail out
34-
return fmt.Errorf(
34+
return 0, fmt.Errorf(
3535
"error getting state for deployment %s/%s (%s)",
3636
deployNS,
3737
deployName,
@@ -40,7 +40,7 @@ func newDeployReplicasForwardWaitFunc(
4040
}
4141
// if there is 1 or more replica, we're done waiting
4242
if deploymentCanServe(deployment) {
43-
return nil
43+
return int(deployment.Status.ReadyReplicas), nil
4444
}
4545

4646
for {
@@ -51,14 +51,13 @@ func newDeployReplicasForwardWaitFunc(
5151
lggr.Info(
5252
"Didn't get a deployment back in event",
5353
)
54-
}
55-
if deploymentCanServe(*deployment) {
56-
return nil
54+
} else if deploymentCanServe(*deployment) {
55+
return 0, nil
5756
}
5857
case <-ctx.Done():
5958
// otherwise, if the context is marked done before
6059
// we're done waiting, fail.
61-
return fmt.Errorf(
60+
return 0, fmt.Errorf(
6261
"context marked done while waiting for deployment %s to reach > 0 replicas (%w)",
6362
deployName,
6463
ctx.Err(),

interceptor/forward_wait_func_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ func TestForwardWaitFuncOneReplica(t *testing.T) {
4343
)
4444

4545
group.Go(func() error {
46-
return waitFunc(ctx, ns, deployName)
46+
_, err := waitFunc(ctx, ns, deployName)
47+
return err
4748
})
4849
r.NoError(group.Wait(), "wait function failed, but it shouldn't have")
4950
}
@@ -76,7 +77,7 @@ func TestForwardWaitFuncNoReplicas(t *testing.T) {
7677
cache,
7778
)
7879

79-
err := waitFunc(ctx, ns, deployName)
80+
_, err := waitFunc(ctx, ns, deployName)
8081
r.Error(err)
8182
}
8283

@@ -120,6 +121,7 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
120121
watcher.Action(watch.Modified, modifiedDeployment)
121122
close(replicasIncreasedCh)
122123
}()
123-
r.NoError(waitFunc(ctx, ns, deployName))
124+
_, err := waitFunc(ctx, ns, deployName)
125+
r.NoError(err)
124126
done()
125127
}

interceptor/main_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
6666
)
6767
timeouts := &config.Timeouts{}
6868
waiterCh := make(chan struct{})
69-
waitFunc := func(ctx context.Context, ns, name string) error {
69+
waitFunc := func(ctx context.Context, ns, name string) (int, error) {
7070
<-waiterCh
71-
return nil
71+
return 1, nil
7272
}
7373
g.Go(func() error {
7474
return runProxyServer(
@@ -106,6 +106,9 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
106106
resp.StatusCode,
107107
)
108108
}
109+
if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" {
110+
return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start"))
111+
}
109112
return nil
110113
})
111114
time.Sleep(100 * time.Millisecond)

interceptor/proxy_handlers.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,12 @@ func newForwardingHandler(
7474

7575
waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout)
7676
defer done()
77-
if err := waitFunc(
77+
replicas, err := waitFunc(
7878
waitFuncCtx,
7979
routingTarget.Namespace,
8080
routingTarget.Deployment,
81-
); err != nil {
81+
)
82+
if err != nil {
8283
lggr.Error(err, "wait function failed, not forwarding request")
8384
w.WriteHeader(502)
8485
w.Write([]byte(fmt.Sprintf("error on backend (%s)", err)))
@@ -91,6 +92,11 @@ func newForwardingHandler(
9192
w.Write([]byte("error getting backend service URL"))
9293
return
9394
}
95+
isColdStart := "false"
96+
if replicas == 0 {
97+
isColdStart = "true"
98+
}
99+
w.Header().Add("X-KEDA-HTTP-Cold-Start", isColdStart)
94100
forwardRequest(w, r, roundTripper, targetSvcURL)
95101
})
96102
}

interceptor/proxy_handlers_test.go

+22-13
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ func TestImmediatelySuccessfulProxy(t *testing.T) {
4545

4646
timeouts := defaultTimeouts()
4747
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
48-
waitFunc := func(context.Context, string, string) error {
49-
return nil
48+
waitFunc := func(context.Context, string, string) (int, error) {
49+
return 1, nil
5050
}
5151
hdl := newForwardingHandler(
5252
logr.Discard(),
@@ -68,6 +68,7 @@ func TestImmediatelySuccessfulProxy(t *testing.T) {
6868

6969
hdl.ServeHTTP(res, req)
7070

71+
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
7172
r.Equal(200, res.Code, "expected response code 200")
7273
r.Equal("test response", res.Body.String())
7374
}
@@ -85,8 +86,8 @@ func TestWaitFailedConnection(t *testing.T) {
8586
timeouts,
8687
backoff,
8788
)
88-
waitFunc := func(context.Context, string, string) error {
89-
return nil
89+
waitFunc := func(context.Context, string, string) (int, error) {
90+
return 1, nil
9091
}
9192
routingTable := routing.NewTable()
9293
routingTable.AddTarget(host, routing.NewTarget(
@@ -117,6 +118,7 @@ func TestWaitFailedConnection(t *testing.T) {
117118

118119
hdl.ServeHTTP(res, req)
119120

121+
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
120122
r.Equal(502, res.Code, "response code was unexpected")
121123
}
122124

@@ -166,13 +168,19 @@ func TestTimesOutOnWaitFunc(t *testing.T) {
166168

167169
t.Logf("elapsed time was %s", elapsed)
168170
// serving should take at least timeouts.DeploymentReplicas, but no more than
169-
// timeouts.DeploymentReplicas*2
170-
// elapsed time should be more than the deployment replicas wait time
171-
// but not an amount that is much greater than that
171+
// timeouts.DeploymentReplicas*4
172172
r.GreaterOrEqual(elapsed, timeouts.DeploymentReplicas)
173173
r.LessOrEqual(elapsed, timeouts.DeploymentReplicas*4)
174174
r.Equal(502, res.Code, "response code was unexpected")
175175

176+
// we will always return the X-KEDA-HTTP-Cold-Start header
177+
// when we are able to forward the
178+
// request to the backend but not if we have failed due
179+
// to a timeout from a waitFunc or earlier in the pipeline,
180+
// for example, if we cannot reach the Kubernetes control
181+
// plane.
182+
r.Equal("", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start to be empty")
183+
176184
// waitFunc should have been called, even though it timed out
177185
waitFuncCalled := false
178186
select {
@@ -277,8 +285,8 @@ func TestWaitHeaderTimeout(t *testing.T) {
277285

278286
timeouts := defaultTimeouts()
279287
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
280-
waitFunc := func(context.Context, string, string) error {
281-
return nil
288+
waitFunc := func(context.Context, string, string) (int, error) {
289+
return 1, nil
282290
}
283291
routingTable := routing.NewTable()
284292
target := routing.NewTarget(
@@ -309,6 +317,7 @@ func TestWaitHeaderTimeout(t *testing.T) {
309317

310318
hdl.ServeHTTP(res, req)
311319

320+
r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false")
312321
r.Equal(502, res.Code, "response code was unexpected")
313322
close(originHdlCh)
314323
}
@@ -346,19 +355,19 @@ func waitForSignal(sig <-chan struct{}, waitDur time.Duration) error {
346355
// is called, or the context that is passed to it is done (e.g. cancelled, timed out,
347356
// etc...). in the former case, the returned func itself returns nil. in the latter,
348357
// it returns ctx.Err()
349-
func notifyingFunc() (func(context.Context, string, string) error, <-chan struct{}, func()) {
358+
func notifyingFunc() (forwardWaitFunc, <-chan struct{}, func()) {
350359
calledCh := make(chan struct{})
351360
finishCh := make(chan struct{})
352361
finishFunc := func() {
353362
close(finishCh)
354363
}
355-
return func(ctx context.Context, _, _ string) error {
364+
return func(ctx context.Context, _, _ string) (int, error) {
356365
close(calledCh)
357366
select {
358367
case <-finishCh:
359-
return nil
368+
return 0, nil
360369
case <-ctx.Done():
361-
return fmt.Errorf("TEST FUNCTION CONTEXT ERROR: %w", ctx.Err())
370+
return 0, fmt.Errorf("TEST FUNCTION CONTEXT ERROR: %w", ctx.Err())
362371
}
363372
}, calledCh, finishFunc
364373
}

0 commit comments

Comments
 (0)