-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconn_driver.go
151 lines (141 loc) · 3.27 KB
/
conn_driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package rrpc
import (
"context"
"errors"
"io"
"log"
"sync"
)
//the communication driver for client/server on one end of connection
type connDriver struct {
server *Server
client *Client
//codec write lock
wlock sync.Mutex
codec Codec
//wait group for outstanding calls to server
wg sync.WaitGroup
callLock sync.Mutex
actCalls map[uint64]context.CancelFunc
}
func newConnDriver(codec Codec) *connDriver {
return &connDriver{
codec: codec,
actCalls: make(map[uint64]context.CancelFunc),
}
}
func (cd *connDriver) Close() error {
cd.wlock.Lock()
defer cd.wlock.Unlock()
return cd.codec.Close()
}
func (cd *connDriver) AddCall(seq uint64, cancel context.CancelFunc) {
cd.callLock.Lock()
cd.actCalls[seq] = cancel
cd.callLock.Unlock()
}
func (cd *connDriver) CancelCall(seq uint64) {
cd.callLock.Lock()
cancel := cd.actCalls[seq]
cd.callLock.Unlock()
if cancel != nil {
cancel()
}
}
func (cd *connDriver) RemoveCall(seq uint64) {
cd.callLock.Lock()
cancel := cd.actCalls[seq]
cd.callLock.Unlock()
if cancel != nil {
cancel()
}
}
func (cd *connDriver) Loop() {
if cd.server != nil {
//register active codec/connDriver with server
cd.server.connLock.Lock()
if cd.server.closing {
cd.server.connLock.Unlock()
return
}
cd.server.actCodecs[cd] = struct{}{}
cd.server.connLock.Unlock()
}
var err error
var header *Header
if cd.server != nil {
header = cd.server.getHeader()
} else {
//for pure client, reuse a single header
header = &Header{}
}
for {
err = cd.codec.ReadHeader(header)
if err != nil {
//failed to decode Header, exit
if err == io.EOF || err == io.ErrUnexpectedEOF {
if debugLog {
log.Println("rpc:", err)
}
break
}
err = errors.New("rpc: cannot decode header: " + err.Error())
if debugLog {
log.Println("rpc:", err)
}
break
}
if header.Kind == Request || header.Kind == RequestWithContext || header.Kind == Cancel {
// Forward requests to server
if cd.server != nil {
err = cd.server.handleRequest(cd, header)
if err != nil && debugLog {
log.Println("rpc:", err)
}
//since header is freed inside server.handleRequest
//allocate a new header
header = cd.server.getHeader()
} else {
if debugLog {
log.Println("rpc: receive requests, but there is no server")
}
}
} else if header.Kind == Response || header.Kind == Error {
// Forwars reponses and errors to client
if cd.client != nil {
err = cd.client.handleResponse(cd.codec, header)
if err != nil && debugLog {
log.Println("rpc:", err)
}
if err != nil && cd.server == nil {
//pure client break out on 1st error
if debugLog {
log.Println("rpc: >>>client exit, ", err)
}
break
}
*header = Header{} //reset header
} else {
if debugLog {
log.Println("rpc: receive responses, but there is no client")
}
}
} else {
if debugLog {
log.Printf("rpc: invalid header.Kind: %v\n", header.Kind)
}
}
}
if cd.server != nil {
cd.server.freeHeader(header)
//wait for all outstanding calls
cd.server.connShutdown(cd)
}
if cd.client != nil {
//notify remaining clients
cd.client.connShutdown(err)
}
if debugLog {
log.Println("*** Loop() Exit, server=", cd.server != nil, ", client=", cd.client != nil)
}
}