Skip to content

Commit 15ee0d6

Browse files
committed
integration test preliminaries
1 parent eafffe8 commit 15ee0d6

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed

itest/echo.go

+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package itest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"sync"
8+
"time"
9+
10+
"github.com/libp2p/go-libp2p-core/host"
11+
"github.com/libp2p/go-libp2p-core/network"
12+
"github.com/libp2p/go-libp2p-core/peer"
13+
14+
logging "github.com/ipfs/go-log/v2"
15+
)
16+
17+
const (
18+
EchoService = "test.echo"
19+
EchoProtoID = "/test/echo"
20+
)
21+
22+
var (
23+
echoLog = logging.Logger("echo")
24+
)
25+
26+
type Echo struct {
27+
Host host.Host
28+
29+
WaitBeforeRead, WaitBeforeWrite func() error
30+
31+
mx sync.Mutex
32+
status EchoStatus
33+
}
34+
35+
type EchoStatus struct {
36+
StreamsIn int
37+
EchosIn, EchosOut int
38+
IOErrors int
39+
ResourceServiceErrors int
40+
ResourceReservationErrors int
41+
}
42+
43+
func NewEcho(h host.Host) *Echo {
44+
e := &Echo{Host: h}
45+
h.SetStreamHandler(EchoProtoID, e.handleStream)
46+
return e
47+
}
48+
49+
func (e *Echo) Status() EchoStatus {
50+
e.mx.Lock()
51+
defer e.mx.Unlock()
52+
53+
return e.status
54+
}
55+
56+
func (e *Echo) handleStream(s network.Stream) {
57+
defer s.Close()
58+
59+
e.mx.Lock()
60+
e.status.StreamsIn++
61+
e.mx.Unlock()
62+
63+
if err := s.Scope().SetService(EchoService); err != nil {
64+
echoLog.Debugf("error attaching stream to echo service: %s", err)
65+
66+
e.mx.Lock()
67+
e.status.ResourceServiceErrors++
68+
e.mx.Unlock()
69+
70+
s.Reset()
71+
return
72+
}
73+
74+
if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
75+
echoLog.Debugf("error reserving memory: %s", err)
76+
77+
e.mx.Lock()
78+
e.status.ResourceReservationErrors++
79+
e.mx.Unlock()
80+
81+
s.Reset()
82+
return
83+
}
84+
85+
if e.WaitBeforeRead != nil {
86+
if err := e.WaitBeforeRead(); err != nil {
87+
echoLog.Debugf("error waiting before read: %s", err)
88+
89+
s.Reset()
90+
return
91+
}
92+
}
93+
94+
buf := make([]byte, 4096)
95+
96+
s.SetReadDeadline(time.Now().Add(5 * time.Second))
97+
n, err := s.Read(buf)
98+
switch {
99+
case err == io.EOF:
100+
if n == 0 {
101+
return
102+
}
103+
104+
case err != nil:
105+
echoLog.Debugf("I/O error : %s", err)
106+
107+
e.mx.Lock()
108+
e.status.IOErrors++
109+
e.mx.Unlock()
110+
111+
s.Reset()
112+
return
113+
}
114+
115+
e.mx.Lock()
116+
e.status.EchosIn++
117+
e.mx.Unlock()
118+
119+
if e.WaitBeforeWrite != nil {
120+
if err := e.WaitBeforeWrite(); err != nil {
121+
echoLog.Debugf("error waiting before write: %s", err)
122+
123+
s.Reset()
124+
return
125+
}
126+
}
127+
128+
s.SetWriteDeadline(time.Now().Add(5 * time.Second))
129+
_, err = s.Write(buf[:n])
130+
if err != nil {
131+
echoLog.Debugf("I/O error: %s", err)
132+
133+
e.mx.Lock()
134+
e.status.IOErrors++
135+
e.mx.Unlock()
136+
137+
s.Reset()
138+
return
139+
}
140+
141+
e.mx.Lock()
142+
e.status.EchosOut++
143+
e.mx.Unlock()
144+
145+
s.CloseWrite()
146+
}
147+
148+
func (e *Echo) Echo(p peer.ID, what string) error {
149+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
150+
defer cancel()
151+
152+
s, err := e.Host.NewStream(ctx, p, EchoProtoID)
153+
if err != nil {
154+
return err
155+
}
156+
defer s.Close()
157+
158+
if err := s.Scope().SetService(EchoService); err != nil {
159+
echoLog.Debugf("error attaching stream to echo service: %s", err)
160+
161+
s.Reset()
162+
return err
163+
}
164+
165+
if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
166+
echoLog.Debugf("error reserving memory: %s", err)
167+
168+
s.Reset()
169+
return err
170+
}
171+
172+
s.SetWriteDeadline(time.Now().Add(5 * time.Second))
173+
_, err = s.Write([]byte(what))
174+
if err != nil {
175+
return err
176+
}
177+
s.CloseWrite()
178+
179+
buf := make([]byte, 4096)
180+
181+
s.SetReadDeadline(time.Now().Add(5 * time.Second))
182+
n, err := s.Read(buf)
183+
switch {
184+
case err == io.EOF:
185+
if n == 0 {
186+
return err
187+
}
188+
189+
case err != nil:
190+
echoLog.Debugf("I/O error : %s", err)
191+
192+
s.Reset()
193+
return err
194+
}
195+
196+
if what != string(buf[:n]) {
197+
return fmt.Errorf("echo output doesn't match input")
198+
}
199+
200+
return nil
201+
}

itest/echo_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package itest
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/libp2p/go-libp2p"
8+
"github.com/libp2p/go-libp2p-core/peer"
9+
"github.com/libp2p/go-libp2p-core/peerstore"
10+
)
11+
12+
func createEchos(t *testing.T, count int, opts ...libp2p.Option) []*Echo {
13+
result := make([]*Echo, 0, count)
14+
15+
for i := 0; i < count; i++ {
16+
h, err := libp2p.New(opts...)
17+
if err != nil {
18+
t.Fatal(err)
19+
}
20+
21+
e := NewEcho(h)
22+
result = append(result, e)
23+
}
24+
25+
for i := 0; i < count; i++ {
26+
for j := 0; j < count; j++ {
27+
if i == j {
28+
continue
29+
}
30+
31+
result[i].Host.Peerstore().AddAddrs(result[j].Host.ID(), result[j].Host.Addrs(), peerstore.PermanentAddrTTL)
32+
}
33+
}
34+
35+
return result
36+
}
37+
38+
func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
39+
t.Helper()
40+
41+
status := e.Status()
42+
43+
if status.StreamsIn != expected.StreamsIn {
44+
t.Fatalf("expected %d streams in, got %d", expected.StreamsIn, status.StreamsIn)
45+
}
46+
if status.EchosIn != expected.EchosIn {
47+
t.Fatalf("expected %d echos in, got %d", expected.EchosIn, status.EchosIn)
48+
}
49+
if status.EchosOut != expected.EchosOut {
50+
t.Fatalf("expected %d echos out, got %d", expected.EchosOut, status.EchosOut)
51+
}
52+
if status.IOErrors != expected.IOErrors {
53+
t.Fatalf("expected %d I/O errors, got %d", expected.IOErrors, status.IOErrors)
54+
}
55+
if status.ResourceServiceErrors != expected.ResourceServiceErrors {
56+
t.Fatalf("expected %d service resource errors, got %d", expected.ResourceServiceErrors, status.ResourceServiceErrors)
57+
}
58+
if status.ResourceReservationErrors != expected.ResourceReservationErrors {
59+
t.Fatalf("expected %d reservation resource errors, got %d", expected.ResourceReservationErrors, status.ResourceReservationErrors)
60+
}
61+
}
62+
63+
func TestEcho(t *testing.T) {
64+
echos := createEchos(t, 2)
65+
66+
err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()})
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
71+
defer func() {
72+
for _, e := range echos {
73+
e.Host.Close()
74+
}
75+
}()
76+
77+
if err = echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
78+
t.Fatal(err)
79+
}
80+
81+
checkEchoStatus(t, echos[1], EchoStatus{
82+
StreamsIn: 1,
83+
EchosIn: 1,
84+
EchosOut: 1,
85+
})
86+
}

0 commit comments

Comments
 (0)