Skip to content

p2p/discover: improved node revalidation #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions common/mclock/alarm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package mclock

import (
"time"
)

// Alarm sends timed notifications on a channel. This is very similar to a regular timer,
// but is easier to use in code that needs to re-schedule the same timer over and over.
//
// When scheduling an Alarm, the channel returned by C() will receive a value no later
// than the scheduled time. An Alarm can be reused after it has fired and can also be
// canceled by calling Stop.
type Alarm struct {
ch chan struct{}
clock Clock
timer Timer
deadline AbsTime
}

// NewAlarm creates an Alarm.
func NewAlarm(clock Clock) *Alarm {
if clock == nil {
panic("nil clock")
}
return &Alarm{
ch: make(chan struct{}, 1),
clock: clock,
}
}

// C returns the alarm notification channel. This channel remains identical for
// the entire lifetime of the alarm, and is never closed.
func (e *Alarm) C() <-chan struct{} {
return e.ch
}

// Stop cancels the alarm and drains the channel.
// This method is not safe for concurrent use.
func (e *Alarm) Stop() {
// Clear timer.
if e.timer != nil {
e.timer.Stop()
}
e.deadline = 0

// Drain the channel.
select {
case <-e.ch:
default:
}
}

// Schedule sets the alarm to fire no later than the given time. If the alarm was already
// scheduled but has not fired yet, it may fire earlier than the newly-scheduled time.
func (e *Alarm) Schedule(time AbsTime) {
now := e.clock.Now()
e.schedule(now, time)
}

func (e *Alarm) schedule(now, newDeadline AbsTime) {
if e.timer != nil {
if e.deadline > now && e.deadline <= newDeadline {
// Here, the current timer can be reused because it is already scheduled to
// occur earlier than the new deadline.
//
// The e.deadline > now part of the condition is important. If the old
// deadline lies in the past, we assume the timer has already fired and needs
// to be rescheduled.
return
}
e.timer.Stop()
}

// Set the timer.
d := time.Duration(0)
if newDeadline < now {
newDeadline = now
} else {
d = newDeadline.Sub(now)
}
e.timer = e.clock.AfterFunc(d, e.send)
e.deadline = newDeadline
}

func (e *Alarm) send() {
select {
case e.ch <- struct{}{}:
default:
}
}
116 changes: 116 additions & 0 deletions common/mclock/alarm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package mclock

import "testing"

// This test checks basic functionality of Alarm.
func TestAlarm(t *testing.T) {
clk := new(Simulated)
clk.Run(20)
a := NewAlarm(clk)

a.Schedule(clk.Now() + 10)
if recv(a.C()) {
t.Fatal("Alarm fired before scheduled deadline")
}
if ntimers := clk.ActiveTimers(); ntimers != 1 {
t.Fatal("clock has", ntimers, "active timers, want", 1)
}
clk.Run(5)
if recv(a.C()) {
t.Fatal("Alarm fired too early")
}

clk.Run(5)
if !recv(a.C()) {
t.Fatal("Alarm did not fire")
}
if recv(a.C()) {
t.Fatal("Alarm fired twice")
}
if ntimers := clk.ActiveTimers(); ntimers != 0 {
t.Fatal("clock has", ntimers, "active timers, want", 0)
}

a.Schedule(clk.Now() + 5)
if recv(a.C()) {
t.Fatal("Alarm fired before scheduled deadline when scheduling the second event")
}

clk.Run(5)
if !recv(a.C()) {
t.Fatal("Alarm did not fire when scheduling the second event")
}
if recv(a.C()) {
t.Fatal("Alarm fired twice when scheduling the second event")
}
}

// This test checks that scheduling an Alarm to an earlier time than the
// one already scheduled works properly.
func TestAlarmScheduleEarlier(t *testing.T) {
clk := new(Simulated)
clk.Run(20)
a := NewAlarm(clk)

a.Schedule(clk.Now() + 50)
clk.Run(5)
a.Schedule(clk.Now() + 1)
clk.Run(3)
if !recv(a.C()) {
t.Fatal("Alarm did not fire")
}
}

// This test checks that scheduling an Alarm to a later time than the
// one already scheduled works properly.
func TestAlarmScheduleLater(t *testing.T) {
clk := new(Simulated)
clk.Run(20)
a := NewAlarm(clk)

a.Schedule(clk.Now() + 50)
clk.Run(5)
a.Schedule(clk.Now() + 100)
clk.Run(50)
if !recv(a.C()) {
t.Fatal("Alarm did not fire")
}
}

// This test checks that scheduling an Alarm in the past makes it fire immediately.
func TestAlarmNegative(t *testing.T) {
clk := new(Simulated)
clk.Run(50)
a := NewAlarm(clk)

a.Schedule(-1)
clk.Run(1) // needed to process timers
if !recv(a.C()) {
t.Fatal("Alarm did not fire for negative time")
}
}

func recv(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
64 changes: 58 additions & 6 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ package discover

import (
"crypto/ecdsa"
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/forkid"
Expand Down Expand Up @@ -72,15 +77,28 @@ type Config struct {

// These settings are optional:
NetRestrict *netutil.Netlist // list of allowed IP networks
Bootnodes []*enode.Node // list of bootstrap nodes
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
Log log.Logger // if set, log messages go here
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
FilterFunction NodeFilterFunc // function for filtering ENR entries

// Node table configuration:
Bootnodes []*enode.Node // list of bootstrap nodes
PingInterval time.Duration // speed of node liveness check
RefreshInterval time.Duration // used in bucket refresh
}

func (cfg Config) withDefaults() Config {
// Node table configuration:
if cfg.PingInterval == 0 {
cfg.PingInterval = 3 * time.Second
}
if cfg.RefreshInterval == 0 {
cfg.RefreshInterval = 30 * time.Minute
}

// Debug/test settings:
if cfg.Log == nil {
cfg.Log = log.Root()
}
Expand All @@ -105,9 +123,43 @@ type ReadPacket struct {
Addr *net.UDPAddr
}

func min(x, y int) int {
if x > y {
return y
}
return x
type randomSource interface {
Intn(int) int
Int63n(int64) int64
Shuffle(int, func(int, int))
}

// reseedingRandom is a random number generator that tracks when it was last re-seeded.
type reseedingRandom struct {
mu sync.Mutex
cur *rand.Rand
}

func (r *reseedingRandom) seed() {
var b [8]byte
crand.Read(b[:])
seed := binary.BigEndian.Uint64(b[:])
new := rand.New(rand.NewSource(int64(seed)))

r.mu.Lock()
r.cur = new
r.mu.Unlock()
}

func (r *reseedingRandom) Intn(n int) int {
r.mu.Lock()
defer r.mu.Unlock()
return r.cur.Intn(n)
}

func (r *reseedingRandom) Int63n(n int64) int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.cur.Int63n(n)
}

func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) {
r.mu.Lock()
defer r.mu.Unlock()
r.cur.Shuffle(n, swap)
}
32 changes: 7 additions & 25 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package discover

import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
Expand All @@ -28,7 +29,7 @@ import (
// not need to be an actual node identifier.
type lookup struct {
tab *Table
queryfunc func(*node) ([]*node, error)
queryfunc queryFunc
replyCh chan []*node
cancelCh <-chan struct{}
asked, seen map[enode.ID]bool
Expand Down Expand Up @@ -139,32 +140,13 @@ func (it *lookup) slowdown() {
}

func (it *lookup) query(n *node, reply chan<- []*node) {
fails := it.tab.db.FindFails(n.ID(), n.IP())
r, err := it.queryfunc(n)
if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
return
} else if len(r) == 0 {
fails++
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket.
dropped := false
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
dropped = true
it.tab.delete(n)
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.
success := len(r) > 0
it.tab.trackRequest(n, success, r)
if err != nil {
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err)
}
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
} else if fails > 0 {
// Reset failure counter because it counts _consecutive_ failures.
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
it.tab.addSeenNode(n)
}
reply <- r
}
Expand Down
Loading