Skip to content

fix(lib/babe): Unrestricted Loop When Building Blocks (GSR-19) #2632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
fdaadd6
add test, add time.Sleep in loop
edwardmack Jun 29, 2022
f2f05ed
add copyright notice
edwardmack Jun 29, 2022
e431b70
Channel based notification for queue Push
qdm12 Jun 30, 2022
a45be38
trying to add channel closed check
edwardmack Jul 1, 2022
4ef5e81
add check when creating channel
edwardmack Jul 6, 2022
4c70d07
add check in Push func to check channel status
edwardmack Jul 6, 2022
02522a8
address lint issues
edwardmack Jul 6, 2022
7c1add4
refactor test with nil pop result
edwardmack Jul 6, 2022
046eb19
WIP, trying to make cancel channel for ticker
edwardmack Jul 6, 2022
bf7b414
address PR comments
edwardmack Jul 7, 2022
4c336e2
formatting, remove newline
edwardmack Jul 7, 2022
1121a3f
WIP: add to tests
edwardmack Jul 7, 2022
5837992
wip
qdm12 Jul 7, 2022
d0e52d3
add to tests to increase coverage
edwardmack Jul 8, 2022
35b8e1b
add check for closed channel
edwardmack Jul 13, 2022
f2321b4
remove nil assignment
edwardmack Jul 14, 2022
cf05044
remove flacky build unit tests, handled by integration tests
edwardmack Jul 20, 2022
2fe32fe
remove test time channel, add intergration test
edwardmack Jul 25, 2022
ded3520
WIP - refactor test
edwardmack Jul 27, 2022
4b2343e
updated integration test, increase coverage
edwardmack Jul 27, 2022
ce546cf
WIP - implement transactionState.PopChannel
edwardmack Jul 28, 2022
6d96206
WIP - stub PopChannel2
edwardmack Aug 10, 2022
cf69b05
refactor use of pop channel
edwardmack Aug 11, 2022
0ba0ae5
address linter issues, remove commented lines
edwardmack Aug 11, 2022
d4c6bd6
remove unused var
edwardmack Aug 11, 2022
792253a
add sleep to pop channel func
edwardmack Aug 11, 2022
673a395
refactor test to confirm priority
edwardmack Aug 11, 2022
7808743
re-generate mocks
edwardmack Aug 11, 2022
ee3664b
introduce pop func to use pollTimer
edwardmack Sep 14, 2022
68195bc
`PopChannel` -> `PopWithTimer`
qdm12 Sep 22, 2022
273258f
regenerate mocks
edwardmack Sep 23, 2022
c4ef178
add pop check, update pop with timer tests
edwardmack Oct 3, 2022
438e4ca
pass time.Time channel to PopWithTimer, move test
edwardmack Oct 6, 2022
9eb8833
regenrate mocks
edwardmack Oct 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dot/state/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package state

import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/telemetry"

Expand Down Expand Up @@ -47,6 +48,12 @@ func (s *TransactionState) Pop() *transaction.ValidTransaction {
return s.queue.Pop()
}

// PopWithTimer returns the next valid transaction from the queue.
// When the timer expires, it returns `nil`.
func (s *TransactionState) PopWithTimer(timerCh <-chan time.Time) (transaction *transaction.ValidTransaction) {
return s.queue.PopWithTimer(timerCh)
}

// Peek returns the head of the queue without removing it
func (s *TransactionState) Peek() *transaction.ValidTransaction {
return s.queue.Peek()
Expand Down
19 changes: 9 additions & 10 deletions lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,15 @@ func (b *BlockBuilder) buildBlockSeal(header *types.Header) (*types.SealDigest,
func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*transaction.ValidTransaction {
var included []*transaction.ValidTransaction

for !hasSlotEnded(slot) {
txn := b.transactionState.Pop()
// Transaction queue is empty.
if txn == nil {
continue
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
timeout := time.Until(slotEnd)
slotTimer := time.NewTimer(timeout)

for {
txn := b.transactionState.PopWithTimer(slotTimer.C)
slotTimerExpired := txn == nil
if slotTimerExpired {
break
}

extrinsic := txn.Extrinsic
Expand Down Expand Up @@ -287,11 +291,6 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
}
}

func hasSlotEnded(slot Slot) bool {
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
return time.Since(slotEnd) >= 0
}

func extrinsicsToBody(inherents [][]byte, txs []*transaction.ValidTransaction) (types.Body, error) {
extrinsics := types.BytesArrayToExtrinsics(inherents)

Expand Down
14 changes: 14 additions & 0 deletions lib/babe/mock_state_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/babe/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type TransactionState interface {
Push(vt *transaction.ValidTransaction) (common.Hash, error)
Pop() *transaction.ValidTransaction
Peek() *transaction.ValidTransaction
PopWithTimer(timerCh <-chan time.Time) (tx *transaction.ValidTransaction)
}

// EpochState is the interface for epoch methods
Expand Down
46 changes: 41 additions & 5 deletions lib/transaction/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/heap"
"errors"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -77,17 +78,18 @@ func (pq *priorityQueue) Pop() interface{} {

// PriorityQueue is a thread safe wrapper over `priorityQueue`
type PriorityQueue struct {
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
pq priorityQueue
currOrder uint64
txs map[common.Hash]*Item
pollInterval time.Duration
sync.Mutex
}

// NewPriorityQueue creates new instance of PriorityQueue
func NewPriorityQueue() *PriorityQueue {
spq := &PriorityQueue{
pq: make(priorityQueue, 0),
txs: make(map[common.Hash]*Item),
txs: make(map[common.Hash]*Item),
pollInterval: 10 * time.Millisecond,
}

heap.Init(&spq.pq)
Expand Down Expand Up @@ -139,6 +141,40 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
return hash, nil
}

// PopWithTimer returns the next valid transaction from the queue.
// When the timer expires, it returns `nil`.
func (spq *PriorityQueue) PopWithTimer(timerCh <-chan time.Time) (transaction *ValidTransaction) {
transaction = spq.Pop()
if transaction != nil {
return transaction
}

transactionChannel := make(chan *ValidTransaction)
go func() {
pollTicker := time.NewTicker(spq.pollInterval)
defer pollTicker.Stop()

for {
select {
case <-timerCh:
transactionChannel <- nil
return
case <-pollTicker.C:
}

transaction := spq.Pop()
if transaction == nil {
continue
}

transactionChannel <- transaction
return
}
}()

return <-transactionChannel
}

// Pop removes the transaction with has the highest priority value from the queue and returns it.
// If there are multiple transaction with same priority value then it return them in FIFO order.
func (spq *PriorityQueue) Pop() *ValidTransaction {
Expand Down
57 changes: 57 additions & 0 deletions lib/transaction/priority_queue_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2021 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

//go:build integration

package transaction

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_PopWithTimer(t *testing.T) {
pq := NewPriorityQueue()
slotTimer := time.NewTimer(time.Second)

tests := []*ValidTransaction{
{
Extrinsic: []byte("a"),
Validity: &Validity{Priority: 1},
},
{
Extrinsic: []byte("b"),
Validity: &Validity{Priority: 4},
},
{
Extrinsic: []byte("c"),
Validity: &Validity{Priority: 2},
},
{
Extrinsic: []byte("d"),
Validity: &Validity{Priority: 17},
},
{
Extrinsic: []byte("e"),
Validity: &Validity{Priority: 2},
},
}

expected := []int{3, 1, 2, 4, 0}

for _, test := range tests {
pq.Push(test)
}

counter := 0
for {
txn := pq.PopWithTimer(slotTimer.C)
if txn == nil {
break
}
assert.Equal(t, tests[expected[counter]], txn)
counter++
}
}
75 changes: 75 additions & 0 deletions lib/transaction/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPriorityQueue(t *testing.T) {
Expand Down Expand Up @@ -270,3 +272,76 @@ func TestRemoveExtrinsic(t *testing.T) {
t.Fatalf("Fail: got %v expected %v", res, tests[1])
}
}

func Test_PriorityQueue_PopWithTimer(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
queueBuilder func() *PriorityQueue
queueModifier func(queue *PriorityQueue, done chan<- struct{})
timer *time.Timer
transaction *ValidTransaction
}{
"empty queue polled once": {
// test should last 1ns
queueBuilder: NewPriorityQueue,
timer: time.NewTimer(time.Nanosecond),
},
"empty queue polled multiple times": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
timer: time.NewTimer(time.Millisecond),
},
"queue with one element polled once": {
// test should be instantaneous
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
return queue
},
timer: time.NewTimer(time.Nanosecond),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
"queue polled multiple times until new element": {
// test should last 1ms
queueBuilder: func() *PriorityQueue {
queue := NewPriorityQueue()
queue.pollInterval = time.Nanosecond
return queue
},
queueModifier: func(queue *PriorityQueue, done chan<- struct{}) {
close(done)
time.Sleep(time.Millisecond)
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
},
timer: time.NewTimer(time.Second),
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
},
}

for name, testCase := range testCases {
testCase := testCase
t.Run(name, func(t *testing.T) {
t.Parallel()

queue := testCase.queueBuilder()

modifyDone := make(chan struct{})
if testCase.queueModifier != nil {
// modify queue asynchronously while popping
go testCase.queueModifier(queue, modifyDone)
} else {
close(modifyDone)
}

transaction := queue.PopWithTimer(testCase.timer.C)
<-modifyDone
testCase.timer.Stop()
assert.Equal(t, testCase.transaction, transaction)
})
}
}