Skip to content

Commit 69031a6

Browse files
fix(dot/sync): Gossip BlockAnnounceMessage only after successfully imported (#2885)
Co-authored-by: Timothy Wu <[email protected]>
1 parent 405db51 commit 69031a6

File tree

6 files changed

+170
-40
lines changed

6 files changed

+170
-40
lines changed

dot/core/service.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,40 +127,63 @@ func (s *Service) StorageRoot() (common.Hash, error) {
127127

128128
// HandleBlockImport handles a block that was imported via the network
129129
func (s *Service) HandleBlockImport(block *types.Block, state *rtstorage.TrieState) error {
130-
return s.handleBlock(block, state)
130+
err := s.handleBlock(block, state)
131+
if err != nil {
132+
return fmt.Errorf("handling block: %w", err)
133+
}
134+
135+
bestBlockHash := s.blockState.BestBlockHash()
136+
isBestBlock := bestBlockHash.Equal(block.Header.Hash())
137+
138+
blockAnnounce, err := createBlockAnnounce(block, isBestBlock)
139+
if err != nil {
140+
return fmt.Errorf("creating block announce: %w", err)
141+
}
142+
143+
s.net.GossipMessage(blockAnnounce)
144+
return nil
131145
}
132146

133147
// HandleBlockProduced handles a block that was produced by us
134148
// It is handled the same as an imported block in terms of state updates; the only difference
135149
// is we send a BlockAnnounceMessage to our peers.
136150
func (s *Service) HandleBlockProduced(block *types.Block, state *rtstorage.TrieState) error {
137-
if err := s.handleBlock(block, state); err != nil {
138-
return err
151+
err := s.handleBlock(block, state)
152+
if err != nil {
153+
return fmt.Errorf("handling block: %w", err)
139154
}
140155

156+
blockAnnounce, err := createBlockAnnounce(block, true)
157+
if err != nil {
158+
return fmt.Errorf("creating block announce: %w", err)
159+
}
160+
161+
s.net.GossipMessage(blockAnnounce)
162+
return nil
163+
}
164+
165+
func createBlockAnnounce(block *types.Block, isBestBlock bool) (
166+
blockAnnounce *network.BlockAnnounceMessage, err error) {
141167
digest := types.NewDigest()
142168
for i := range block.Header.Digest.Types {
143169
digestValue, err := block.Header.Digest.Types[i].Value()
144170
if err != nil {
145-
return fmt.Errorf("getting value of digest type at index %d: %w", i, err)
171+
return nil, fmt.Errorf("getting value of digest type at index %d: %w", i, err)
146172
}
147173
err = digest.Add(digestValue)
148174
if err != nil {
149-
return err
175+
return nil, fmt.Errorf("adding digest value for type at index %d: %w", i, err)
150176
}
151177
}
152178

153-
msg := &network.BlockAnnounceMessage{
179+
return &network.BlockAnnounceMessage{
154180
ParentHash: block.Header.ParentHash,
155181
Number: block.Header.Number,
156182
StateRoot: block.Header.StateRoot,
157183
ExtrinsicsRoot: block.Header.ExtrinsicsRoot,
158184
Digest: digest,
159-
BestBlock: true,
160-
}
161-
162-
s.net.GossipMessage(msg)
163-
return nil
185+
BestBlock: isBestBlock,
186+
}, nil
164187
}
165188

166189
func (s *Service) handleBlock(block *types.Block, state *rtstorage.TrieState) error {

dot/core/service_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,9 +480,9 @@ func Test_Service_HandleBlockProduced(t *testing.T) {
480480
t.Parallel()
481481
execTest := func(t *testing.T, s *Service, block *types.Block, trieState *rtstorage.TrieState, expErr error) {
482482
err := s.HandleBlockProduced(block, trieState)
483-
assert.ErrorIs(t, err, expErr)
483+
require.ErrorIs(t, err, expErr)
484484
if expErr != nil {
485-
assert.EqualError(t, err, expErr.Error())
485+
assert.EqualError(t, err, "handling block: "+expErr.Error())
486486
}
487487
}
488488
t.Run("nil input", func(t *testing.T) {

dot/network/block_announce.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ChainSafe/gossamer/dot/peerset"
1111
"github.com/ChainSafe/gossamer/dot/types"
12+
"github.com/ChainSafe/gossamer/lib/blocktree"
1213
"github.com/ChainSafe/gossamer/lib/common"
1314
"github.com/ChainSafe/gossamer/pkg/scale"
1415

@@ -208,9 +209,10 @@ func (s *Service) handleBlockAnnounceMessage(from peer.ID, msg NotificationsMess
208209
return false, errors.New("invalid message")
209210
}
210211

211-
if err = s.syncer.HandleBlockAnnounce(from, bam); err != nil {
212-
return false, err
212+
err = s.syncer.HandleBlockAnnounce(from, bam)
213+
if errors.Is(err, blocktree.ErrBlockExists) {
214+
return true, nil
213215
}
214216

215-
return true, nil
217+
return false, err
216218
}

dot/network/block_announce_test.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ import (
77
"testing"
88

99
"github.com/ChainSafe/gossamer/dot/types"
10+
"github.com/ChainSafe/gossamer/lib/blocktree"
1011
"github.com/ChainSafe/gossamer/lib/common"
1112
"github.com/ChainSafe/gossamer/pkg/scale"
13+
gomock "github.com/golang/mock/gomock"
1214

1315
"github.com/libp2p/go-libp2p-core/peer"
1416
"github.com/stretchr/testify/require"
@@ -126,24 +128,56 @@ func TestDecodeBlockAnnounceHandshake(t *testing.T) {
126128
func TestHandleBlockAnnounceMessage(t *testing.T) {
127129
t.Parallel()
128130

129-
config := &Config{
130-
BasePath: t.TempDir(),
131-
Port: availablePort(t),
132-
NoBootstrap: true,
133-
NoMDNS: true,
131+
testCases := map[string]struct {
132+
propagate bool
133+
mockSyncer func(*testing.T, peer.ID, *BlockAnnounceMessage) Syncer
134+
}{
135+
"block already exists": {
136+
mockSyncer: func(t *testing.T, peer peer.ID, blockAnnounceMessage *BlockAnnounceMessage) Syncer {
137+
ctrl := gomock.NewController(t)
138+
syncer := NewMockSyncer(ctrl)
139+
syncer.EXPECT().
140+
HandleBlockAnnounce(peer, blockAnnounceMessage).
141+
Return(blocktree.ErrBlockExists)
142+
return syncer
143+
},
144+
propagate: true,
145+
},
146+
"block does not exists": {
147+
propagate: false,
148+
},
134149
}
135150

136-
s := createTestService(t, config)
151+
for tname, tt := range testCases {
152+
tt := tt
137153

138-
peerID := peer.ID("noot")
139-
msg := &BlockAnnounceMessage{
140-
Number: 10,
141-
Digest: types.NewDigest(),
142-
}
154+
t.Run(tname, func(t *testing.T) {
155+
t.Parallel()
143156

144-
propagate, err := s.handleBlockAnnounceMessage(peerID, msg)
145-
require.NoError(t, err)
146-
require.True(t, propagate)
157+
config := &Config{
158+
BasePath: t.TempDir(),
159+
Port: availablePort(t),
160+
NoBootstrap: true,
161+
NoMDNS: true,
162+
}
163+
164+
peerID := peer.ID("noot")
165+
msg := &BlockAnnounceMessage{
166+
Number: 10,
167+
Digest: types.NewDigest(),
168+
}
169+
170+
if tt.mockSyncer != nil {
171+
config.Syncer = tt.mockSyncer(t, peerID, msg)
172+
}
173+
174+
service := createTestService(t, config)
175+
gotPropagate, err := service.handleBlockAnnounceMessage(peerID, msg)
176+
177+
require.NoError(t, err)
178+
require.Equal(t, tt.propagate, gotPropagate)
179+
})
180+
}
147181
}
148182

149183
func TestValidateBlockAnnounceHandshake(t *testing.T) {

dot/sync/chain_sync.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/ChainSafe/gossamer/dot/network"
2222
"github.com/ChainSafe/gossamer/dot/peerset"
2323
"github.com/ChainSafe/gossamer/dot/types"
24+
"github.com/ChainSafe/gossamer/lib/blocktree"
2425
"github.com/ChainSafe/gossamer/lib/common"
2526
"github.com/ChainSafe/gossamer/lib/common/variadic"
2627
)
@@ -251,7 +252,7 @@ func (cs *chainSync) setBlockAnnounce(from peer.ID, header *types.Header) error
251252
}
252253

253254
if has {
254-
return nil
255+
return blocktree.ErrBlockExists
255256
}
256257

257258
if err = cs.pendingBlocks.addHeader(header); err != nil {
@@ -627,19 +628,19 @@ func (cs *chainSync) tryDispatchWorker(w *worker) {
627628
// if it fails due to any reason, it sets the worker `err` and returns
628629
// this function always places the worker into the `resultCh` for result handling upon return
629630
func (cs *chainSync) dispatchWorker(w *worker) {
631+
if w.targetNumber == nil || w.startNumber == nil {
632+
return
633+
}
634+
630635
logger.Debugf("dispatching sync worker id %d, "+
631636
"start number %d, target number %d, "+
632637
"start hash %s, target hash %s, "+
633638
"request data %d, direction %s",
634639
w.id,
635-
w.startNumber, w.targetNumber,
640+
*w.startNumber, *w.targetNumber,
636641
w.startHash, w.targetHash,
637642
w.requestData, w.direction)
638643

639-
if w.targetNumber == nil || w.startNumber == nil {
640-
return
641-
}
642-
643644
start := time.Now()
644645
defer func() {
645646
end := time.Now()

dot/sync/chain_sync_test.go

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ChainSafe/gossamer/dot/network"
1313
"github.com/ChainSafe/gossamer/dot/peerset"
1414
"github.com/ChainSafe/gossamer/dot/types"
15+
"github.com/ChainSafe/gossamer/lib/blocktree"
1516
"github.com/ChainSafe/gossamer/lib/common"
1617
"github.com/ChainSafe/gossamer/lib/common/variadic"
1718
"github.com/ChainSafe/gossamer/lib/trie"
@@ -1247,20 +1248,23 @@ func Test_chainSync_start(t *testing.T) {
12471248
}
12481249

12491250
func Test_chainSync_setBlockAnnounce(t *testing.T) {
1251+
t.Parallel()
1252+
12501253
type args struct {
12511254
from peer.ID
12521255
header *types.Header
12531256
}
12541257
tests := map[string]struct {
1255-
chainSyncBuilder func(ctrl *gomock.Controller) chainSync
1258+
chainSyncBuilder func(*types.Header, *gomock.Controller) chainSync
12561259
args args
12571260
wantErr error
12581261
}{
12591262
"base case": {
1263+
wantErr: blocktree.ErrBlockExists,
12601264
args: args{
12611265
header: &types.Header{Number: 2},
12621266
},
1263-
chainSyncBuilder: func(ctrl *gomock.Controller) chainSync {
1267+
chainSyncBuilder: func(_ *types.Header, ctrl *gomock.Controller) chainSync {
12641268
mockBlockState := NewMockBlockState(ctrl)
12651269
mockBlockState.EXPECT().HasHeader(common.MustHexToHash(
12661270
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")).Return(true, nil)
@@ -1271,13 +1275,79 @@ func Test_chainSync_setBlockAnnounce(t *testing.T) {
12711275
}
12721276
},
12731277
},
1278+
"err_when_calling_has_header": {
1279+
wantErr: errors.New("checking header exists"),
1280+
args: args{
1281+
header: &types.Header{Number: 2},
1282+
},
1283+
chainSyncBuilder: func(_ *types.Header, ctrl *gomock.Controller) chainSync {
1284+
mockBlockState := NewMockBlockState(ctrl)
1285+
mockBlockState.EXPECT().
1286+
HasHeader(common.MustHexToHash(
1287+
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")).
1288+
Return(false, errors.New("checking header exists"))
1289+
mockDisjointBlockSet := NewMockDisjointBlockSet(ctrl)
1290+
return chainSync{
1291+
blockState: mockBlockState,
1292+
pendingBlocks: mockDisjointBlockSet,
1293+
}
1294+
},
1295+
},
1296+
"adding_block_header_to_pending_blocks": {
1297+
args: args{
1298+
header: &types.Header{Number: 2},
1299+
},
1300+
chainSyncBuilder: func(expectedHeader *types.Header, ctrl *gomock.Controller) chainSync {
1301+
argumentHeaderHash := common.MustHexToHash(
1302+
"0x05bdcc454f60a08d427d05e7f19f240fdc391f570ab76fcb96ecca0b5823d3bf")
1303+
1304+
mockBlockState := NewMockBlockState(ctrl)
1305+
mockBlockState.EXPECT().
1306+
HasHeader(argumentHeaderHash).
1307+
Return(false, nil)
1308+
1309+
mockBlockState.EXPECT().
1310+
BestBlockHeader().
1311+
Return(&types.Header{Number: 1}, nil)
1312+
1313+
mockDisjointBlockSet := NewMockDisjointBlockSet(ctrl)
1314+
mockDisjointBlockSet.EXPECT().
1315+
addHeader(expectedHeader).
1316+
Return(nil)
1317+
1318+
mockDisjointBlockSet.EXPECT().
1319+
addHashAndNumber(argumentHeaderHash, uint(2)).
1320+
Return(nil)
1321+
1322+
return chainSync{
1323+
blockState: mockBlockState,
1324+
pendingBlocks: mockDisjointBlockSet,
1325+
peerState: make(map[peer.ID]*peerState),
1326+
// creating an buffered channel for this specific test
1327+
// since it will put a work on the queue and an unbufered channel
1328+
// will hang until we read on this channel and the goal is to
1329+
// put the work on the channel and don't block
1330+
workQueue: make(chan *peerState, 1),
1331+
}
1332+
},
1333+
},
12741334
}
12751335
for name, tt := range tests {
1336+
tt := tt
12761337
t.Run(name, func(t *testing.T) {
1338+
t.Parallel()
12771339
ctrl := gomock.NewController(t)
1278-
sync := tt.chainSyncBuilder(ctrl)
1340+
sync := tt.chainSyncBuilder(tt.args.header, ctrl)
12791341
err := sync.setBlockAnnounce(tt.args.from, tt.args.header)
1280-
assert.ErrorIs(t, err, tt.wantErr)
1342+
if tt.wantErr != nil {
1343+
assert.EqualError(t, err, tt.wantErr.Error())
1344+
} else {
1345+
assert.NoError(t, err)
1346+
}
1347+
1348+
if sync.workQueue != nil {
1349+
assert.Equal(t, len(sync.workQueue), 1)
1350+
}
12811351
})
12821352
}
12831353
}

0 commit comments

Comments
 (0)