-
Notifications
You must be signed in to change notification settings - Fork 554
[Feature] Add timeout for apiserver grpc server #3427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
c677b80
9cbf138
33cf5da
d34686f
ebb5ba4
cade329
a8fee22
d4a554a
f1b5baf
f8ad90f
4c897aa
aa7fcb5
6425633
79a6349
54c8d57
897ad01
47f872a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |||
"path" | ||||
"strings" | ||||
"sync/atomic" | ||||
"time" | ||||
|
||||
assetfs "github.com/elazarl/go-bindata-assetfs" | ||||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||
|
@@ -50,11 +51,21 @@ func main() { | |||
_ = flagSet.Set("log_file", *logFile) | ||||
} | ||||
|
||||
grpcTimeout := 60 * time.Second // Default timeout | ||||
if timeoutStr := os.Getenv("GRPC_SERVER_TIMEOUT"); timeoutStr != "" { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw why do we use env var instead of flags? I think flags are strictly better in a few ways:
I almost only use env variables when:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the guidance! The reason why I put it in environment variable instead of flag is because I search through the code base and find they put this (which I think is a bit similar to timeout?) in the environment variable, so I just simply follow what it does
I agree to your points, if there's no other places that need this value, I think I'll just move it to flag instead There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved to flag |
||||
if timeout, err := time.ParseDuration(timeoutStr); err == nil { | ||||
grpcTimeout = timeout | ||||
klog.Infof("gRPC servier timeout set to %v", grpcTimeout) | ||||
} else { | ||||
klog.Warningf("Invalid GRPC_SERVER_TIMEOUT value: %v, using default timeout (60 seconds)", err) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! Just added |
||||
} | ||||
} | ||||
|
||||
clientManager := manager.NewClientManager() | ||||
resourceManager := manager.NewResourceManager(&clientManager) | ||||
|
||||
atomic.StoreInt32(&healthy, 1) | ||||
go startRPCServer(resourceManager) | ||||
go startRPCServer(resourceManager, grpcTimeout) | ||||
startHttpProxy() | ||||
// See also https://gist.github.com/enricofoltran/10b4a980cd07cb02836f70a4ab3e72d7 | ||||
quit := make(chan os.Signal, 1) | ||||
|
@@ -70,7 +81,7 @@ func main() { | |||
|
||||
type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error | ||||
|
||||
func startRPCServer(resourceManager *manager.ResourceManager) { | ||||
func startRPCServer(resourceManager *manager.ResourceManager, grpcTimeout time.Duration) { | ||||
klog.Infof("Starting gRPC server at port %s", *rpcPortFlag) | ||||
|
||||
listener, err := net.Listen("tcp", *rpcPortFlag) | ||||
|
@@ -86,8 +97,13 @@ func startRPCServer(resourceManager *manager.ResourceManager) { | |||
|
||||
s := grpc.NewServer( | ||||
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), | ||||
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_prometheus.UnaryServerInterceptor, interceptor.APIServerInterceptor)), | ||||
grpc.MaxRecvMsgSize(math.MaxInt32)) | ||||
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( | ||||
interceptor.TimeoutInterceptor(grpcTimeout), | ||||
grpc_prometheus.UnaryServerInterceptor, | ||||
interceptor.APIServerInterceptor, | ||||
)), | ||||
grpc.MaxRecvMsgSize(math.MaxInt32), | ||||
) | ||||
api.RegisterClusterServiceServer(s, clusterServer) | ||||
api.RegisterComputeTemplateServiceServer(s, templateServer) | ||||
api.RegisterRayJobServiceServer(s, jobServer) | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,8 @@ package interceptor | |
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"google.golang.org/grpc" | ||
klog "k8s.io/klog/v2" | ||
|
@@ -19,3 +21,41 @@ func APIServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary | |
klog.Infof("%v handler finished", info.FullMethod) | ||
return | ||
} | ||
|
||
// TimeoutInterceptor implements UnaryServerInterceptor that sets the timeout for the request | ||
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor { | ||
dentiny marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return func( | ||
ctx context.Context, | ||
req interface{}, | ||
_ *grpc.UnaryServerInfo, | ||
handler grpc.UnaryHandler, | ||
) (interface{}, error) { | ||
// Create a context with timeout | ||
ctx, cancel := context.WithTimeout(ctx, timeout) | ||
dentiny marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer cancel() | ||
dentiny marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Channel to capture execution result | ||
done := make(chan struct{}) | ||
var ( | ||
resp interface{} | ||
err error | ||
) | ||
|
||
go func() { | ||
resp, err = handler(ctx, req) | ||
close(done) | ||
}() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
// Raise error if time out | ||
if ctx.Err() == context.DeadlineExceeded { | ||
return nil, fmt.Errorf("grpc server timed out") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we name the grpc server with KubeRay API server ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure! Just changed |
||
} | ||
return nil, ctx.Err() | ||
case <-done: | ||
// Handler finished | ||
return resp, err | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we following mechanisms to define constants or we are adding to each files where we are using it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
quickly glancing over the code, we have
constants.go
for other components (i.e. operator)https://github.com/ray-project/kuberay/blob/ebb5ba441b0a7f888c17aa5c2d33943084a9a2d9/ray-operator/controllers/ray/utils/constant.go
I usually do this in two ways:
getGrpcServerTimeoutOrDefault
and have default timeout besidesOur codebase seems to prefer (1).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I found that in
apiserver
, they put constants inconfig.go
, I'll add it herekuberay/apiserver/pkg/util/config.go
Lines 10 to 14 in a83d3c1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added