Skip to content

remove goprocess from the mock package #1266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/pubsub/chat/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log/v2 v2.4.0
github.com/jbenet/goprocess v0.1.4
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-addr-util v0.1.0
Expand Down
3 changes: 2 additions & 1 deletion p2p/net/mock/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

type Mocknet interface {

// GenPeer generates a peer and its network.Network in the Mocknet
GenPeer() (host.Host, error)

Expand Down Expand Up @@ -63,6 +62,8 @@ type Mocknet interface {
DisconnectNets(network.Network, network.Network) error
LinkAll() error
ConnectAllButSelf() error

io.Closer
}

// LinkOptions are used to change aspects of the links.
Expand Down
18 changes: 7 additions & 11 deletions p2p/net/mock/mock.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package mocknet

import (
"context"

logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("mocknet")

// WithNPeers constructs a Mocknet with N peers.
func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
m := New(ctx)
func WithNPeers(n int) (Mocknet, error) {
m := New()
for i := 0; i < n; i++ {
if _, err := m.GenPeer(); err != nil {
return nil, err
Expand All @@ -22,30 +20,28 @@ func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
// FullMeshLinked constructs a Mocknet with full mesh of Links.
// This means that all the peers **can** connect to each other
// (not that they already are connected. you can use m.ConnectAll())
func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) {
m, err := WithNPeers(ctx, n)
func FullMeshLinked(n int) (Mocknet, error) {
m, err := WithNPeers(n)
if err != nil {
return nil, err
}

if err := m.LinkAll(); err != nil {
return nil, err
}

return m, nil
}

// FullMeshConnected constructs a Mocknet with full mesh of Connections.
// This means that all the peers have dialed and are ready to talk to
// each other.
func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
m, err := FullMeshLinked(ctx, n)
func FullMeshConnected(n int) (Mocknet, error) {
m, err := FullMeshLinked(n)
if err != nil {
return nil, err
}

err = m.ConnectAllButSelf()
if err != nil {
if err := m.ConnectAllButSelf(); err != nil {
return nil, err
}
return m, nil
Expand Down
18 changes: 8 additions & 10 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"sync/atomic"

process "github.com/jbenet/goprocess"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -40,13 +39,13 @@ type conn struct {
streams list.List
stat network.ConnStats

pairProc, connProc process.Process
closeOnce sync.Once

sync.RWMutex
}

func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l, pairProc: p}
func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
c := &conn{net: ln, link: l}
c.local = ln.peer
c.remote = rn.peer
c.stat.Direction = dir
Expand All @@ -65,7 +64,6 @@ func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction)

c.localPrivKey = ln.ps.PrivKey(ln.peer)
c.remotePubKey = rn.ps.PubKey(rn.peer)
c.connProc = process.WithParent(c.pairProc)
return c
}

Expand All @@ -74,11 +72,11 @@ func (c *conn) ID() string {
}

func (c *conn) Close() error {
return c.pairProc.Close()
}

func (c *conn) setup() {
c.connProc.SetTeardown(c.teardown)
c.closeOnce.Do(func() {
go c.rconn.Close()
c.teardown()
})
return nil
}

func (c *conn) teardown() error {
Expand Down
7 changes: 2 additions & 5 deletions p2p/net/mock/mock_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

process "github.com/jbenet/goprocess"
)

// link implements mocknet.Link
Expand All @@ -33,13 +31,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
l.RLock()
defer l.RUnlock()

parent := process.WithTeardown(func() error { return nil })
target := l.nets[0]
if target == dialer {
target = l.nets[1]
}
dc := newConn(parent, dialer, target, l, network.DirOutbound)
tc := newConn(parent, target, dialer, l, network.DirInbound)
dc := newConn(dialer, target, l, network.DirOutbound)
tc := newConn(target, dialer, l, network.DirInbound)
dc.rconn = tc
tc.rconn = dc
return dc, tc
Expand Down
39 changes: 20 additions & 19 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (

bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

p2putil "github.com/libp2p/go-libp2p-netutil"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -30,7 +27,7 @@ var blackholeIP6 = net.ParseIP("100::")
// mocknet implements mocknet.Mocknet
type mocknet struct {
nets map[peer.ID]*peernet
hosts map[peer.ID]*bhost.BasicHost
hosts map[peer.ID]host.Host

// links make it possible to connect two peers.
// think of links as the physical medium.
Expand All @@ -40,22 +37,30 @@ type mocknet struct {

linkDefaults LinkOptions

proc goprocess.Process // for Context closing
ctx context.Context
ctxCancel context.CancelFunc
ctx context.Context
sync.Mutex
}

func New(ctx context.Context) Mocknet {
proc := goprocessctx.WithContext(ctx)
ctx = goprocessctx.WithProcessClosing(ctx, proc)

return &mocknet{
func New() Mocknet {
mn := &mocknet{
nets: map[peer.ID]*peernet{},
hosts: map[peer.ID]*bhost.BasicHost{},
hosts: map[peer.ID]host.Host{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: proc,
ctx: ctx,
}
mn.ctx, mn.ctxCancel = context.WithCancel(context.Background())
return mn
}

func (mn *mocknet) Close() error {
mn.ctxCancel()
for _, h := range mn.hosts {
h.Close()
}
for _, n := range mn.nets {
n.Close()
}
return nil
}

func (mn *mocknet) GenPeer() (host.Host, error) {
Expand Down Expand Up @@ -104,7 +109,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
}

func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) {
n, err := newPeernet(mn.ctx, mn, p, ps)
n, err := newPeernet(mn, p, ps)
if err != nil {
return nil, err
}
Expand All @@ -119,10 +124,6 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host
return nil, err
}

// Ensure we close the hoset when we close the mock network.
// Otherwise, tests leak memory.
mn.proc.AddChild(goprocess.WithTeardown(h.Close))

mn.Lock()
mn.nets[n.peer] = n
mn.hosts[n.peer] = h
Expand Down
8 changes: 3 additions & 5 deletions p2p/net/mock/mock_notif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import (

func TestNotifications(t *testing.T) {
const swarmSize = 5
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const timeout = 10 * time.Second

mn, err := FullMeshLinked(ctx, swarmSize)
mn, err := FullMeshLinked(swarmSize)
if err != nil {
t.Fatal(err)
}

timeout := 10 * time.Second
defer mn.Close()

// signup notifs
nets := mn.Nets()
Expand Down
23 changes: 2 additions & 21 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -36,12 +33,11 @@ type peernet struct {
notifmu sync.Mutex
notifs map[network.Notifiee]struct{}

proc goprocess.Process
sync.RWMutex
}

// newPeernet constructs a new peernet
func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
n := &peernet{
mocknet: m,
peer: p,
Expand All @@ -53,12 +49,10 @@ func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peersto
notifs: make(map[network.Notifiee]struct{}),
}

n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
return n, nil
}

func (pn *peernet) teardown() error {

func (pn *peernet) Close() error {
// close the connections
for _, c := range pn.allConns() {
c.Close()
Expand All @@ -79,11 +73,6 @@ func (pn *peernet) allConns() []*conn {
return cs
}

// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.proc.Close()
}

func (pn *peernet) Peerstore() peerstore.Peerstore {
return pn.ps
}
Expand Down Expand Up @@ -199,9 +188,6 @@ func (pn *peernet) remoteOpenedConn(c *conn) {
// to given remote peer over given link
func (pn *peernet) addConn(c *conn) {
defer c.notifLk.Unlock()
// Call this after unlocking as it might cause us to immediately close
// the connection and remove it from the swarm.
c.setup()

pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
Expand All @@ -226,11 +212,6 @@ func (pn *peernet) removeConn(c *conn) {
delete(cs, c)
}

// Process returns the network's Process
func (pn *peernet) Process() goprocess.Process {
return pn.proc
}

// LocalPeer the network's LocalPeer
func (pn *peernet) LocalPeer() peer.ID {
return pn.peer
Expand Down
Loading