Skip to content

Commit 139ad89

Browse files
edwardmackqdm12
andauthored
fix(lib/babe): Unrestricted Loop When Building Blocks (GSR-19) (#2632)
Co-authored-by: Quentin McGaw <[email protected]>
1 parent 736cbc8 commit 139ad89

File tree

7 files changed

+204
-15
lines changed

7 files changed

+204
-15
lines changed

dot/state/transaction.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package state
55

66
import (
77
"sync"
8+
"time"
89

910
"github.com/ChainSafe/gossamer/dot/telemetry"
1011

@@ -47,6 +48,12 @@ func (s *TransactionState) Pop() *transaction.ValidTransaction {
4748
return s.queue.Pop()
4849
}
4950

51+
// PopWithTimer returns the next valid transaction from the queue.
52+
// When the timer expires, it returns `nil`.
53+
func (s *TransactionState) PopWithTimer(timerCh <-chan time.Time) (transaction *transaction.ValidTransaction) {
54+
return s.queue.PopWithTimer(timerCh)
55+
}
56+
5057
// Peek returns the head of the queue without removing it
5158
func (s *TransactionState) Peek() *transaction.ValidTransaction {
5259
return s.queue.Peek()

lib/babe/build.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,15 @@ func (b *BlockBuilder) buildBlockSeal(header *types.Header) (*types.SealDigest,
174174
func (b *BlockBuilder) buildBlockExtrinsics(slot Slot, rt runtime.Instance) []*transaction.ValidTransaction {
175175
var included []*transaction.ValidTransaction
176176

177-
for !hasSlotEnded(slot) {
178-
txn := b.transactionState.Pop()
179-
// Transaction queue is empty.
180-
if txn == nil {
181-
continue
177+
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
178+
timeout := time.Until(slotEnd)
179+
slotTimer := time.NewTimer(timeout)
180+
181+
for {
182+
txn := b.transactionState.PopWithTimer(slotTimer.C)
183+
slotTimerExpired := txn == nil
184+
if slotTimerExpired {
185+
break
182186
}
183187

184188
extrinsic := txn.Extrinsic
@@ -287,11 +291,6 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
287291
}
288292
}
289293

290-
func hasSlotEnded(slot Slot) bool {
291-
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
292-
return time.Since(slotEnd) >= 0
293-
}
294-
295294
func extrinsicsToBody(inherents [][]byte, txs []*transaction.ValidTransaction) (types.Body, error) {
296295
extrinsics := types.BytesArrayToExtrinsics(inherents)
297296

lib/babe/mock_state_test.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/babe/state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type TransactionState interface {
5656
Push(vt *transaction.ValidTransaction) (common.Hash, error)
5757
Pop() *transaction.ValidTransaction
5858
Peek() *transaction.ValidTransaction
59+
PopWithTimer(timerCh <-chan time.Time) (tx *transaction.ValidTransaction)
5960
}
6061

6162
// EpochState is the interface for epoch methods

lib/transaction/priority_queue.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"container/heap"
88
"errors"
99
"sync"
10+
"time"
1011

1112
"github.com/ChainSafe/gossamer/dot/types"
1213
"github.com/ChainSafe/gossamer/lib/common"
@@ -77,17 +78,18 @@ func (pq *priorityQueue) Pop() interface{} {
7778

7879
// PriorityQueue is a thread safe wrapper over `priorityQueue`
7980
type PriorityQueue struct {
80-
pq priorityQueue
81-
currOrder uint64
82-
txs map[common.Hash]*Item
81+
pq priorityQueue
82+
currOrder uint64
83+
txs map[common.Hash]*Item
84+
pollInterval time.Duration
8385
sync.Mutex
8486
}
8587

8688
// NewPriorityQueue creates new instance of PriorityQueue
8789
func NewPriorityQueue() *PriorityQueue {
8890
spq := &PriorityQueue{
89-
pq: make(priorityQueue, 0),
90-
txs: make(map[common.Hash]*Item),
91+
txs: make(map[common.Hash]*Item),
92+
pollInterval: 10 * time.Millisecond,
9193
}
9294

9395
heap.Init(&spq.pq)
@@ -139,6 +141,40 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
139141
return hash, nil
140142
}
141143

144+
// PopWithTimer returns the next valid transaction from the queue.
145+
// When the timer expires, it returns `nil`.
146+
func (spq *PriorityQueue) PopWithTimer(timerCh <-chan time.Time) (transaction *ValidTransaction) {
147+
transaction = spq.Pop()
148+
if transaction != nil {
149+
return transaction
150+
}
151+
152+
transactionChannel := make(chan *ValidTransaction)
153+
go func() {
154+
pollTicker := time.NewTicker(spq.pollInterval)
155+
defer pollTicker.Stop()
156+
157+
for {
158+
select {
159+
case <-timerCh:
160+
transactionChannel <- nil
161+
return
162+
case <-pollTicker.C:
163+
}
164+
165+
transaction := spq.Pop()
166+
if transaction == nil {
167+
continue
168+
}
169+
170+
transactionChannel <- transaction
171+
return
172+
}
173+
}()
174+
175+
return <-transactionChannel
176+
}
177+
142178
// Pop removes the transaction with has the highest priority value from the queue and returns it.
143179
// If there are multiple transaction with same priority value then it return them in FIFO order.
144180
func (spq *PriorityQueue) Pop() *ValidTransaction {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2021 ChainSafe Systems (ON)
2+
// SPDX-License-Identifier: LGPL-3.0-only
3+
4+
//go:build integration
5+
6+
package transaction
7+
8+
import (
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func Test_PopWithTimer(t *testing.T) {
16+
pq := NewPriorityQueue()
17+
slotTimer := time.NewTimer(time.Second)
18+
19+
tests := []*ValidTransaction{
20+
{
21+
Extrinsic: []byte("a"),
22+
Validity: &Validity{Priority: 1},
23+
},
24+
{
25+
Extrinsic: []byte("b"),
26+
Validity: &Validity{Priority: 4},
27+
},
28+
{
29+
Extrinsic: []byte("c"),
30+
Validity: &Validity{Priority: 2},
31+
},
32+
{
33+
Extrinsic: []byte("d"),
34+
Validity: &Validity{Priority: 17},
35+
},
36+
{
37+
Extrinsic: []byte("e"),
38+
Validity: &Validity{Priority: 2},
39+
},
40+
}
41+
42+
expected := []int{3, 1, 2, 4, 0}
43+
44+
for _, test := range tests {
45+
pq.Push(test)
46+
}
47+
48+
counter := 0
49+
for {
50+
txn := pq.PopWithTimer(slotTimer.C)
51+
if txn == nil {
52+
break
53+
}
54+
assert.Equal(t, tests[expected[counter]], txn)
55+
counter++
56+
}
57+
}

lib/transaction/priority_queue_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"sync"
99
"testing"
1010
"time"
11+
12+
"github.com/stretchr/testify/assert"
1113
)
1214

1315
func TestPriorityQueue(t *testing.T) {
@@ -270,3 +272,76 @@ func TestRemoveExtrinsic(t *testing.T) {
270272
t.Fatalf("Fail: got %v expected %v", res, tests[1])
271273
}
272274
}
275+
276+
func Test_PriorityQueue_PopWithTimer(t *testing.T) {
277+
t.Parallel()
278+
279+
testCases := map[string]struct {
280+
queueBuilder func() *PriorityQueue
281+
queueModifier func(queue *PriorityQueue, done chan<- struct{})
282+
timer *time.Timer
283+
transaction *ValidTransaction
284+
}{
285+
"empty queue polled once": {
286+
// test should last 1ns
287+
queueBuilder: NewPriorityQueue,
288+
timer: time.NewTimer(time.Nanosecond),
289+
},
290+
"empty queue polled multiple times": {
291+
// test should last 1ms
292+
queueBuilder: func() *PriorityQueue {
293+
queue := NewPriorityQueue()
294+
queue.pollInterval = time.Nanosecond
295+
return queue
296+
},
297+
timer: time.NewTimer(time.Millisecond),
298+
},
299+
"queue with one element polled once": {
300+
// test should be instantaneous
301+
queueBuilder: func() *PriorityQueue {
302+
queue := NewPriorityQueue()
303+
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
304+
return queue
305+
},
306+
timer: time.NewTimer(time.Nanosecond),
307+
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
308+
},
309+
"queue polled multiple times until new element": {
310+
// test should last 1ms
311+
queueBuilder: func() *PriorityQueue {
312+
queue := NewPriorityQueue()
313+
queue.pollInterval = time.Nanosecond
314+
return queue
315+
},
316+
queueModifier: func(queue *PriorityQueue, done chan<- struct{}) {
317+
close(done)
318+
time.Sleep(time.Millisecond)
319+
queue.Push(&ValidTransaction{Validity: &Validity{Priority: 1}})
320+
},
321+
timer: time.NewTimer(time.Second),
322+
transaction: &ValidTransaction{Validity: &Validity{Priority: 1}},
323+
},
324+
}
325+
326+
for name, testCase := range testCases {
327+
testCase := testCase
328+
t.Run(name, func(t *testing.T) {
329+
t.Parallel()
330+
331+
queue := testCase.queueBuilder()
332+
333+
modifyDone := make(chan struct{})
334+
if testCase.queueModifier != nil {
335+
// modify queue asynchronously while popping
336+
go testCase.queueModifier(queue, modifyDone)
337+
} else {
338+
close(modifyDone)
339+
}
340+
341+
transaction := queue.PopWithTimer(testCase.timer.C)
342+
<-modifyDone
343+
testCase.timer.Stop()
344+
assert.Equal(t, testCase.transaction, transaction)
345+
})
346+
}
347+
}

0 commit comments

Comments
 (0)