Skip to content

Commit cc979b9

Browse files
regular reprovide improvement
1 parent b060d52 commit cc979b9

File tree

2 files changed

+88
-27
lines changed

2 files changed

+88
-27
lines changed

reprovider/sweeper.go

Lines changed: 85 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package reprovider
33
import (
44
"context"
55
"errors"
6+
"slices"
67
"strconv"
78
"sync"
89
"sync/atomic"
@@ -51,6 +52,7 @@ var logger = logging.Logger("dht/ReprovideSweep")
5152

5253
var (
5354
ErrNodeOffline = errors.New("reprovider: node is offline")
55+
ErrNoKeyProvided = errors.New("reprovider: failed to provide any key")
5456
errTooManyIterationsDuringExploration = errors.New("closestPeersToPrefix needed more than maxPrefixSearches iterations")
5557
)
5658

@@ -83,10 +85,11 @@ type reprovideSweeper struct {
8385

8486
cids *trie.Trie[bit256.Key, mh.Multihash]
8587

86-
provideChan chan provideReq
87-
schedule *trie.Trie[bitstr.Key, time.Duration]
88-
scheduleLk sync.Mutex
89-
scheduleTimer *clock.Timer
88+
provideChan chan provideReq
89+
schedule *trie.Trie[bitstr.Key, time.Duration]
90+
scheduleLk sync.Mutex
91+
scheduleTimer *clock.Timer
92+
scheduleTimerStartedAt time.Time
9093

9194
failedRegionsChan chan bitstr.Key
9295
lateRegionsQueue []bitstr.Key
@@ -184,7 +187,9 @@ func (s *reprovideSweeper) run() {
184187
case <-s.scheduleTimer.C:
185188
s.handleReprovide()
186189
case prefix := <-s.failedRegionsChan:
187-
s.lateRegionsQueue = append(s.lateRegionsQueue, prefix)
190+
if !slices.Contains(s.lateRegionsQueue, prefix) {
191+
s.lateRegionsQueue = append(s.lateRegionsQueue, prefix)
192+
}
188193
if s.online.Load() {
189194
go s.onlineCheck()
190195
}
@@ -265,7 +270,7 @@ func (s *reprovideSweeper) addPrefixToScheduleNoLock(prefix bitstr.Key) {
265270
s.schedule.Add(prefix, reprovideTime)
266271

267272
currentTimeOffset := s.currentTimeOffset()
268-
timeUntilReprovide := (reprovideTime - currentTimeOffset + s.reprovideInterval) % s.reprovideInterval
273+
timeUntilReprovide := (reprovideTime-currentTimeOffset+s.reprovideInterval-1)%s.reprovideInterval + 1
269274
if s.prefixCursor == "" {
270275
s.scheduleNextReprovideNoLock(prefix, timeUntilReprovide)
271276
} else {
@@ -287,6 +292,7 @@ func (s *reprovideSweeper) addPrefixToScheduleNoLock(prefix bitstr.Key) {
287292
func (s *reprovideSweeper) scheduleNextReprovideNoLock(prefix bitstr.Key, timeUntilReprovide time.Duration) {
288293
s.prefixCursor = prefix
289294
s.scheduleTimer.Reset(timeUntilReprovide)
295+
s.scheduleTimerStartedAt = s.clock.Now()
290296
}
291297

292298
func (s *reprovideSweeper) getPrefixesForCids(cids []mh.Multihash) map[bitstr.Key]*trie.Trie[bit256.Key, mh.Multihash] {
@@ -408,35 +414,75 @@ func (s *reprovideSweeper) networkProvide(prefixes map[bitstr.Key]*trie.Trie[bit
408414
}
409415

410416
if atomic.LoadUint32(&anySuccess) == 0 {
411-
return errors.New("failed to provide any cid")
417+
return ErrNoKeyProvided
412418
}
413419
return nil
414420
}
415421

416422
func (s *reprovideSweeper) handleReprovide() {
417423
online := s.online.Load()
418424
s.scheduleLk.Lock()
419-
if online {
420-
// Remove prefix from trie if online, new schedule will be added as needed
421-
// after reprovide.
422-
s.schedule.Remove(s.prefixCursor)
423-
}
424-
// Get next prefix to reprovide, and set timer for it.
425-
next := nextNonEmptyLeaf(s.schedule, s.prefixCursor, s.order)
426425
currentPrefix := s.prefixCursor
427-
// TODO: for all prefixes between next.Data and s.currentTimeOffset(), add them to failedRegionsChan
426+
// Get next prefix to reprovide, and set timer for it.
427+
next := nextNonEmptyLeaf(s.schedule, currentPrefix, s.order)
428428

429-
var nextReprovideDelay time.Duration
430-
nextPrefix := currentPrefix
431429
if next == nil {
432-
// Empty schedule, keep current prefixCursor, and wake up in
433-
// reprovideInterval.
434-
nextReprovideDelay = s.reprovideInterval
430+
// Schedule is empty, don't reprovide anything.
431+
s.scheduleLk.Unlock()
432+
return
433+
}
434+
435+
currentTimeOffset := s.currentTimeOffset()
436+
var nextPrefix bitstr.Key
437+
var timeUntilNextReprovide time.Duration
438+
if next.Key == currentPrefix {
439+
// There is a single prefix in the schedule.
440+
nextPrefix = currentPrefix
441+
timeUntilNextReprovide = (s.reprovideTimeForPrefix(currentPrefix)-currentTimeOffset+s.reprovideInterval-1)%s.reprovideInterval + 1
435442
} else {
443+
timeSinceTimerRunning := (currentTimeOffset - s.timeOffset(s.scheduleTimerStartedAt) + s.reprovideInterval) % s.reprovideInterval
444+
timeSinceTimerUntilNext := (next.Data - s.timeOffset(s.scheduleTimerStartedAt) + s.reprovideInterval) % s.reprovideInterval
445+
446+
if s.scheduleTimerStartedAt.Add(s.reprovideInterval).Before(s.clock.Now()) {
447+
// Alarm was programmed more than reprovideInterval ago, which means that
448+
// no regions has been reprovided since. Add all regions to
449+
// failedRegionsChan. This only happens if the main thread gets blocked
450+
// for more than reprovideInterval.
451+
nextKeyFound := false
452+
scheduleEntries := allEntries(s.schedule, s.order)
453+
for _, entry := range scheduleEntries {
454+
if !nextKeyFound && entry.Data > currentTimeOffset {
455+
next = entry
456+
nextKeyFound = true
457+
}
458+
s.failedRegionsChan <- entry.Key
459+
}
460+
if !nextKeyFound {
461+
next = scheduleEntries[0]
462+
}
463+
timeUntilNextReprovide = (next.Data-currentTimeOffset+s.reprovideInterval-1)%s.reprovideInterval + 1
464+
// Don't reprovide any region now, but schedule the next one. All regions
465+
// are expected to be reprovided when the provider is catching up with
466+
// failed regions.
467+
s.scheduleNextReprovideNoLock(next.Key, timeUntilNextReprovide)
468+
s.scheduleLk.Unlock()
469+
return
470+
} else if timeSinceTimerUntilNext < timeSinceTimerRunning {
471+
// next is scheduled in the past. While next is in the past, add next to
472+
// failedRegions and take nextLeaf as next.
473+
474+
for timeSinceTimerUntilNext < timeSinceTimerRunning {
475+
s.failedRegionsChan <- next.Key
476+
next = nextNonEmptyLeaf(s.schedule, next.Key, s.order)
477+
timeSinceTimerUntilNext = (next.Data - s.timeOffset(s.scheduleTimerStartedAt) + s.reprovideInterval) % s.reprovideInterval
478+
}
479+
}
480+
// next is in the future
436481
nextPrefix = next.Key
437-
nextReprovideDelay = (s.reprovideInterval + next.Data - s.currentTimeOffset()) % s.reprovideInterval
482+
timeUntilNextReprovide = (next.Data-currentTimeOffset+s.reprovideInterval-1)%s.reprovideInterval + 1
438483
}
439-
s.scheduleNextReprovideNoLock(nextPrefix, nextReprovideDelay)
484+
485+
s.scheduleNextReprovideNoLock(nextPrefix, timeUntilNextReprovide)
440486
s.scheduleLk.Unlock()
441487

442488
// If we are offline, don't even try to reprovide region.
@@ -445,6 +491,15 @@ func (s *reprovideSweeper) handleReprovide() {
445491
return
446492
}
447493

494+
// Remove prefix that is about to be reprovided from the late regions queue
495+
// if present.
496+
for i, r := range s.lateRegionsQueue {
497+
if r == currentPrefix {
498+
s.lateRegionsQueue = slices.Delete(s.lateRegionsQueue, i, i+1)
499+
break
500+
}
501+
}
502+
448503
cids := s.cids.Copy() // NOTE: if many cids, this may have a large memory footprint
449504
go s.provideForPrefix(currentPrefix, cids, regularReprovide)
450505
}
@@ -692,7 +747,7 @@ func (s *reprovideSweeper) unscheduleSubsumedPrefixes(prefix bitstr.Key) {
692747
if next == nil {
693748
s.scheduleNextReprovideNoLock(prefix, s.reprovideInterval)
694749
} else {
695-
timeUntilReprovide := (s.reprovideInterval + next.Data - s.currentTimeOffset()) % s.reprovideInterval
750+
timeUntilReprovide := (s.reprovideInterval+next.Data-s.currentTimeOffset()-1)%s.reprovideInterval + 1
696751
s.scheduleNextReprovideNoLock(next.Key, timeUntilReprovide)
697752
}
698753
}
@@ -828,7 +883,13 @@ func (s *reprovideSweeper) scheduleNextReprovide(prefix bitstr.Key, lastReprovid
828883

829884
// currentTimeOffset returns the current time offset in the reprovide cycle.
830885
func (s *reprovideSweeper) currentTimeOffset() time.Duration {
831-
return s.clock.Now().Sub(s.cycleStart) % s.reprovideInterval
886+
return s.timeOffset(s.clock.Now())
887+
}
888+
889+
// timeOffset returns the time offset in the reprovide cycle for the given
890+
// time.
891+
func (s *reprovideSweeper) timeOffset(t time.Time) time.Duration {
892+
return t.Sub(s.cycleStart) % s.reprovideInterval
832893
}
833894

834895
const maxPrefixSize = 24

reprovider/trie_helpers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,16 +122,16 @@ func allValues[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], order
122122

123123
// allKeys returns a slice containing all keys in the trie `t` sorted according
124124
// to the provided `order`.
125-
func allEntries[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], order K1) []trie.Entry[K0, D] {
125+
func allEntries[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], order K1) []*trie.Entry[K0, D] {
126126
return allEntriesAtDepth(t, order, 0)
127127
}
128128

129-
func allEntriesAtDepth[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], order K1, depth int) []trie.Entry[K0, D] {
129+
func allEntriesAtDepth[K0 kad.Key[K0], K1 kad.Key[K1], D any](t *trie.Trie[K0, D], order K1, depth int) []*trie.Entry[K0, D] {
130130
if t.IsEmptyLeaf() {
131131
return nil
132132
}
133133
if t.IsNonEmptyLeaf() {
134-
return []trie.Entry[K0, D]{{Key: *t.Key(), Data: t.Data()}}
134+
return []*trie.Entry[K0, D]{{Key: *t.Key(), Data: t.Data()}}
135135
}
136136
b := int(order.Bit(depth))
137137
return append(allEntriesAtDepth(t.Branch(b), order, depth+1),

0 commit comments

Comments
 (0)