1
+ // Package multistream implements a simple stream router for the
2
+ // multistream-select protocoli. The protocol is defined at
3
+ // https://github.com/multiformats/multistream-select
1
4
package multistream
2
5
3
6
import (
@@ -9,23 +12,34 @@ import (
9
12
"sync"
10
13
)
11
14
15
+ // ErrTooLarge is an error to signal that an incoming message was too large
12
16
var ErrTooLarge = errors .New ("incoming message was too large" )
13
17
18
+ // ProtocolID identifies the multistream protocol itself and makes sure
19
+ // the multistream muxers on both sides of a channel can work with each other.
14
20
const ProtocolID = "/multistream/1.0.0"
15
21
16
- type HandlerFunc func (string , io.ReadWriteCloser ) error
22
+ // HandlerFunc is a user-provided function used by the MultistreamMuxer to
23
+ // handle a protocol/stream.
24
+ type HandlerFunc func (protocol string , rwc io.ReadWriteCloser ) error
17
25
26
+ // Handler is a wrapper to HandlerFunc which attaches a name (protocol) and a
27
+ // match function which can optionally be used to select a handler by other
28
+ // means than the name.
18
29
type Handler struct {
19
30
MatchFunc func (string ) bool
20
31
Handle HandlerFunc
21
32
AddName string
22
33
}
23
34
35
+ // MultistreamMuxer is a muxer for multistream. Depending on the stream
36
+ // protocol tag it will select the right handler and hand the stream off to it.
24
37
type MultistreamMuxer struct {
25
38
handlerlock sync.Mutex
26
39
handlers []Handler
27
40
}
28
41
42
+ // NewMultistreamMuxer creates a muxer.
29
43
func NewMultistreamMuxer () * MultistreamMuxer {
30
44
return new (MultistreamMuxer )
31
45
}
@@ -68,6 +82,8 @@ func delimWrite(w io.Writer, mes []byte) error {
68
82
return nil
69
83
}
70
84
85
+ // Ls is a Multistream muxer command which returns the list of handler names
86
+ // available on a muxer.
71
87
func Ls (rw io.ReadWriter ) ([]string , error ) {
72
88
err := delimWriteBuffered (rw , []byte ("ls" ))
73
89
if err != nil {
@@ -97,10 +113,14 @@ func fulltextMatch(s string) func(string) bool {
97
113
}
98
114
}
99
115
116
+ // AddHandler attaches a new protocol handler to the muxer.
100
117
func (msm * MultistreamMuxer ) AddHandler (protocol string , handler HandlerFunc ) {
101
118
msm .AddHandlerWithFunc (protocol , fulltextMatch (protocol ), handler )
102
119
}
103
120
121
+ // AddHandlerWithFunc attaches a new protocol handler to the muxer with a match.
122
+ // If the match function returns true for a given protocol tag, the protocol
123
+ // will be selected even if the handler name and protocol tags are different.
104
124
func (msm * MultistreamMuxer ) AddHandlerWithFunc (protocol string , match func (string ) bool , handler HandlerFunc ) {
105
125
msm .handlerlock .Lock ()
106
126
msm .removeHandler (protocol )
@@ -112,6 +132,7 @@ func (msm *MultistreamMuxer) AddHandlerWithFunc(protocol string, match func(stri
112
132
msm .handlerlock .Unlock ()
113
133
}
114
134
135
+ // RemoveHandler removes the handler with the given name from the muxer.
115
136
func (msm * MultistreamMuxer ) RemoveHandler (protocol string ) {
116
137
msm .handlerlock .Lock ()
117
138
defer msm .handlerlock .Unlock ()
@@ -128,6 +149,7 @@ func (msm *MultistreamMuxer) removeHandler(protocol string) {
128
149
}
129
150
}
130
151
152
+ // Protocols returns the list of handler-names added to this this muxer.
131
153
func (msm * MultistreamMuxer ) Protocols () []string {
132
154
var out []string
133
155
msm .handlerlock .Lock ()
@@ -138,6 +160,8 @@ func (msm *MultistreamMuxer) Protocols() []string {
138
160
return out
139
161
}
140
162
163
+ // ErrIncorrectVersion is an error reported when the muxer protocol negotiation
164
+ // fails because of a ProtocolID mismatch.
141
165
var ErrIncorrectVersion = errors .New ("client connected with incorrect version" )
142
166
143
167
func (msm * MultistreamMuxer ) findHandler (proto string ) * Handler {
@@ -153,6 +177,10 @@ func (msm *MultistreamMuxer) findHandler(proto string) *Handler {
153
177
return nil
154
178
}
155
179
180
+ // NegotiateLazy performs protocol selection and returns
181
+ // a multistream, the protocol used, the handler and an error. It is lazy
182
+ // because the write-handshake is performed on a subroutine, allowing this
183
+ // to return before that handshake is completed.
156
184
func (msm * MultistreamMuxer ) NegotiateLazy (rwc io.ReadWriteCloser ) (Multistream , string , HandlerFunc , error ) {
157
185
pval := make (chan string , 1 )
158
186
writeErr := make (chan error , 1 )
@@ -239,6 +267,8 @@ loop:
239
267
}
240
268
}
241
269
270
+ // Negotiate performs protocol selection and returns the protocol name and
271
+ // the matching handler function for it (or an error).
242
272
func (msm * MultistreamMuxer ) Negotiate (rwc io.ReadWriteCloser ) (string , HandlerFunc , error ) {
243
273
// Send our protocol ID
244
274
err := delimWriteBuffered (rwc , []byte (ProtocolID ))
@@ -292,6 +322,8 @@ loop:
292
322
293
323
}
294
324
325
+ // Ls implements the "ls" command which writes the list of
326
+ // supported protocols to the given Writer.
295
327
func (msm * MultistreamMuxer ) Ls (w io.Writer ) error {
296
328
buf := new (bytes.Buffer )
297
329
msm .handlerlock .Lock ()
@@ -317,6 +349,9 @@ func (msm *MultistreamMuxer) Ls(w io.Writer) error {
317
349
return err
318
350
}
319
351
352
+ // Handle performs protocol negotiation on a ReadWriteCloser
353
+ // (i.e. a connection). It will find a matching handler for the
354
+ // incoming protocol and pass the ReadWriteCloser to it.
320
355
func (msm * MultistreamMuxer ) Handle (rwc io.ReadWriteCloser ) error {
321
356
p , h , err := msm .Negotiate (rwc )
322
357
if err != nil {
@@ -325,6 +360,8 @@ func (msm *MultistreamMuxer) Handle(rwc io.ReadWriteCloser) error {
325
360
return h (p , rwc )
326
361
}
327
362
363
+ // ReadNextToken extracts a token from a ReadWriter. It is used during
364
+ // protocol negotiation and returns a string.
328
365
func ReadNextToken (rw io.ReadWriter ) (string , error ) {
329
366
tok , err := ReadNextTokenBytes (rw )
330
367
if err != nil {
@@ -334,6 +371,8 @@ func ReadNextToken(rw io.ReadWriter) (string, error) {
334
371
return string (tok ), nil
335
372
}
336
373
374
+ // ReadNextTokenBytes extracts a token from a ReadWriter. It is used
375
+ // during protocol negotiation and returns a byte slice.
337
376
func ReadNextTokenBytes (rw io.ReadWriter ) ([]byte , error ) {
338
377
data , err := lpReadBuf (rw )
339
378
switch err {
0 commit comments