Skip to content

Commit f351aa6

Browse files
committed
introduce pop func to use pollTimer
1 parent e3f9bf2 commit f351aa6

File tree

2 files changed

+69
-14
lines changed

2 files changed

+69
-14
lines changed

lib/transaction/priority_queue.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,19 @@ func (pq *priorityQueue) Pop() interface{} {
7878

7979
// PriorityQueue is a thread safe wrapper over `priorityQueue`
8080
type PriorityQueue struct {
81-
pq priorityQueue
82-
currOrder uint64
83-
txs map[common.Hash]*Item
84-
sleepTime time.Duration
81+
pq priorityQueue
82+
currOrder uint64
83+
txs map[common.Hash]*Item
84+
pollInterval time.Duration
8585
sync.Mutex
8686
}
8787

8888
// NewPriorityQueue creates new instance of PriorityQueue
8989
func NewPriorityQueue() *PriorityQueue {
9090
spq := &PriorityQueue{
91-
pq: make(priorityQueue, 0),
92-
txs: make(map[common.Hash]*Item),
93-
sleepTime: time.Millisecond * 10,
91+
pq: make(priorityQueue, 0),
92+
txs: make(map[common.Hash]*Item),
93+
pollInterval: time.Millisecond * 10,
9494
}
9595

9696
heap.Init(&spq.pq)
@@ -147,18 +147,23 @@ func (spq *PriorityQueue) Push(txn *ValidTransaction) (common.Hash, error) {
147147
func (spq *PriorityQueue) PopChannel(timer *time.Timer) (tx chan *ValidTransaction) {
148148
popChannel := make(chan *ValidTransaction)
149149
go func() {
150+
var pollTimer <-chan time.Time = nil
151+
var pop = func() {
152+
txn := spq.Pop()
153+
if txn != nil {
154+
popChannel <- txn
155+
} else {
156+
pollTimer = time.NewTimer(spq.pollInterval).C
157+
}
158+
}
150159
for {
151160
select {
152161
case <-timer.C:
153162
close(popChannel)
163+
case <-pollTimer:
164+
pop()
154165
default:
155-
txn := spq.Pop()
156-
157-
if txn != nil {
158-
popChannel <- txn
159-
} else {
160-
time.Sleep(spq.sleepTime)
161-
}
166+
pop()
162167
}
163168
}
164169
}()

lib/transaction/priority_queue_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,53 @@ func TestPopChannel(t *testing.T) {
313313
counter++
314314
}
315315
}
316+
317+
func TestPopChannelEnds(t *testing.T) {
318+
pq := NewPriorityQueue()
319+
// increase sleep time greater than timer
320+
pq.pollInterval = 2 * time.Second
321+
slotTimer := time.NewTimer(time.Second)
322+
323+
start := time.Now()
324+
325+
popChan := pq.PopChannel(slotTimer)
326+
tests := []*ValidTransaction{
327+
{
328+
Extrinsic: []byte("a"),
329+
Validity: &Validity{Priority: 1},
330+
},
331+
{
332+
Extrinsic: []byte("b"),
333+
Validity: &Validity{Priority: 4},
334+
},
335+
{
336+
Extrinsic: []byte("c"),
337+
Validity: &Validity{Priority: 2},
338+
},
339+
{
340+
Extrinsic: []byte("d"),
341+
Validity: &Validity{Priority: 17},
342+
},
343+
{
344+
Extrinsic: []byte("e"),
345+
Validity: &Validity{Priority: 2},
346+
},
347+
}
348+
349+
expected := []int{3, 1, 2, 4, 0}
350+
351+
for _, test := range tests {
352+
pq.Push(test)
353+
}
354+
355+
counter := 0
356+
for txn := range popChan {
357+
assert.Equal(t, tests[expected[counter]], txn)
358+
counter++
359+
}
360+
361+
d := time.Since(start)
362+
// assert between 1s and 1.1s
363+
assert.GreaterOrEqual(t, d, time.Second)
364+
assert.LessOrEqual(t, d, time.Second+(time.Millisecond*100))
365+
}

0 commit comments

Comments
 (0)