
Commit f53ba9d

lightclient authored and fjl committed
p2p/discover: fix update logic in handleAddNode (ethereum#29836)
It seems the semantic differences between addFoundNode and addInboundNode were lost in ethereum#29572. My understanding is that addFoundNode is for a node you have not contacted directly (and are unsure whether it is available), whereas addInboundNode is for adding nodes that have contacted the local node and can therefore be verified as active.

handleAddNode seems to be the consolidation of those two methods, yet it bumps the node in the bucket (updating its IP address) even if the node was not inbound. This PR fixes that. The bug wasn't originally caught in tests like TestTable_addSeenNode because manipulating the node object also modified the node value used by the test.

New logic is added to reject non-inbound updates unless the sequence number of the (signed) ENR increases. Inbound updates, which are published by the updated node itself, are always accepted. If an inbound update changes the endpoint, the node will be revalidated on an expedited schedule.

Co-authored-by: Felix Lange <[email protected]>
1 parent 16e6e7f commit f53ba9d
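
The acceptance rule described above boils down to a small predicate. The following is a minimal sketch under assumed names (acceptUpdate is hypothetical and not part of the go-ethereum code); the real logic lives in bumpInBucket in the table.go diff below.

package main

import "fmt"

// acceptUpdate sketches the rule from the commit message: inbound updates
// (published by the node itself) are always accepted, while found updates must
// carry a strictly higher ENR sequence number. Found discv4 nodes always have
// seq == 0, so they can never be updated through this path.
func acceptUpdate(oldSeq, newSeq uint64, isInbound bool) bool {
    if isInbound {
        return true
    }
    return newSeq > oldSeq
}

func main() {
    fmt.Println(acceptUpdate(5, 5, true))  // true: inbound, same seq is fine
    fmt.Println(acceptUpdate(5, 5, false)) // false: found record must advance seq
    fmt.Println(acceptUpdate(5, 6, false)) // true: found record with higher seq
}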

4 files changed: +187 -59 lines


p2p/discover/table.go (+33 -13)
@@ -513,8 +513,9 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
     }
 
     b := tab.bucket(req.node.ID())
-    if tab.bumpInBucket(b, req.node.Node) {
-        // Already in bucket, update record.
+    n, _ := tab.bumpInBucket(b, req.node.Node, req.isInbound)
+    if n != nil {
+        // Already in bucket.
         return false
     }
     if len(b.entries) >= bucketSize {
@@ -605,26 +606,45 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *node {
     return rep
 }
 
-// bumpInBucket updates the node record of n in the bucket.
-func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node) bool {
+// bumpInBucket updates a node record if it exists in the bucket.
+// The second return value reports whether the node's endpoint (IP/port) was updated.
+func (tab *Table) bumpInBucket(b *bucket, newRecord *enode.Node, isInbound bool) (n *node, endpointChanged bool) {
     i := slices.IndexFunc(b.entries, func(elem *node) bool {
         return elem.ID() == newRecord.ID()
     })
     if i == -1 {
-        return false
+        return nil, false // not in bucket
+    }
+    n = b.entries[i]
+
+    // For inbound updates (from the node itself) we accept any change, even if it sets
+    // back the sequence number. For found nodes (!isInbound), seq has to advance. Note
+    // this check also ensures found discv4 nodes (which always have seq=0) can't be
+    // updated.
+    if newRecord.Seq() <= n.Seq() && !isInbound {
+        return n, false
     }
 
-    if !newRecord.IP().Equal(b.entries[i].IP()) {
-        // Endpoint has changed, ensure that the new IP fits into table limits.
-        tab.removeIP(b, b.entries[i].IP())
+    // Check endpoint update against IP limits.
+    ipchanged := newRecord.IPAddr() != n.IPAddr()
+    portchanged := newRecord.UDP() != n.UDP()
+    if ipchanged {
+        tab.removeIP(b, n.IP())
         if !tab.addIP(b, newRecord.IP()) {
-            // It doesn't, put the previous one back.
-            tab.addIP(b, b.entries[i].IP())
-            return false
+            // It doesn't fit with the limit, put the previous record back.
+            tab.addIP(b, n.IP())
+            return n, false
         }
     }
-    b.entries[i].Node = newRecord
-    return true
+
+    // Apply update.
+    n.Node = newRecord
+    if ipchanged || portchanged {
+        // Ensure node is revalidated quickly for endpoint changes.
+        tab.revalidation.nodeEndpointChanged(tab, n)
+        return n, true
+    }
+    return n, false
 }
 
 func (tab *Table) handleTrackRequest(op trackRequestOp) {
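
For context, handleAddNode is reached through two entry points on Table, and the isInbound flag read as req.isInbound above is set there. The sketch below shows how that wiring might look; the addNode helper and the exact addNodeOp layout are assumptions for illustration, since only req.isInbound appears in this diff.

// Sketch only: the addNode helper and addNodeOp layout are assumed.
func (tab *Table) addFoundNode(n *node) bool {
    // Node learned through FINDNODE traversal; it has not contacted us directly.
    return tab.addNode(addNodeOp{node: n, isInbound: false})
}

func (tab *Table) addInboundNode(n *node) bool {
    // Node that contacted the local node itself, e.g. via PING.
    return tab.addNode(addNodeOp{node: n, isInbound: true})
}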

p2p/discover/table_reval.go (+13 -12)
@@ -28,6 +28,8 @@ import (
 
 const never = mclock.AbsTime(math.MaxInt64)
 
+const slowRevalidationFactor = 3
+
 // tableRevalidation implements the node revalidation process.
 // It tracks all nodes contained in Table, and schedules sending PING to them.
 type tableRevalidation struct {
@@ -48,7 +50,7 @@ func (tr *tableRevalidation) init(cfg *Config) {
     tr.fast.interval = cfg.PingInterval
     tr.fast.name = "fast"
     tr.slow.nextTime = never
-    tr.slow.interval = cfg.PingInterval * 3
+    tr.slow.interval = cfg.PingInterval * slowRevalidationFactor
     tr.slow.name = "slow"
 }
 
@@ -65,6 +67,12 @@ func (tr *tableRevalidation) nodeRemoved(n *node) {
     n.revalList.remove(n)
 }
 
+// nodeEndpointChanged is called when a change in IP or port is detected.
+func (tr *tableRevalidation) nodeEndpointChanged(tab *Table, n *node) {
+    n.isValidatedLive = false
+    tr.moveToList(&tr.fast, n, tab.cfg.Clock.Now(), &tab.rand)
+}
+
 // run performs node revalidation.
 // It returns the next time it should be invoked, which is used in the Table main loop
 // to schedule a timer. However, run can be called at any time.
@@ -146,11 +154,11 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
     defer tab.mutex.Unlock()
 
     if !resp.didRespond {
-        // Revalidation failed.
         n.livenessChecks /= 3
         if n.livenessChecks <= 0 {
             tab.deleteInBucket(b, n.ID())
         } else {
+            tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
             tr.moveToList(&tr.fast, n, now, &tab.rand)
         }
         return
@@ -159,22 +167,15 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
     // The node responded.
     n.livenessChecks++
     n.isValidatedLive = true
+    tab.log.Debug("Node revalidated", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
     var endpointChanged bool
     if resp.newRecord != nil {
-        endpointChanged = tab.bumpInBucket(b, resp.newRecord)
-        if endpointChanged {
-            // If the node changed its advertised endpoint, the updated ENR is not served
-            // until it has been revalidated.
-            n.isValidatedLive = false
-        }
+        _, endpointChanged = tab.bumpInBucket(b, resp.newRecord, false)
     }
-    tab.log.Debug("Revalidated node", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList)
 
-    // Move node over to slow queue after first validation.
+    // Node moves to slow list if it passed and hasn't changed.
     if !endpointChanged {
         tr.moveToList(&tr.slow, n, now, &tab.rand)
-    } else {
-        tr.moveToList(&tr.fast, n, now, &tab.rand)
     }
 }
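
The new slowRevalidationFactor constant ties the slow revalidation list's interval to the configured PingInterval: nodes that keep answering without changing their record are pinged three times less often, while an endpoint change drops them back onto the fast list via nodeEndpointChanged. A small standalone example of the resulting intervals; the 10-second PingInterval is an arbitrary example value, not necessarily geth's default.

package main

import (
    "fmt"
    "time"
)

// Same constant as introduced in table_reval.go above.
const slowRevalidationFactor = 3

func main() {
    pingInterval := 10 * time.Second // example value only
    fmt.Println("fast list interval:", pingInterval)                        // 10s
    fmt.Println("slow list interval:", pingInterval*slowRevalidationFactor) // 30s
}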

p2p/discover/table_reval_test.go (+51 -2)
@@ -22,11 +22,13 @@ import (
     "time"
 
     "github.com/ethereum/go-ethereum/common/mclock"
+    "github.com/ethereum/go-ethereum/p2p/enode"
+    "github.com/ethereum/go-ethereum/p2p/enr"
 )
 
 // This test checks that revalidation can handle a node disappearing while
 // a request is active.
-func TestRevalidationNodeRemoved(t *testing.T) {
+func TestRevalidation_nodeRemoved(t *testing.T) {
     var (
         clock     mclock.Simulated
         transport = newPingRecorder()
@@ -35,7 +37,7 @@ func TestRevalidationNodeRemoved(t *testing.T) {
     )
     defer db.Close()
 
-    // Fill a bucket.
+    // Add a node to the table.
     node := nodeAtDistance(tab.self().ID(), 255, net.IP{77, 88, 99, 1})
     tab.handleAddNode(addNodeOp{node: node})
 
@@ -68,3 +70,50 @@ func TestRevalidationNodeRemoved(t *testing.T) {
         t.Fatal("removed node contained in revalidation list")
     }
 }
+
+// This test checks that nodes with an updated endpoint remain in the fast revalidation list.
+func TestRevalidation_endpointUpdate(t *testing.T) {
+    var (
+        clock     mclock.Simulated
+        transport = newPingRecorder()
+        tab, db   = newInactiveTestTable(transport, Config{Clock: &clock})
+        tr        = &tab.revalidation
+    )
+    defer db.Close()
+
+    // Add node to table.
+    node := nodeAtDistance(tab.self().ID(), 255, net.IP{77, 88, 99, 1})
+    tab.handleAddNode(addNodeOp{node: node})
+
+    // Update the record in transport, including endpoint update.
+    record := node.Record()
+    record.Set(enr.IP{100, 100, 100, 100})
+    record.Set(enr.UDP(9999))
+    nodev2 := enode.SignNull(record, node.ID())
+    transport.updateRecord(nodev2)
+
+    // Start a revalidation request. Schedule once to get the next start time,
+    // then advance the clock to that point and schedule again to start.
+    next := tr.run(tab, clock.Now())
+    clock.Run(time.Duration(next + 1))
+    tr.run(tab, clock.Now())
+    if len(tr.activeReq) != 1 {
+        t.Fatal("revalidation request did not start:", tr.activeReq)
+    }
+
+    // Now finish the revalidation request.
+    var resp revalidationResponse
+    select {
+    case resp = <-tab.revalResponseCh:
+    case <-time.After(1 * time.Second):
+        t.Fatal("timed out waiting for revalidation")
+    }
+    tr.handleResponse(tab, resp)
+
+    if !tr.fast.contains(node.ID()) {
+        t.Fatal("node not contained in fast revalidation list")
+    }
+    if node.isValidatedLive {
+        t.Fatal("node is marked live after endpoint change")
+    }
+}

p2p/discover/table_test.go (+90 -32)
@@ -131,7 +131,7 @@ func waitForRevalidationPing(t *testing.T, transport *pingRecorder, tab *Table,
     simclock := tab.cfg.Clock.(*mclock.Simulated)
     maxAttempts := tab.len() * 8
     for i := 0; i < maxAttempts; i++ {
-        simclock.Run(tab.cfg.PingInterval)
+        simclock.Run(tab.cfg.PingInterval * slowRevalidationFactor)
         p := transport.waitPing(2 * time.Second)
         if p == nil {
             t.Fatal("Table did not send revalidation ping")
@@ -275,7 +275,7 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
     return reflect.ValueOf(t)
 }
 
-func TestTable_addVerifiedNode(t *testing.T) {
+func TestTable_addInboundNode(t *testing.T) {
     tab, db := newTestTable(newPingRecorder(), Config{})
     <-tab.initDone
     defer db.Close()
@@ -286,29 +286,26 @@
     n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
     tab.addFoundNode(n1)
     tab.addFoundNode(n2)
-    bucket := tab.bucket(n1.ID())
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2.Node})
 
-    // Verify bucket content:
-    bcontent := []*node{n1, n2}
-    if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(bcontent)) {
-        t.Fatalf("wrong bucket content: %v", bucket.entries)
-    }
-
-    // Add a changed version of n2.
+    // Add a changed version of n2. The bucket should be updated.
     newrec := n2.Record()
     newrec.Set(enr.IP{99, 99, 99, 99})
-    newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
-    tab.addInboundNode(newn2)
-
-    // Check that bucket is updated correctly.
-    newBcontent := []*node{n1, newn2}
-    if !reflect.DeepEqual(unwrapNodes(bucket.entries), unwrapNodes(newBcontent)) {
-        t.Fatalf("wrong bucket content after update: %v", bucket.entries)
-    }
-    checkIPLimitInvariant(t, tab)
+    n2v2 := enode.SignNull(newrec, n2.ID())
+    tab.addInboundNode(wrapNode(n2v2))
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2})
+
+    // Try updating n2 without sequence number change. The update is accepted
+    // because it's inbound.
+    newrec = n2.Record()
+    newrec.Set(enr.IP{100, 100, 100, 100})
+    newrec.SetSeq(n2.Seq())
+    n2v3 := enode.SignNull(newrec, n2.ID())
+    tab.addInboundNode(wrapNode(n2v3))
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v3})
 }
 
-func TestTable_addSeenNode(t *testing.T) {
+func TestTable_addFoundNode(t *testing.T) {
     tab, db := newTestTable(newPingRecorder(), Config{})
     <-tab.initDone
     defer db.Close()
@@ -319,23 +316,84 @@ func TestTable_addSeenNode(t *testing.T) {
     n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
     tab.addFoundNode(n1)
     tab.addFoundNode(n2)
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2.Node})
 
-    // Verify bucket content:
-    bcontent := []*node{n1, n2}
-    if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
-        t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
-    }
-
-    // Add a changed version of n2.
+    // Add a changed version of n2. The bucket should be updated.
     newrec := n2.Record()
     newrec.Set(enr.IP{99, 99, 99, 99})
-    newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
-    tab.addFoundNode(newn2)
+    n2v2 := enode.SignNull(newrec, n2.ID())
+    tab.addFoundNode(wrapNode(n2v2))
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2})
+
+    // Try updating n2 without a sequence number change.
+    // The update should not be accepted.
+    newrec = n2.Record()
+    newrec.Set(enr.IP{100, 100, 100, 100})
+    newrec.SetSeq(n2.Seq())
+    n2v3 := enode.SignNull(newrec, n2.ID())
+    tab.addFoundNode(wrapNode(n2v3))
+    checkBucketContent(t, tab, []*enode.Node{n1.Node, n2v2})
+}
 
-    // Check that bucket content is unchanged.
-    if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
-        t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
+// This test checks that discv4 nodes can update their own endpoint via PING.
+func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) {
+    tab, db := newTestTable(newPingRecorder(), Config{})
+    <-tab.initDone
+    defer db.Close()
+    defer tab.close()
+
+    // Add a v4 node.
+    key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3")
+    n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000)
+    tab.addInboundNode(wrapNode(n1))
+    checkBucketContent(t, tab, []*enode.Node{n1})
+
+    // Add an updated version with changed IP.
+    // The update will be accepted because it is inbound.
+    n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000)
+    tab.addInboundNode(wrapNode(n1v2))
+    checkBucketContent(t, tab, []*enode.Node{n1v2})
+}
+
+// This test checks that discv4 node entries will NOT be updated when a
+// changed record is found.
+func TestTable_addFoundNodeV4UpdateReject(t *testing.T) {
+    tab, db := newTestTable(newPingRecorder(), Config{})
+    <-tab.initDone
+    defer db.Close()
+    defer tab.close()
+
+    // Add a v4 node.
+    key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3")
+    n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000)
+    tab.addFoundNode(wrapNode(n1))
+    checkBucketContent(t, tab, []*enode.Node{n1})
+
+    // Add an updated version with changed IP.
+    // The update won't be accepted because it isn't inbound.
+    n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000)
+    tab.addFoundNode(wrapNode(n1v2))
+    checkBucketContent(t, tab, []*enode.Node{n1})
+}
+
+func checkBucketContent(t *testing.T, tab *Table, nodes []*enode.Node) {
+    t.Helper()
+
+    b := tab.bucket(nodes[0].ID())
+    if reflect.DeepEqual(unwrapNodes(b.entries), nodes) {
+        return
     }
+    t.Log("wrong bucket content. have nodes:")
+    for _, n := range b.entries {
+        t.Logf("  %v (seq=%v, ip=%v)", n.ID(), n.Seq(), n.IP())
+    }
+    t.Log("want nodes:")
+    for _, n := range nodes {
+        t.Logf("  %v (seq=%v, ip=%v)", n.ID(), n.Seq(), n.IP())
+    }
+    t.FailNow()
+
+    // Also check IP limits.
     checkIPLimitInvariant(t, tab)
 }
