Skip to content

Commit 2fa5d8a

Browse files
authored
fix(dot/peerset): fix sending on closed channel race condition when dropping peer (#2573)
* set channel to nil on close, check for nil chan * refactor parallel goroutines to single goroutine * cr feedback * bump to 45m * bump up integration to 45m
1 parent 8582cb2 commit 2fa5d8a

File tree

3 files changed

+18
-28
lines changed

3 files changed

+18
-28
lines changed

.github/workflows/integration-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,4 @@ jobs:
6060
restore-keys: ${{ runner.os }}-go-mod
6161

6262
- name: Run integration tests
63-
run: go test -timeout=30m -tags integration ${{ matrix.packages }}
63+
run: go test -timeout=45m -tags integration ${{ matrix.packages }}

.github/workflows/unit-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ jobs:
6767
echo "$HOME/.local/bin" >> $GITHUB_PATH
6868
6969
- name: Run unit tests
70-
run: go test -short -coverprofile=coverage.out -covermode=atomic -timeout=30m ./...
70+
run: go test -short -coverprofile=coverage.out -covermode=atomic -timeout=45m ./...
7171

7272
- name: Test State - Race
7373
run: make test-state-race

dot/peerset/peerset.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -712,15 +712,28 @@ func (ps *PeerSet) start(ctx context.Context, actionQueue chan action) {
712712
ps.actionQueue = actionQueue
713713
ps.resultMsgCh = make(chan Message, msgChanSize)
714714

715-
go ps.listenAction(ctx)
716-
go ps.periodicallyAllocateSlots(ctx)
715+
go ps.listenActionAllocSlots(ctx)
717716
}
718717

719-
func (ps *PeerSet) listenAction(ctx context.Context) {
718+
func (ps *PeerSet) listenActionAllocSlots(ctx context.Context) {
719+
ticker := time.NewTicker(ps.nextPeriodicAllocSlots)
720+
721+
defer func() {
722+
ticker.Stop()
723+
close(ps.resultMsgCh)
724+
}()
725+
720726
for {
721727
select {
722728
case <-ctx.Done():
729+
logger.Debugf("peerset slot allocation exiting: %s", ctx.Err())
723730
return
731+
case <-ticker.C:
732+
for setID := 0; setID < ps.peerState.getSetLength(); setID++ {
733+
if err := ps.allocSlots(setID); err != nil {
734+
logger.Warnf("failed to allocate slots: %s", err)
735+
}
736+
}
724737
case act, ok := <-ps.actionQueue:
725738
if !ok {
726739
return
@@ -758,26 +771,3 @@ func (ps *PeerSet) listenAction(ctx context.Context) {
758771
}
759772
}
760773
}
761-
762-
func (ps *PeerSet) periodicallyAllocateSlots(ctx context.Context) {
763-
ticker := time.NewTicker(ps.nextPeriodicAllocSlots)
764-
765-
defer func() {
766-
ticker.Stop()
767-
close(ps.resultMsgCh)
768-
}()
769-
770-
for {
771-
select {
772-
case <-ctx.Done():
773-
logger.Debugf("peerset slot allocation exiting: %s", ctx.Err())
774-
return
775-
case <-ticker.C:
776-
for setID := 0; setID < ps.peerState.getSetLength(); setID++ {
777-
if err := ps.allocSlots(setID); err != nil {
778-
logger.Warnf("failed to allocate slots: %s", err)
779-
}
780-
}
781-
}
782-
}
783-
}

0 commit comments

Comments
 (0)