Skip to content

Commit cce849c

Browse files
committed
address review comments
1 parent ffed95f commit cce849c

File tree

12 files changed

+212
-210
lines changed

12 files changed

+212
-210
lines changed

core/network/conn.go

+11
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77

88
ic "github.com/libp2p/go-libp2p/core/crypto"
9+
910
"github.com/libp2p/go-libp2p/core/peer"
1011
"github.com/libp2p/go-libp2p/core/protocol"
1112

@@ -31,6 +32,16 @@ func (c *ConnError) Error() string {
3132
return fmt.Sprintf("connection closed (%s): code: %d", side, c.ErrorCode)
3233
}
3334

35+
func (c *ConnError) Is(target error) bool {
36+
if target == ErrReset {
37+
return true
38+
}
39+
if tce, ok := target.(*ConnError); ok {
40+
return tce.ErrorCode == c.ErrorCode && tce.Remote == c.Remote
41+
}
42+
return false
43+
}
44+
3445
func (c *ConnError) Unwrap() error {
3546
return c.TransportError
3647
}

core/network/mux.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ func (s *StreamError) Error() string {
3232
}
3333

3434
func (s *StreamError) Is(target error) bool {
35-
return target == ErrReset
35+
if target == ErrReset {
36+
return true
37+
}
38+
if tse, ok := target.(*StreamError); ok {
39+
return tse.ErrorCode == s.ErrorCode && tse.Remote == s.Remote
40+
}
41+
return false
3642
}
3743

3844
func (s *StreamError) Unwrap() error {
@@ -96,16 +102,14 @@ type MuxedStream interface {
96102
// side to hang up and go away.
97103
Reset() error
98104

99-
SetDeadline(time.Time) error
100-
SetReadDeadline(time.Time) error
101-
SetWriteDeadline(time.Time) error
102-
}
103-
104-
type ResetWithErrorer interface {
105-
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
105+
// ResetWithError aborts both ends of the stream with `errCode`. `errCode` is sent
106106
// to the peer on a best effort basis. For transports that do not support sending
107107
// error codes to remote peer, the behavior is identical to calling Reset
108108
ResetWithError(errCode StreamErrorCode) error
109+
110+
SetDeadline(time.Time) error
111+
SetReadDeadline(time.Time) error
112+
SetWriteDeadline(time.Time) error
109113
}
110114

111115
// MuxedConn represents a connection to a remote peer that has been

p2p/muxer/yamux/stream.go

-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func parseResetError(err error) error {
2727
if errors.As(err, &ce) {
2828
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
2929
}
30-
// TODO: How should we handle resets for reason other than a remote error
3130
if errors.Is(err, yamux.ErrStreamReset) {
3231
return fmt.Errorf("%w: %w", network.ErrReset, err)
3332
}

p2p/net/connmgr/connmgr_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import (
1111
"github.com/libp2p/go-libp2p/core/crypto"
1212
"github.com/libp2p/go-libp2p/core/network"
1313
"github.com/libp2p/go-libp2p/core/peer"
14+
"github.com/libp2p/go-libp2p/core/peerstore"
1415
tu "github.com/libp2p/go-libp2p/core/test"
1516

17+
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
18+
1619
ma "github.com/multiformats/go-multiaddr"
1720
"github.com/stretchr/testify/require"
1821
)
@@ -995,3 +998,76 @@ type testLimitGetter struct {
995998
func (g testLimitGetter) GetConnLimit() int {
996999
return g.limit
9971000
}
1001+
1002+
func TestErrorCode(t *testing.T) {
1003+
sw1, sw2, sw3 := swarmt.GenSwarm(t), swarmt.GenSwarm(t), swarmt.GenSwarm(t)
1004+
defer sw1.Close()
1005+
defer sw2.Close()
1006+
defer sw3.Close()
1007+
1008+
cm, err := NewConnManager(1, 1, WithGracePeriod(0), WithSilencePeriod(10))
1009+
require.NoError(t, err)
1010+
defer cm.Close()
1011+
1012+
sw1.Notify(cm.Notifee())
1013+
sw1.Peerstore().AddAddrs(sw2.LocalPeer(), sw2.ListenAddresses(), peerstore.PermanentAddrTTL)
1014+
sw1.Peerstore().AddAddrs(sw3.LocalPeer(), sw3.ListenAddresses(), peerstore.PermanentAddrTTL)
1015+
1016+
c12, err := sw1.DialPeer(context.Background(), sw2.LocalPeer())
1017+
require.NoError(t, err)
1018+
1019+
var c21 network.Conn
1020+
require.Eventually(t, func() bool {
1021+
conns := sw2.ConnsToPeer(sw1.LocalPeer())
1022+
if len(conns) == 0 {
1023+
return false
1024+
}
1025+
c21 = conns[0]
1026+
return true
1027+
}, 5*time.Second, 100*time.Millisecond)
1028+
1029+
c13, err := sw1.DialPeer(context.Background(), sw3.LocalPeer())
1030+
require.NoError(t, err)
1031+
1032+
var c31 network.Conn
1033+
require.Eventually(t, func() bool {
1034+
conns := sw3.ConnsToPeer(sw1.LocalPeer())
1035+
if len(conns) == 0 {
1036+
return false
1037+
}
1038+
c31 = conns[0]
1039+
return true
1040+
}, 5*time.Second, 100*time.Millisecond)
1041+
1042+
cm.TrimOpenConns(context.Background())
1043+
1044+
require.True(t, c12.IsClosed() || c13.IsClosed())
1045+
var c, cr network.Conn
1046+
if c12.IsClosed() {
1047+
c = c12
1048+
require.Eventually(t, func() bool {
1049+
conns := sw2.ConnsToPeer(sw1.LocalPeer())
1050+
if len(conns) == 0 {
1051+
cr = c21
1052+
return true
1053+
}
1054+
return false
1055+
}, 5*time.Second, 100*time.Millisecond)
1056+
} else {
1057+
c = c13
1058+
require.Eventually(t, func() bool {
1059+
conns := sw3.ConnsToPeer(sw1.LocalPeer())
1060+
if len(conns) == 0 {
1061+
cr = c31
1062+
return true
1063+
}
1064+
return false
1065+
}, 5*time.Second, 100*time.Millisecond)
1066+
}
1067+
1068+
_, err = c.NewStream(context.Background())
1069+
require.ErrorIs(t, err, &network.ConnError{ErrorCode: network.ConnGarbageCollected, Remote: false})
1070+
1071+
_, err = cr.NewStream(context.Background())
1072+
require.ErrorIs(t, err, &network.ConnError{ErrorCode: network.ConnGarbageCollected, Remote: true})
1073+
}

p2p/net/mock/mock_stream.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,13 @@ func (s *stream) Reset() error {
144144
return nil
145145
}
146146

147-
func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
147+
// ResetWithError resets the stream. It ignores the provided error code.
148+
// TODO: Implement error code support.
149+
func (s *stream) ResetWithError(_ network.StreamErrorCode) error {
148150
// Cancel any pending reads/writes with an error.
149-
// TODO: Should these be the other way round(remote=true)?
150-
s.write.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})
151-
s.read.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})
151+
152+
s.write.CloseWithError(network.ErrReset)
153+
s.read.CloseWithError(network.ErrReset)
152154

153155
select {
154156
case s.reset <- struct{}{}:

p2p/net/swarm/swarm_conn.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,7 @@ func (c *Conn) start() {
138138
}
139139
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound)
140140
if err != nil {
141-
if tse, ok := ts.(network.ResetWithErrorer); ok {
142-
tse.ResetWithError(network.StreamResourceLimitExceeded)
143-
} else {
144-
ts.Reset()
145-
}
141+
ts.ResetWithError(network.StreamResourceLimitExceeded)
146142
continue
147143
}
148144
c.swarm.refs.Add(1)

p2p/net/swarm/swarm_stream.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,7 @@ func (s *Stream) Reset() error {
9292
}
9393

9494
func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
95-
var err error
96-
if se, ok := s.stream.(network.ResetWithErrorer); ok {
97-
err = se.ResetWithError(errCode)
98-
} else {
99-
err = s.stream.Reset()
100-
}
95+
err := s.stream.ResetWithError(errCode)
10196
s.closeAndRemoveStream()
10297
return err
10398
}

p2p/protocol/circuitv2/relay/relay_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func TestRelayLimitData(t *testing.T) {
369369
t.Fatalf("expected to read %d bytes but read %d", len(buf), n)
370370
}
371371
}
372+
372373
buf = make([]byte, 4096)
373374
if _, err := rand.Read(buf); err != nil {
374375
t.Fatal(err)

0 commit comments

Comments
 (0)