From 9ff5e59f308b8121ea0c727e7518b149b8cf5cbd Mon Sep 17 00:00:00 2001 From: Aaron Wislang Date: Fri, 14 Jan 2022 17:54:53 -0500 Subject: [PATCH 1/4] Add X-KEDA-HTTP-Cold-Start header - Add X-KEDA-HTTP-Cold-Start header - Change signature of forwardWaitFunc to include deployment.Status.ReadyReplicas Signed-off-by: Aaron Wislang --- interceptor/forward_wait_func.go | 15 +++++++-------- interceptor/proxy_handlers.go | 10 ++++++++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/interceptor/forward_wait_func.go b/interceptor/forward_wait_func.go index 7dc23cb71..50a850437 100644 --- a/interceptor/forward_wait_func.go +++ b/interceptor/forward_wait_func.go @@ -11,7 +11,7 @@ import ( // forwardWaitFunc is a function that waits for a condition // before proceeding to serve the request. -type forwardWaitFunc func(context.Context, string, string) error +type forwardWaitFunc func(context.Context, string, string) (int, error) func deploymentCanServe(depl appsv1.Deployment) bool { return depl.Status.ReadyReplicas > 0 @@ -21,7 +21,7 @@ func newDeployReplicasForwardWaitFunc( lggr logr.Logger, deployCache k8s.DeploymentCache, ) forwardWaitFunc { - return func(ctx context.Context, deployNS, deployName string) error { + return func(ctx context.Context, deployNS, deployName string) (int, error) { // get a watcher & its result channel before querying the // deployment cache, to ensure we don't miss events watcher := deployCache.Watch(deployNS, deployName) @@ -31,7 +31,7 @@ func newDeployReplicasForwardWaitFunc( deployment, err := deployCache.Get(deployNS, deployName) if err != nil { // if we didn't get the initial deployment state, bail out - return fmt.Errorf( + return 0, fmt.Errorf( "error getting state for deployment %s/%s (%s)", deployNS, deployName, @@ -40,7 +40,7 @@ func newDeployReplicasForwardWaitFunc( } // if there is 1 or more replica, we're done waiting if deploymentCanServe(deployment) { - return nil + return int(deployment.Status.ReadyReplicas), nil } for { @@ -51,14 +51,13 @@ func newDeployReplicasForwardWaitFunc( lggr.Info( "Didn't get a deployment back in event", ) - } - if deploymentCanServe(*deployment) { - return nil + } else if deploymentCanServe(*deployment) { + return 0, nil } case <-ctx.Done(): // otherwise, if the context is marked done before // we're done waiting, fail. - return fmt.Errorf( + return 0, fmt.Errorf( "context marked done while waiting for deployment %s to reach > 0 replicas (%w)", deployName, ctx.Err(), diff --git a/interceptor/proxy_handlers.go b/interceptor/proxy_handlers.go index 9f1b27840..1d3860098 100644 --- a/interceptor/proxy_handlers.go +++ b/interceptor/proxy_handlers.go @@ -74,11 +74,12 @@ func newForwardingHandler( waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout) defer done() - if err := waitFunc( + replicas, err := waitFunc( waitFuncCtx, routingTarget.Namespace, routingTarget.Deployment, - ); err != nil { + ) + if err != nil { lggr.Error(err, "wait function failed, not forwarding request") w.WriteHeader(502) w.Write([]byte(fmt.Sprintf("error on backend (%s)", err))) @@ -91,6 +92,11 @@ func newForwardingHandler( w.Write([]byte("error getting backend service URL")) return } + isColdStart := "false" + if replicas == 0 { + isColdStart = "true" + } + w.Header().Add("X-KEDA-HTTP-Cold-Start", isColdStart) forwardRequest(w, r, roundTripper, targetSvcURL) }) } From a9d4c42bd30731f875310abe60ecb606b8776a5f Mon Sep 17 00:00:00 2001 From: Aaron Wislang Date: Fri, 14 Jan 2022 18:17:52 -0500 Subject: [PATCH 2/4] Update tests for X-KEDA-HTTP-Cold-Start header Signed-off-by: Aaron Wislang --- interceptor/forward_wait_func_test.go | 8 +++--- interceptor/main_test.go | 8 ++++-- interceptor/proxy_handlers_test.go | 35 +++++++++++++++++---------- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/interceptor/forward_wait_func_test.go b/interceptor/forward_wait_func_test.go index 77bc10827..21959ba0e 100644 --- a/interceptor/forward_wait_func_test.go +++ b/interceptor/forward_wait_func_test.go @@ -43,7 +43,8 @@ func TestForwardWaitFuncOneReplica(t *testing.T) { ) group.Go(func() error { - return waitFunc(ctx, ns, deployName) + _, err := waitFunc(ctx, ns, deployName) + return err }) r.NoError(group.Wait(), "wait function failed, but it shouldn't have") } @@ -76,7 +77,7 @@ func TestForwardWaitFuncNoReplicas(t *testing.T) { cache, ) - err := waitFunc(ctx, ns, deployName) + _, err := waitFunc(ctx, ns, deployName) r.Error(err) } @@ -120,6 +121,7 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) { watcher.Action(watch.Modified, modifiedDeployment) close(replicasIncreasedCh) }() - r.NoError(waitFunc(ctx, ns, deployName)) + _, err := waitFunc(ctx, ns, deployName) + r.NoError(err) done() } diff --git a/interceptor/main_test.go b/interceptor/main_test.go index 33c036d77..a0ab432e3 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -66,9 +67,9 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { ) timeouts := &config.Timeouts{} waiterCh := make(chan struct{}) - waitFunc := func(ctx context.Context, ns, name string) error { + waitFunc := func(ctx context.Context, ns, name string) (int, error) { <-waiterCh - return nil + return 1, nil } g.Go(func() error { return runProxyServer( @@ -106,6 +107,9 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { resp.StatusCode, ) } + if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" { + return errors.New("expected X-KEDA-HTTP-Cold-Start false") + } return nil }) time.Sleep(100 * time.Millisecond) diff --git a/interceptor/proxy_handlers_test.go b/interceptor/proxy_handlers_test.go index 7af8de95b..788f10c57 100644 --- a/interceptor/proxy_handlers_test.go +++ b/interceptor/proxy_handlers_test.go @@ -45,8 +45,8 @@ func TestImmediatelySuccessfulProxy(t *testing.T) { timeouts := defaultTimeouts() dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) - waitFunc := func(context.Context, string, string) error { - return nil + waitFunc := func(context.Context, string, string) (int, error) { + return 1, nil } hdl := newForwardingHandler( logr.Discard(), @@ -68,6 +68,7 @@ func TestImmediatelySuccessfulProxy(t *testing.T) { hdl.ServeHTTP(res, req) + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") r.Equal(200, res.Code, "expected response code 200") r.Equal("test response", res.Body.String()) } @@ -85,8 +86,8 @@ func TestWaitFailedConnection(t *testing.T) { timeouts, backoff, ) - waitFunc := func(context.Context, string, string) error { - return nil + waitFunc := func(context.Context, string, string) (int, error) { + return 1, nil } routingTable := routing.NewTable() routingTable.AddTarget(host, routing.NewTarget( @@ -117,6 +118,7 @@ func TestWaitFailedConnection(t *testing.T) { hdl.ServeHTTP(res, req) + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") r.Equal(502, res.Code, "response code was unexpected") } @@ -166,13 +168,19 @@ func TestTimesOutOnWaitFunc(t *testing.T) { t.Logf("elapsed time was %s", elapsed) // serving should take at least timeouts.DeploymentReplicas, but no more than - // timeouts.DeploymentReplicas*2 - // elapsed time should be more than the deployment replicas wait time - // but not an amount that is much greater than that + // timeouts.DeploymentReplicas*4 r.GreaterOrEqual(elapsed, timeouts.DeploymentReplicas) r.LessOrEqual(elapsed, timeouts.DeploymentReplicas*4) r.Equal(502, res.Code, "response code was unexpected") + // we will always return the X-KEDA-HTTP-Cold-Start header + // when we are able to forward the + // request to the backend but not if we have failed due + // to a timeout from a waitFunc or earlier in the pipeline, + // for example, if we cannot reach the Kubernetes control + // plane. + r.Equal("", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start to be empty") + // waitFunc should have been called, even though it timed out waitFuncCalled := false select { @@ -277,8 +285,8 @@ func TestWaitHeaderTimeout(t *testing.T) { timeouts := defaultTimeouts() dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff()) - waitFunc := func(context.Context, string, string) error { - return nil + waitFunc := func(context.Context, string, string) (int, error) { + return 1, nil } routingTable := routing.NewTable() target := routing.NewTarget( @@ -309,6 +317,7 @@ func TestWaitHeaderTimeout(t *testing.T) { hdl.ServeHTTP(res, req) + r.Equal("false", res.Header().Get("X-KEDA-HTTP-Cold-Start"), "expected X-KEDA-HTTP-Cold-Start false") r.Equal(502, res.Code, "response code was unexpected") close(originHdlCh) } @@ -346,19 +355,19 @@ func waitForSignal(sig <-chan struct{}, waitDur time.Duration) error { // is called, or the context that is passed to it is done (e.g. cancelled, timed out, // etc...). in the former case, the returned func itself returns nil. in the latter, // it returns ctx.Err() -func notifyingFunc() (func(context.Context, string, string) error, <-chan struct{}, func()) { +func notifyingFunc() (forwardWaitFunc, <-chan struct{}, func()) { calledCh := make(chan struct{}) finishCh := make(chan struct{}) finishFunc := func() { close(finishCh) } - return func(ctx context.Context, _, _ string) error { + return func(ctx context.Context, _, _ string) (int, error) { close(calledCh) select { case <-finishCh: - return nil + return 0, nil case <-ctx.Done(): - return fmt.Errorf("TEST FUNCTION CONTEXT ERROR: %w", ctx.Err()) + return 0, fmt.Errorf("TEST FUNCTION CONTEXT ERROR: %w", ctx.Err()) } }, calledCh, finishFunc } From 88282bac664784cab67e5f884283ce3325238a78 Mon Sep 17 00:00:00 2001 From: Aaron Wislang Date: Wed, 19 Jan 2022 15:16:10 -0500 Subject: [PATCH 3/4] Update interceptor/main_test.go Co-authored-by: Aaron Schlesinger <70865+arschles@users.noreply.github.com> Signed-off-by: Aaron Wislang --- interceptor/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interceptor/main_test.go b/interceptor/main_test.go index a0ab432e3..35a4d2e0a 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -108,7 +108,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { ) } if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" { - return errors.New("expected X-KEDA-HTTP-Cold-Start false") + return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start)) } return nil }) From fc42e58ac6e741dbcc61dcc8e13bf8e82a81ca3b Mon Sep 17 00:00:00 2001 From: Aaron Wislang Date: Wed, 19 Jan 2022 15:25:28 -0500 Subject: [PATCH 4/4] add bracket Signed-off-by: Aaron Wislang --- interceptor/main_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/interceptor/main_test.go b/interceptor/main_test.go index 35a4d2e0a..c75c07793 100644 --- a/interceptor/main_test.go +++ b/interceptor/main_test.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -108,7 +107,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) { ) } if resp.Header.Get("X-KEDA-HTTP-Cold-Start") != "false" { - return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start)) + return fmt.Errorf("expected X-KEDA-HTTP-Cold-Start false, but got %s", resp.Header.Get("X-KEDA-HTTP-Cold-Start")) } return nil })