Skip to content

Commit 4c95eb1

Browse files
author
Brian Tiger Chow
committed
refactor(net:message) add NetMessage interface
* Design Goal: reduce coupling * NB: Slices hold references to an underlying array, and if you assign one slice to another, both refer to the same array. If a function takes a slice argument, changes it makes to the elements of the slice will be visible to the caller, analogous to passing a pointer to the underlying array.
1 parent 71a19e1 commit 4c95eb1

File tree

6 files changed

+74
-57
lines changed

6 files changed

+74
-57
lines changed

net/message/message.go

+29-12
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,55 @@ import (
66
proto "code.google.com/p/goprotobuf/proto"
77
)
88

9-
// Message represents a packet of information sent to or received from a
9+
type NetMessage interface {
10+
Peer() *peer.Peer
11+
Data() []byte
12+
}
13+
14+
func New(p *peer.Peer, data []byte) NetMessage {
15+
return &message{peer: p, data: data}
16+
}
17+
18+
// message represents a packet of information sent to or received from a
1019
// particular Peer.
11-
type Message struct {
20+
type message struct {
1221
// To or from, depending on direction.
13-
Peer *peer.Peer
22+
peer *peer.Peer
1423

1524
// Opaque data
16-
Data []byte
25+
data []byte
26+
}
27+
28+
func (m *message) Peer() *peer.Peer {
29+
return m.peer
30+
}
31+
32+
func (m *message) Data() []byte {
33+
return m.data
1734
}
1835

1936
// FromObject creates a message from a protobuf-marshallable message.
20-
func FromObject(p *peer.Peer, data proto.Message) (*Message, error) {
37+
func FromObject(p *peer.Peer, data proto.Message) (*message, error) {
2138
bytes, err := proto.Marshal(data)
2239
if err != nil {
2340
return nil, err
2441
}
25-
return &Message{
26-
Peer: p,
27-
Data: bytes,
42+
return &message{
43+
peer: p,
44+
data: bytes,
2845
}, nil
2946
}
3047

3148
// Pipe objects represent a bi-directional message channel.
3249
type Pipe struct {
33-
Incoming chan *Message
34-
Outgoing chan *Message
50+
Incoming chan NetMessage
51+
Outgoing chan NetMessage
3552
}
3653

3754
// NewPipe constructs a pipe with channels of a given buffer size.
3855
func NewPipe(bufsize int) *Pipe {
3956
return &Pipe{
40-
Incoming: make(chan *Message, bufsize),
41-
Outgoing: make(chan *Message, bufsize),
57+
Incoming: make(chan NetMessage, bufsize),
58+
Outgoing: make(chan NetMessage, bufsize),
4259
}
4360
}

net/mux/mux.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) {
8787
}
8888

8989
// handleIncomingMessage routes message to the appropriate protocol.
90-
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 *msg.Message) {
90+
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) {
9191

92-
data, pid, err := unwrapData(m1.Data)
92+
data, pid, err := unwrapData(m1.Data())
9393
if err != nil {
9494
u.PErr("muxer de-serializing error: %v\n", err)
9595
return
9696
}
9797

98-
m2 := &msg.Message{Peer: m1.Peer, Data: data}
98+
m2 := msg.New(m1.Peer(), data)
9999
proto, found := m.Protocols[pid]
100100
if !found {
101101
u.PErr("muxer unknown protocol %v\n", pid)
@@ -125,14 +125,14 @@ func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, prot
125125
}
126126

127127
// handleOutgoingMessage wraps out a message and sends it out the
128-
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 *msg.Message) {
129-
data, err := wrapData(m1.Data, pid)
128+
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) {
129+
data, err := wrapData(m1.Data(), pid)
130130
if err != nil {
131131
u.PErr("muxer serializing error: %v\n", err)
132132
return
133133
}
134134

135-
m2 := &msg.Message{Peer: m1.Peer, Data: data}
135+
m2 := msg.New(m1.Peer(), data)
136136
select {
137137
case m.GetPipe().Outgoing <- m2:
138138
case <-ctx.Done():

net/mux/mux_test.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ func newPeer(t *testing.T, id string) *peer.Peer {
3232
return &peer.Peer{ID: peer.ID(mh)}
3333
}
3434

35-
func testMsg(t *testing.T, m *msg.Message, data []byte) {
36-
if !bytes.Equal(data, m.Data) {
37-
t.Errorf("Data does not match: %v != %v", data, m.Data)
35+
func testMsg(t *testing.T, m msg.NetMessage, data []byte) {
36+
if !bytes.Equal(data, m.Data()) {
37+
t.Errorf("Data does not match: %v != %v", data, m.Data())
3838
}
3939
}
4040

41-
func testWrappedMsg(t *testing.T, m *msg.Message, pid ProtocolID, data []byte) {
42-
data2, pid2, err := unwrapData(m.Data)
41+
func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) {
42+
data2, pid2, err := unwrapData(m.Data())
4343
if err != nil {
4444
t.Error(err)
4545
}
@@ -76,7 +76,7 @@ func TestSimpleMuxer(t *testing.T) {
7676

7777
// test outgoing p1
7878
for _, s := range []string{"foo", "bar", "baz"} {
79-
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
79+
p1.Outgoing <- msg.New(peer1, []byte(s))
8080
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
8181
}
8282

@@ -86,13 +86,13 @@ func TestSimpleMuxer(t *testing.T) {
8686
if err != nil {
8787
t.Error(err)
8888
}
89-
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
89+
mux1.Incoming <- msg.New(peer1, d)
9090
testMsg(t, <-p1.Incoming, []byte(s))
9191
}
9292

9393
// test outgoing p2
9494
for _, s := range []string{"foo", "bar", "baz"} {
95-
p2.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
95+
p2.Outgoing <- msg.New(peer1, []byte(s))
9696
testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s))
9797
}
9898

@@ -102,7 +102,7 @@ func TestSimpleMuxer(t *testing.T) {
102102
if err != nil {
103103
t.Error(err)
104104
}
105-
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
105+
mux1.Incoming <- msg.New(peer1, d)
106106
testMsg(t, <-p2.Incoming, []byte(s))
107107
}
108108
}
@@ -139,7 +139,7 @@ func TestSimultMuxer(t *testing.T) {
139139
for i := 0; i < size; i++ {
140140
<-limiter
141141
s := fmt.Sprintf("proto %v out %v", pid, i)
142-
m := &msg.Message{Peer: peer1, Data: []byte(s)}
142+
m := msg.New(peer1, []byte(s))
143143
mux1.Protocols[pid].GetPipe().Outgoing <- m
144144
counts[pid][0][0]++
145145
u.DOut("sent %v\n", s)
@@ -156,7 +156,7 @@ func TestSimultMuxer(t *testing.T) {
156156
t.Error(err)
157157
}
158158

159-
m := &msg.Message{Peer: peer1, Data: d}
159+
m := msg.New(peer1, d)
160160
mux1.Incoming <- m
161161
counts[pid][1][0]++
162162
u.DOut("sent %v\n", s)
@@ -167,7 +167,7 @@ func TestSimultMuxer(t *testing.T) {
167167
for {
168168
select {
169169
case m := <-mux1.Outgoing:
170-
data, pid, err := unwrapData(m.Data)
170+
data, pid, err := unwrapData(m.Data())
171171
if err != nil {
172172
t.Error(err)
173173
}
@@ -186,7 +186,7 @@ func TestSimultMuxer(t *testing.T) {
186186
select {
187187
case m := <-mux1.Protocols[pid].GetPipe().Incoming:
188188
counts[pid][0][1]++
189-
u.DOut("got %v\n", string(m.Data))
189+
u.DOut("got %v\n", string(m.Data()))
190190
case <-ctx.Done():
191191
return
192192
}
@@ -239,7 +239,7 @@ func TestStopping(t *testing.T) {
239239

240240
// test outgoing p1
241241
for _, s := range []string{"foo", "bar", "baz"} {
242-
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
242+
p1.Outgoing <- msg.New(peer1, []byte(s))
243243
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
244244
}
245245

@@ -249,7 +249,7 @@ func TestStopping(t *testing.T) {
249249
if err != nil {
250250
t.Error(err)
251251
}
252-
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
252+
mux1.Incoming <- msg.New(peer1, d)
253253
testMsg(t, <-p1.Incoming, []byte(s))
254254
}
255255

@@ -260,7 +260,7 @@ func TestStopping(t *testing.T) {
260260

261261
// test outgoing p1
262262
for _, s := range []string{"foo", "bar", "baz"} {
263-
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
263+
p1.Outgoing <- msg.New(peer1, []byte(s))
264264
select {
265265
case <-mux1.Outgoing:
266266
t.Error("should not have received anything.")
@@ -274,7 +274,7 @@ func TestStopping(t *testing.T) {
274274
if err != nil {
275275
t.Error(err)
276276
}
277-
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
277+
mux1.Incoming <- msg.New(peer1, d)
278278
select {
279279
case <-p1.Incoming:
280280
t.Error("should not have received anything.")

net/service/request.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type Request struct {
7575
PeerID peer.ID
7676

7777
// Response is the channel of incoming responses.
78-
Response chan *msg.Message
78+
Response chan msg.NetMessage
7979
}
8080

8181
// NewRequest creates a request for given peer.ID
@@ -88,7 +88,7 @@ func NewRequest(pid peer.ID) (*Request, error) {
8888
return &Request{
8989
ID: id,
9090
PeerID: pid,
91-
Response: make(chan *msg.Message, 1),
91+
Response: make(chan msg.NetMessage, 1),
9292
}, nil
9393
}
9494

net/service/service.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type Handler interface {
1616

1717
// HandleMessage receives an incoming message, and potentially returns
1818
// a response message to send back.
19-
HandleMessage(context.Context, *msg.Message) (*msg.Message, error)
19+
HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error)
2020
}
2121

2222
// Service is a networking component that protocols can use to multiplex
@@ -69,16 +69,16 @@ func (s *Service) Stop() {
6969
}
7070

7171
// SendMessage sends a message out
72-
func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error {
72+
func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
7373

7474
// serialize ServiceMessage wrapper
75-
data, err := wrapData(m.Data, rid)
75+
data, err := wrapData(m.Data(), rid)
7676
if err != nil {
7777
return err
7878
}
7979

8080
// send message
81-
m2 := &msg.Message{Peer: m.Peer, Data: data}
81+
m2 := msg.New(m.Peer(), data)
8282
select {
8383
case s.Outgoing <- m2:
8484
case <-ctx.Done():
@@ -89,10 +89,10 @@ func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID
8989
}
9090

9191
// SendRequest sends a request message out and awaits a response.
92-
func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) {
92+
func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
9393

9494
// create a request
95-
r, err := NewRequest(m.Peer.ID)
95+
r, err := NewRequest(m.Peer().ID)
9696
if err != nil {
9797
return nil, err
9898
}
@@ -145,14 +145,14 @@ func (s *Service) handleIncomingMessages(ctx context.Context) {
145145
}
146146
}
147147

148-
func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
148+
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
149149

150150
// unwrap the incoming message
151-
data, rid, err := unwrapData(m.Data)
151+
data, rid, err := unwrapData(m.Data())
152152
if err != nil {
153153
u.PErr("de-serializing error: %v\n", err)
154154
}
155-
m2 := &msg.Message{Peer: m.Peer, Data: data}
155+
m2 := msg.New(m.Peer(), data)
156156

157157
// if it's a request (or has no RequestID), handle it
158158
if rid == nil || rid.IsRequest() {
@@ -177,7 +177,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
177177
u.PErr("RequestID should identify a response here.\n")
178178
}
179179

180-
key := RequestKey(m.Peer.ID, RequestID(rid))
180+
key := RequestKey(m.Peer().ID, RequestID(rid))
181181
s.RequestsLock.RLock()
182182
r, found := s.Requests[key]
183183
s.RequestsLock.RUnlock()

net/service/service_test.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ import (
1515
// ReverseHandler reverses all Data it receives and sends it back.
1616
type ReverseHandler struct{}
1717

18-
func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) (
19-
*msg.Message, error) {
18+
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) (
19+
msg.NetMessage, error) {
2020

21-
d := m.Data
21+
d := m.Data()
2222
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
2323
d[i], d[j] = d[j], d[i]
2424
}
2525

26-
return &msg.Message{Peer: m.Peer, Data: d}, nil
26+
return msg.New(m.Peer(), d), nil
2727
}
2828

2929
func newPeer(t *testing.T, id string) *peer.Peer {
@@ -47,11 +47,11 @@ func TestServiceHandler(t *testing.T) {
4747
t.Error(err)
4848
}
4949

50-
m1 := &msg.Message{Peer: peer1, Data: d}
50+
m1 := msg.New(peer1, d)
5151
s.Incoming <- m1
5252
m2 := <-s.Outgoing
5353

54-
d, rid, err := unwrapData(m2.Data)
54+
d, rid, err := unwrapData(m2.Data())
5555
if err != nil {
5656
t.Error(err)
5757
}
@@ -85,14 +85,14 @@ func TestServiceRequest(t *testing.T) {
8585
}
8686
}()
8787

88-
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
88+
m1 := msg.New(peer1, []byte("beep"))
8989
m2, err := s1.SendRequest(ctx, m1)
9090
if err != nil {
9191
t.Error(err)
9292
}
9393

94-
if !bytes.Equal(m2.Data, []byte("peeb")) {
95-
t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof")
94+
if !bytes.Equal(m2.Data(), []byte("peeb")) {
95+
t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof")
9696
}
9797
}
9898

@@ -117,7 +117,7 @@ func TestServiceRequestTimeout(t *testing.T) {
117117
}
118118
}()
119119

120-
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
120+
m1 := msg.New(peer1, []byte("beep"))
121121
m2, err := s1.SendRequest(ctx, m1)
122122
if err == nil || m2 != nil {
123123
t.Error("should've timed out")

0 commit comments

Comments
 (0)