Skip to content

Commit 0406086

Browse files
authored
Merge pull request #1634 from MartinForReal/shafan/grpc
Refactor: remove grpc wrapper
2 parents 6d52a19 + bd8e20b commit 0406086

File tree

8 files changed

+90
-214
lines changed

8 files changed

+90
-214
lines changed

pkg/azurefile/azurefile.go

+29-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"encoding/binary"
23+
"errors"
2324
"fmt"
2425
"net/url"
2526
"os/exec"
@@ -33,11 +34,12 @@ import (
3334
"github.com/container-storage-interface/spec/lib/go/csi"
3435
"github.com/pborman/uuid"
3536
"github.com/rubiojr/go-vhd/vhd"
37+
"google.golang.org/grpc"
3638
"google.golang.org/grpc/codes"
3739
"google.golang.org/grpc/status"
3840

3941
v1 "k8s.io/api/core/v1"
40-
"k8s.io/apimachinery/pkg/api/errors"
42+
apierrors "k8s.io/apimachinery/pkg/api/errors"
4143
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4244
"k8s.io/apimachinery/pkg/util/wait"
4345
"k8s.io/klog/v2"
@@ -248,6 +250,7 @@ type Driver struct {
248250
printVolumeStatsCallLogs bool
249251
fileClient *azureFileClient
250252
mounter *mount.SafeFormatAndMount
253+
server *grpc.Server
251254
// lock per volume attach (only for vhd disk feature)
252255
volLockMap *lockMap
253256
// only for nfs feature
@@ -353,7 +356,7 @@ func NewDriver(options *DriverOptions) *Driver {
353356
}
354357

355358
// Run driver initialization
356-
func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
359+
func (d *Driver) Run(ctx context.Context, endpoint, kubeconfig string) error {
357360
versionMeta, err := GetVersionYAML(d.Name)
358361
if err != nil {
359362
klog.Fatalf("%v", err)
@@ -407,10 +410,29 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
407410
}
408411
d.AddNodeServiceCapabilities(nodeCap)
409412

410-
s := csicommon.NewNonBlockingGRPCServer()
411-
// Driver d act as IdentityServer, ControllerServer and NodeServer
412-
s.Start(endpoint, d, d, d, testBool)
413-
s.Wait()
413+
//setup grpc server
414+
opts := []grpc.ServerOption{
415+
grpc.UnaryInterceptor(csicommon.LogGRPC),
416+
}
417+
server := grpc.NewServer(opts...)
418+
csi.RegisterIdentityServer(server, d)
419+
csi.RegisterControllerServer(server, d)
420+
csi.RegisterNodeServer(server, d)
421+
d.server = server
422+
423+
listener, err := csicommon.ListenEndpoint(endpoint)
424+
if err != nil {
425+
klog.Fatalf("failed to listen endpoint: %v", err)
426+
}
427+
go func() {
428+
<-ctx.Done()
429+
d.server.GracefulStop()
430+
}()
431+
if err = d.server.Serve(listener); errors.Is(err, grpc.ErrServerStopped) {
432+
klog.Infof("gRPC server stopped serving")
433+
return nil
434+
}
435+
return err
414436
}
415437

416438
// getFileShareQuota return (-1, nil) means file share does not exist
@@ -1160,7 +1182,7 @@ func (d *Driver) SetAzureCredentials(ctx context.Context, accountName, accountKe
11601182
Type: "Opaque",
11611183
}
11621184
_, err := d.cloud.KubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
1163-
if errors.IsAlreadyExists(err) {
1185+
if apierrors.IsAlreadyExists(err) {
11641186
err = nil
11651187
}
11661188
if err != nil {

pkg/azurefile/azurefile_test.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"reflect"
2727
"sort"
2828
"testing"
29+
"time"
2930

3031
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
3132
azure2 "github.com/Azure/go-autorest/autorest/azure"
@@ -1052,7 +1053,15 @@ func TestRun(t *testing.T) {
10521053
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
10531054

10541055
d := NewFakeDriver()
1055-
d.Run("tcp://127.0.0.1:0", "", true)
1056+
ctx, cancelFn := context.WithCancel(context.Background())
1057+
go func() {
1058+
time.Sleep(1 * time.Second)
1059+
cancelFn()
1060+
}()
1061+
if err := d.Run(ctx, "tcp://127.0.0.1:0", ""); err != nil {
1062+
t.Error(err.Error())
1063+
}
1064+
10561065
},
10571066
},
10581067
{
@@ -1077,9 +1086,16 @@ func TestRun(t *testing.T) {
10771086
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
10781087

10791088
d := NewFakeDriver()
1089+
ctx, cancelFn := context.WithCancel(context.Background())
1090+
go func() {
1091+
time.Sleep(1 * time.Second)
1092+
cancelFn()
1093+
}()
10801094
d.cloud = &azure.Cloud{}
10811095
d.NodeID = ""
1082-
d.Run("tcp://127.0.0.1:0", "", true)
1096+
if err := d.Run(ctx, "tcp://127.0.0.1:0", ""); err != nil {
1097+
t.Error(err.Error())
1098+
}
10831099
},
10841100
},
10851101
}

pkg/azurefileplugin/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ func handle() {
120120
if driver == nil {
121121
klog.Fatalln("Failed to initialize azurefile CSI Driver")
122122
}
123-
driver.Run(*endpoint, *kubeconfig, false)
123+
if err := driver.Run(context.Background(), *endpoint, *kubeconfig); err != nil {
124+
klog.Fatalln(err)
125+
}
124126
}
125127

126128
func exportMetrics() {

pkg/csi-common/server.go

-122
This file was deleted.

pkg/csi-common/server_test.go

-67
This file was deleted.

pkg/csi-common/utils.go

+26-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package csicommon
1818

1919
import (
2020
"fmt"
21+
"net"
22+
"os"
23+
"runtime"
2124
"strings"
2225

2326
"golang.org/x/net/context"
@@ -28,7 +31,7 @@ import (
2831
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
2932
)
3033

31-
func ParseEndpoint(ep string) (string, string, error) {
34+
func parseEndpoint(ep string) (string, string, error) {
3235
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
3336
s := strings.SplitN(ep, "://", 2)
3437
if s[1] != "" {
@@ -37,6 +40,27 @@ func ParseEndpoint(ep string) (string, string, error) {
3740
}
3841
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
3942
}
43+
func ListenEndpoint(endpoint string) (net.Listener, error) {
44+
proto, addr, err := parseEndpoint(endpoint)
45+
if err != nil {
46+
klog.Fatal(err.Error())
47+
}
48+
49+
if proto == "unix" {
50+
if runtime.GOOS != "windows" {
51+
addr = "/" + addr
52+
}
53+
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
54+
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
55+
}
56+
}
57+
58+
listener, err := net.Listen(proto, addr)
59+
if err != nil {
60+
klog.Fatalf("Failed to listen: %v", err)
61+
}
62+
return listener, err
63+
}
4064

4165
func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
4266
return &csi.VolumeCapability_AccessMode{Mode: mode}
@@ -71,7 +95,7 @@ func getLogLevel(method string) int32 {
7195
return 2
7296
}
7397

74-
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
98+
func LogGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
7599
level := klog.Level(getLogLevel(info.FullMethod))
76100
klog.V(level).Infof("GRPC call: %s", info.FullMethod)
77101
klog.V(level).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))

0 commit comments

Comments
 (0)