@@ -39,44 +39,27 @@ import (
39
39
// ...
40
40
func StreamCanceler (ctx context.Context ) grpc.StreamServerInterceptor {
41
41
var (
42
- cancels = map [* context.CancelFunc ]struct {}{}
43
- cancelMu = new (sync.Mutex )
42
+ cancels sync.Map
44
43
canceling uint32
45
44
)
46
45
47
46
go func () {
48
47
<- ctx .Done ()
49
48
atomic .StoreUint32 (& canceling , 1 )
50
- cancelMu .Lock ()
51
- defer cancelMu .Unlock ()
52
- for cancel := range cancels {
49
+ cancels .Range (func (key any , value any ) bool {
50
+ cancel := key .(* context.CancelFunc )
53
51
(* cancel )()
54
- }
52
+ return true
53
+ })
55
54
}()
56
55
return grpc .StreamServerInterceptor (func (srv any , ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
57
56
if atomic .LoadUint32 (& canceling ) == 1 {
58
57
return status .Error (codes .Unavailable , "server is stopping" )
59
58
}
60
- var (
61
- cctx = ss .Context ()
62
- cancel context.CancelFunc
63
- )
64
- cctx , cancel = context .WithCancel (cctx )
65
-
66
- // add the cancel function
67
- cancelMu .Lock ()
68
- cancels [& cancel ] = struct {}{}
69
- cancelMu .Unlock ()
70
-
71
- // invoke rpc
59
+ cctx , cancel := context .WithCancel (ss .Context ())
60
+ cancels .Store (& cancel , struct {}{})
72
61
err := handler (srv , NewWrappedServerStream (cctx , ss ))
73
-
74
- // remove the cancel function
75
- cancelMu .Lock ()
76
- delete (cancels , & cancel )
77
- cancelMu .Unlock ()
78
-
79
- // cleanup the WithCancel
62
+ cancels .Delete (& cancel )
80
63
cancel ()
81
64
82
65
return err
0 commit comments