Skip to content

Commit bfcaa0b

Browse files
committed
[version] Restart services on chain or protocol change
1 parent da3d855 commit bfcaa0b

File tree

9 files changed

+323
-228
lines changed

9 files changed

+323
-228
lines changed

nil/common/concurrent/utils.go

+18
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,24 @@ func RunTickerLoop(ctx context.Context, interval time.Duration, onTick func(cont
134134
}
135135
}
136136

137+
// RunTickerLoopWithErr invokes onTick once per interval until either onTick
// fails or ctx is cancelled.
//
// It returns the first non-nil error produced by onTick, or nil when the loop
// stops because ctx is done. The first call to onTick happens only after one
// full interval has elapsed.
func RunTickerLoopWithErr(ctx context.Context, interval time.Duration, onTick func(context.Context) error) error {
	t := time.NewTicker(interval)
	defer t.Stop()

	done := ctx.Done()
	for {
		// Wait for whichever comes first: the next tick or cancellation.
		select {
		case <-done:
			return nil
		case <-t.C:
		}

		if err := onTick(ctx); err != nil {
			return err
		}
	}
}
154+
137155
func RunWithRetries[T any](ctx context.Context, interval time.Duration, retries int, f func() (T, error)) (T, error) {
138156
timer := time.NewTimer(interval)
139157
defer timer.Stop()

nil/internal/collate/bootstrap.go

-46
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,12 @@ package collate
22

33
import (
44
"context"
5-
"fmt"
65

7-
"github.com/NilFoundation/nil/nil/common"
8-
"github.com/NilFoundation/nil/nil/common/check"
96
"github.com/NilFoundation/nil/nil/common/logging"
107
"github.com/NilFoundation/nil/nil/internal/db"
118
"github.com/NilFoundation/nil/nil/internal/network"
12-
"github.com/NilFoundation/nil/nil/internal/types"
139
)
1410

15-
const topicVersion = "/nil/version"
16-
1711
// topicBootstrapShard returns the protocol ID "/nil/snap".
func topicBootstrapShard() network.ProtocolID {
	return network.ProtocolID("/nil/snap")
}
@@ -53,32 +47,6 @@ func SetBootstrapHandler(ctx context.Context, nm network.Manager, db db.DB) {
5347
logger.Info().Msg("Enabled bootstrap endpoint")
5448
}
5549

56-
func SetVersionHandler(ctx context.Context, nm network.Manager, fabric db.DB) error {
57-
tx, err := fabric.CreateRoTx(ctx)
58-
if err != nil {
59-
return fmt.Errorf("failed to create transaction: %w", err)
60-
}
61-
defer tx.Rollback()
62-
63-
// The genesis block must have been initialized before this method is called.
64-
version, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
65-
if err != nil {
66-
return fmt.Errorf("failed to read genesis block hash: %w", err)
67-
}
68-
check.PanicIfNot(!version.Empty())
69-
70-
resp, err := version.MarshalSSZ()
71-
if err != nil {
72-
return fmt.Errorf("failed to marshal genesis block hash: %w", err)
73-
}
74-
75-
nm.SetRequestHandler(ctx, topicVersion, func(ctx context.Context, _ []byte) ([]byte, error) {
76-
return resp, nil
77-
})
78-
79-
return nil
80-
}
81-
8250
func fetchShardSnap(
8351
ctx context.Context,
8452
nm network.Manager,
@@ -119,17 +87,3 @@ func fetchSnapshot(
11987
}
12088
return fetchShardSnap(ctx, nm, peerId, db, logger)
12189
}
122-
123-
func fetchGenesisBlockHash(ctx context.Context, nm network.Manager, peerId network.PeerID) (common.Hash, error) {
124-
resp, err := nm.SendRequestAndGetResponse(ctx, peerId, topicVersion, nil)
125-
if err != nil {
126-
return common.EmptyHash, fmt.Errorf("failed to fetch genesis block hash: %w", err)
127-
}
128-
129-
var res common.Hash
130-
if err := res.UnmarshalSSZ(resp); err != nil {
131-
return common.EmptyHash, fmt.Errorf("failed to unmarshal genesis block hash: %w", err)
132-
}
133-
134-
return res, nil
135-
}

nil/internal/collate/syncer.go

+51-57
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"sync"
88
"time"
99

10-
"github.com/NilFoundation/nil/nil/common"
1110
"github.com/NilFoundation/nil/nil/common/assert"
1211
"github.com/NilFoundation/nil/nil/common/check"
12+
"github.com/NilFoundation/nil/nil/common/concurrent"
1313
"github.com/NilFoundation/nil/nil/common/logging"
1414
cerrors "github.com/NilFoundation/nil/nil/internal/collate/errors"
1515
"github.com/NilFoundation/nil/nil/internal/db"
@@ -102,54 +102,6 @@ func (s *Syncer) WaitComplete(ctx context.Context) error {
102102
}
103103
}
104104

105-
func (s *Syncer) getLocalVersion(ctx context.Context) (*NodeVersion, error) {
106-
protocolVersion := s.networkManager.ProtocolVersion()
107-
108-
rotx, err := s.db.CreateRoTx(ctx)
109-
if err != nil {
110-
return nil, err
111-
}
112-
defer rotx.Rollback()
113-
114-
res, err := db.ReadBlockHashByNumber(rotx, types.MainShardId, 0)
115-
if err != nil {
116-
if errors.Is(err, db.ErrKeyNotFound) {
117-
return &NodeVersion{protocolVersion, common.EmptyHash}, nil
118-
}
119-
return nil, err
120-
}
121-
return &NodeVersion{protocolVersion, res}, err
122-
}
123-
124-
type NodeVersion struct {
125-
ProtocolVersion string
126-
GenesisBlockHash common.Hash
127-
}
128-
129-
func (s *Syncer) fetchRemoteVersion(ctx context.Context) (NodeVersion, error) {
130-
var err error
131-
for _, peer := range s.config.BootstrapPeers {
132-
var peerId network.PeerID
133-
peerId, err = s.networkManager.Connect(ctx, network.AddrInfo(peer))
134-
if err != nil {
135-
continue
136-
}
137-
138-
var protocolVersion string
139-
protocolVersion, err = s.networkManager.GetPeerProtocolVersion(peerId)
140-
if err != nil {
141-
continue
142-
}
143-
144-
var res common.Hash
145-
res, err = fetchGenesisBlockHash(ctx, s.networkManager, peerId)
146-
if err == nil {
147-
return NodeVersion{protocolVersion, res}, nil
148-
}
149-
}
150-
return NodeVersion{}, fmt.Errorf("failed to fetch version from all peers; last error: %w", err)
151-
}
152-
153105
func (s *Syncer) fetchSnapshot(ctx context.Context) error {
154106
var err error
155107
for _, peer := range s.config.BootstrapPeers {
@@ -171,12 +123,12 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
171123
return nil
172124
}
173125

174-
version, err := s.getLocalVersion(ctx)
126+
version, err := GetLocalVersion(ctx, s.networkManager, s.db)
175127
if err != nil {
176128
return err
177129
}
178130

179-
remoteVersion, err := s.fetchRemoteVersion(ctx)
131+
remoteVersion, err := FetchRemoteVersion(ctx, s.networkManager, s.config.BootstrapPeers)
180132
if err != nil {
181133
// Nodes with allowDbDrop are supposed to be secondary, so they must sync with some reliable peer.
182134
// We need some nodes to start without fetching a remote version
@@ -220,7 +172,7 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
220172
// SetHandlers sets the handlers for generic (shard-independent) protocols.
221173
// It must be called after the initial sync of ALL shards is completed.
222174
// It should be called once, e.g., by the main shard syncer.
223-
// (Subsequent calls do not have any side effects, but might return an error.)
175+
// (Subsequent calls do not have any side effects but might return an error.)
224176
func (s *Syncer) SetHandlers(ctx context.Context) error {
225177
if s.networkManager == nil {
226178
return nil
@@ -236,6 +188,9 @@ func (s *Syncer) Run(ctx context.Context) error {
236188
return nil
237189
}
238190

191+
ctx, cancel := context.WithCancel(ctx)
192+
defer cancel()
193+
239194
if s.config.ShardId.IsMainShard() {
240195
if err := SetVersionHandler(ctx, s.networkManager, s.db); err != nil {
241196
return fmt.Errorf("failed to set version handler: %w", err)
@@ -249,24 +204,60 @@ func (s *Syncer) Run(ctx context.Context) error {
249204
s.fetchBlocks(ctx)
250205
s.waitForSync.Done()
251206

252-
s.logger.Info().Msg("Syncer initialization complete")
253-
254207
if ctx.Err() != nil {
255208
return nil
256209
}
257210

211+
s.logger.Info().Msg("Running syncer...")
212+
258213
sub, err := s.networkManager.PubSub().Subscribe(s.topic)
259214
if err != nil {
260215
return fmt.Errorf("failed to subscribe to %s: %w", s.topic, err)
261216
}
262217
defer sub.Close()
263218

219+
go s.doFetch(ctx, sub)
220+
221+
return s.runVersionCheckLoop(ctx)
222+
}
223+
224+
func (s *Syncer) runVersionCheckLoop(ctx context.Context) error {
225+
if len(s.config.BootstrapPeers) == 0 {
226+
s.logger.Info().Msg("No bootstrap peers. Skipping version check")
227+
return nil
228+
}
229+
230+
version, err := GetLocalVersion(ctx, s.networkManager, s.db)
231+
if err != nil {
232+
return err
233+
}
234+
235+
return concurrent.RunTickerLoopWithErr(ctx, 10*time.Second, func(ctx context.Context) error {
236+
remoteVersion, err := FetchRemoteVersion(ctx, s.networkManager, s.config.BootstrapPeers)
237+
if err != nil {
238+
s.logger.Warn().Err(err).Msg("Failed to fetch remote version")
239+
return nil
240+
}
241+
242+
if version.ProtocolVersion != remoteVersion.ProtocolVersion {
243+
return &ProtocolVersionMismatchError{
244+
version.ProtocolVersion,
245+
remoteVersion.ProtocolVersion,
246+
}
247+
}
248+
249+
if version.GenesisBlockHash != remoteVersion.GenesisBlockHash {
250+
return fmt.Errorf("local version is outdated; local: %s, remote: %s", version, remoteVersion)
251+
}
252+
253+
return nil
254+
})
255+
}
256+
257+
func (s *Syncer) doFetch(ctx context.Context, sub *network.Subscription) {
264258
ch := sub.Start(ctx, true)
265259
for {
266260
select {
267-
case <-ctx.Done():
268-
s.logger.Debug().Msg("Syncer is terminated")
269-
return nil
270261
case msg := <-ch:
271262
saved, err := s.processTopicTransaction(ctx, msg.Data)
272263
if err != nil {
@@ -287,6 +278,9 @@ func (s *Syncer) Run(ctx context.Context) error {
287278
s.logger.Warn().Msgf("No new block in the topic for %s, pulling blocks actively", s.config.Timeout)
288279

289280
s.fetchBlocks(ctx)
281+
case <-ctx.Done():
282+
s.logger.Info().Msg("Syncer is terminated")
283+
return
290284
}
291285
}
292286
}

nil/internal/collate/version.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package collate
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/NilFoundation/nil/nil/common"
9+
"github.com/NilFoundation/nil/nil/common/check"
10+
"github.com/NilFoundation/nil/nil/internal/db"
11+
"github.com/NilFoundation/nil/nil/internal/network"
12+
"github.com/NilFoundation/nil/nil/internal/types"
13+
)
14+
15+
const topicVersion = "/nil/version"
16+
17+
// NodeVersion is the pair of values compared between the local node and a
// remote peer.
type NodeVersion struct {
18+
// ProtocolVersion is the protocol version string reported by the network manager.
ProtocolVersion string
19+
// GenesisBlockHash is the hash of block 0 of the main shard.
GenesisBlockHash common.Hash
20+
}
21+
22+
func SetVersionHandler(ctx context.Context, nm network.Manager, fabric db.DB) error {
23+
tx, err := fabric.CreateRoTx(ctx)
24+
if err != nil {
25+
return fmt.Errorf("failed to create transaction: %w", err)
26+
}
27+
defer tx.Rollback()
28+
29+
// The genesis block must have been initialized before this method is called.
30+
version, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
31+
if err != nil {
32+
return fmt.Errorf("failed to read genesis block hash: %w", err)
33+
}
34+
check.PanicIfNot(!version.Empty())
35+
36+
resp, err := version.MarshalSSZ()
37+
if err != nil {
38+
return fmt.Errorf("failed to marshal genesis block hash: %w", err)
39+
}
40+
41+
nm.SetRequestHandler(ctx, topicVersion, func(ctx context.Context, _ []byte) ([]byte, error) {
42+
return resp, nil
43+
})
44+
45+
return nil
46+
}
47+
48+
func GetLocalVersion(ctx context.Context, nm network.Manager, fabric db.DB) (NodeVersion, error) {
49+
protocolVersion := nm.ProtocolVersion()
50+
51+
tx, err := fabric.CreateRoTx(ctx)
52+
if err != nil {
53+
return NodeVersion{}, err
54+
}
55+
defer tx.Rollback()
56+
57+
res, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
58+
if err != nil {
59+
if errors.Is(err, db.ErrKeyNotFound) {
60+
return NodeVersion{protocolVersion, common.EmptyHash}, nil
61+
}
62+
return NodeVersion{}, err
63+
}
64+
return NodeVersion{protocolVersion, res}, err
65+
}
66+
67+
func FetchRemoteVersion(ctx context.Context, nm network.Manager, peers network.AddrInfoSlice) (NodeVersion, error) {
68+
var err error
69+
for _, peer := range peers {
70+
var peerId network.PeerID
71+
peerId, err = nm.Connect(ctx, network.AddrInfo(peer))
72+
if err != nil {
73+
continue
74+
}
75+
76+
var protocolVersion string
77+
protocolVersion, err = nm.GetPeerProtocolVersion(peerId)
78+
if err != nil {
79+
continue
80+
}
81+
82+
var res common.Hash
83+
res, err = fetchGenesisBlockHash(ctx, nm, peerId)
84+
if err == nil {
85+
return NodeVersion{protocolVersion, res}, nil
86+
}
87+
}
88+
return NodeVersion{}, fmt.Errorf("failed to fetch version from all peers; last error: %w", err)
89+
}
90+
91+
func fetchGenesisBlockHash(ctx context.Context, nm network.Manager, peerId network.PeerID) (common.Hash, error) {
92+
resp, err := nm.SendRequestAndGetResponse(ctx, peerId, topicVersion, nil)
93+
if err != nil {
94+
return common.EmptyHash, fmt.Errorf("failed to fetch genesis block hash: %w", err)
95+
}
96+
97+
var res common.Hash
98+
if err := res.UnmarshalSSZ(resp); err != nil {
99+
return common.EmptyHash, fmt.Errorf("failed to unmarshal genesis block hash: %w", err)
100+
}
101+
102+
return res, nil
103+
}

0 commit comments

Comments
 (0)