Skip to content

Commit 384233a

Browse files
committed
[version] Restart services on chain or protocol change
1 parent da3d855 commit 384233a

File tree

9 files changed

+348
-240
lines changed

9 files changed

+348
-240
lines changed

nil/common/concurrent/utils.go

+25
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package concurrent
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"runtime/debug"
78
"sync"
@@ -17,6 +18,9 @@ const (
1718
RootContextNameLabel LabelContextKey = "rootContextName"
1819
)
1920

21+
// ErrStopIteration indicates that the iteration should stop without producing an error.
22+
var ErrStopIteration = errors.New("stop iteration")
23+
2024
type Func = func(context.Context) error
2125

2226
type Task struct {
@@ -134,6 +138,27 @@ func RunTickerLoop(ctx context.Context, interval time.Duration, onTick func(cont
134138
}
135139
}
136140

141+
// RunTickerLoopWithErr runs a loop that executes a function at regular intervals
142+
// and returns an error if the function returns one.
143+
func RunTickerLoopWithErr(ctx context.Context, interval time.Duration, onTick func(context.Context) error) error {
144+
ticker := time.NewTicker(interval)
145+
defer ticker.Stop()
146+
147+
for {
148+
select {
149+
case <-ticker.C:
150+
if err := onTick(ctx); err != nil {
151+
if errors.Is(err, ErrStopIteration) {
152+
return nil
153+
}
154+
return err
155+
}
156+
case <-ctx.Done():
157+
return nil
158+
}
159+
}
160+
}
161+
137162
func RunWithRetries[T any](ctx context.Context, interval time.Duration, retries int, f func() (T, error)) (T, error) {
138163
timer := time.NewTimer(interval)
139164
defer timer.Stop()

nil/internal/collate/bootstrap.go

-46
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,12 @@ package collate
22

33
import (
44
"context"
5-
"fmt"
65

7-
"github.com/NilFoundation/nil/nil/common"
8-
"github.com/NilFoundation/nil/nil/common/check"
96
"github.com/NilFoundation/nil/nil/common/logging"
107
"github.com/NilFoundation/nil/nil/internal/db"
118
"github.com/NilFoundation/nil/nil/internal/network"
12-
"github.com/NilFoundation/nil/nil/internal/types"
139
)
1410

15-
const topicVersion = "/nil/version"
16-
1711
func topicBootstrapShard() network.ProtocolID {
1812
return "/nil/snap"
1913
}
@@ -53,32 +47,6 @@ func SetBootstrapHandler(ctx context.Context, nm network.Manager, db db.DB) {
5347
logger.Info().Msg("Enabled bootstrap endpoint")
5448
}
5549

56-
func SetVersionHandler(ctx context.Context, nm network.Manager, fabric db.DB) error {
57-
tx, err := fabric.CreateRoTx(ctx)
58-
if err != nil {
59-
return fmt.Errorf("failed to create transaction: %w", err)
60-
}
61-
defer tx.Rollback()
62-
63-
// The genesis block must have been initialized before this method is called.
64-
version, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
65-
if err != nil {
66-
return fmt.Errorf("failed to read genesis block hash: %w", err)
67-
}
68-
check.PanicIfNot(!version.Empty())
69-
70-
resp, err := version.MarshalSSZ()
71-
if err != nil {
72-
return fmt.Errorf("failed to marshal genesis block hash: %w", err)
73-
}
74-
75-
nm.SetRequestHandler(ctx, topicVersion, func(ctx context.Context, _ []byte) ([]byte, error) {
76-
return resp, nil
77-
})
78-
79-
return nil
80-
}
81-
8250
func fetchShardSnap(
8351
ctx context.Context,
8452
nm network.Manager,
@@ -119,17 +87,3 @@ func fetchSnapshot(
11987
}
12088
return fetchShardSnap(ctx, nm, peerId, db, logger)
12189
}
122-
123-
func fetchGenesisBlockHash(ctx context.Context, nm network.Manager, peerId network.PeerID) (common.Hash, error) {
124-
resp, err := nm.SendRequestAndGetResponse(ctx, peerId, topicVersion, nil)
125-
if err != nil {
126-
return common.EmptyHash, fmt.Errorf("failed to fetch genesis block hash: %w", err)
127-
}
128-
129-
var res common.Hash
130-
if err := res.UnmarshalSSZ(resp); err != nil {
131-
return common.EmptyHash, fmt.Errorf("failed to unmarshal genesis block hash: %w", err)
132-
}
133-
134-
return res, nil
135-
}

nil/internal/collate/syncer.go

+53-57
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"sync"
88
"time"
99

10-
"github.com/NilFoundation/nil/nil/common"
1110
"github.com/NilFoundation/nil/nil/common/assert"
1211
"github.com/NilFoundation/nil/nil/common/check"
12+
"github.com/NilFoundation/nil/nil/common/concurrent"
1313
"github.com/NilFoundation/nil/nil/common/logging"
1414
cerrors "github.com/NilFoundation/nil/nil/internal/collate/errors"
1515
"github.com/NilFoundation/nil/nil/internal/db"
@@ -102,54 +102,6 @@ func (s *Syncer) WaitComplete(ctx context.Context) error {
102102
}
103103
}
104104

105-
func (s *Syncer) getLocalVersion(ctx context.Context) (*NodeVersion, error) {
106-
protocolVersion := s.networkManager.ProtocolVersion()
107-
108-
rotx, err := s.db.CreateRoTx(ctx)
109-
if err != nil {
110-
return nil, err
111-
}
112-
defer rotx.Rollback()
113-
114-
res, err := db.ReadBlockHashByNumber(rotx, types.MainShardId, 0)
115-
if err != nil {
116-
if errors.Is(err, db.ErrKeyNotFound) {
117-
return &NodeVersion{protocolVersion, common.EmptyHash}, nil
118-
}
119-
return nil, err
120-
}
121-
return &NodeVersion{protocolVersion, res}, err
122-
}
123-
124-
type NodeVersion struct {
125-
ProtocolVersion string
126-
GenesisBlockHash common.Hash
127-
}
128-
129-
func (s *Syncer) fetchRemoteVersion(ctx context.Context) (NodeVersion, error) {
130-
var err error
131-
for _, peer := range s.config.BootstrapPeers {
132-
var peerId network.PeerID
133-
peerId, err = s.networkManager.Connect(ctx, network.AddrInfo(peer))
134-
if err != nil {
135-
continue
136-
}
137-
138-
var protocolVersion string
139-
protocolVersion, err = s.networkManager.GetPeerProtocolVersion(peerId)
140-
if err != nil {
141-
continue
142-
}
143-
144-
var res common.Hash
145-
res, err = fetchGenesisBlockHash(ctx, s.networkManager, peerId)
146-
if err == nil {
147-
return NodeVersion{protocolVersion, res}, nil
148-
}
149-
}
150-
return NodeVersion{}, fmt.Errorf("failed to fetch version from all peers; last error: %w", err)
151-
}
152-
153105
func (s *Syncer) fetchSnapshot(ctx context.Context) error {
154106
var err error
155107
for _, peer := range s.config.BootstrapPeers {
@@ -171,12 +123,12 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
171123
return nil
172124
}
173125

174-
version, err := s.getLocalVersion(ctx)
126+
version, err := GetLocalVersion(ctx, s.networkManager, s.db)
175127
if err != nil {
176128
return err
177129
}
178130

179-
remoteVersion, err := s.fetchRemoteVersion(ctx)
131+
remoteVersion, err := FetchRemoteVersion(ctx, s.networkManager, s.config.BootstrapPeers)
180132
if err != nil {
181133
// Nodes with allowDbDrop are supposed to be secondary, so they must sync with some reliable peer.
182134
// We need some nodes to start without fetching a remote version
@@ -220,7 +172,7 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
220172
// SetHandlers sets the handlers for generic (shard-independent) protocols.
221173
// It must be called after the initial sync of ALL shards is completed.
222174
// It should be called once, e.g., by the main shard syncer.
223-
// (Subsequent calls do not have any side effects, but might return an error.)
175+
// (Subsequent calls do not have any side effects but might return an error.)
224176
func (s *Syncer) SetHandlers(ctx context.Context) error {
225177
if s.networkManager == nil {
226178
return nil
@@ -236,6 +188,9 @@ func (s *Syncer) Run(ctx context.Context) error {
236188
return nil
237189
}
238190

191+
ctx, cancel := context.WithCancel(ctx)
192+
defer cancel()
193+
239194
if s.config.ShardId.IsMainShard() {
240195
if err := SetVersionHandler(ctx, s.networkManager, s.db); err != nil {
241196
return fmt.Errorf("failed to set version handler: %w", err)
@@ -249,24 +204,62 @@ func (s *Syncer) Run(ctx context.Context) error {
249204
s.fetchBlocks(ctx)
250205
s.waitForSync.Done()
251206

252-
s.logger.Info().Msg("Syncer initialization complete")
253-
254207
if ctx.Err() != nil {
255208
return nil
256209
}
257210

211+
s.logger.Info().Msg("Running syncer...")
212+
258213
sub, err := s.networkManager.PubSub().Subscribe(s.topic)
259214
if err != nil {
260215
return fmt.Errorf("failed to subscribe to %s: %w", s.topic, err)
261216
}
262217
defer sub.Close()
263218

219+
go s.doFetch(ctx, sub)
220+
221+
if len(s.config.BootstrapPeers) == 0 {
222+
s.logger.Info().Msg("No bootstrap peers. Skipping version check")
223+
224+
<-ctx.Done()
225+
return nil
226+
}
227+
228+
return s.runVersionCheckLoop(ctx)
229+
}
230+
231+
func (s *Syncer) runVersionCheckLoop(ctx context.Context) error {
232+
version, err := GetLocalVersion(ctx, s.networkManager, s.db)
233+
if err != nil {
234+
return err
235+
}
236+
237+
return concurrent.RunTickerLoopWithErr(ctx, 10*time.Second, func(ctx context.Context) error {
238+
remoteVersion, err := FetchRemoteVersion(ctx, s.networkManager, s.config.BootstrapPeers)
239+
if err != nil {
240+
s.logger.Warn().Err(err).Msg("Failed to fetch remote version")
241+
return nil
242+
}
243+
244+
if version.ProtocolVersion != remoteVersion.ProtocolVersion {
245+
return &ProtocolVersionMismatchError{
246+
version.ProtocolVersion,
247+
remoteVersion.ProtocolVersion,
248+
}
249+
}
250+
251+
if version.GenesisBlockHash != remoteVersion.GenesisBlockHash {
252+
return fmt.Errorf("local version is outdated; local: %s, remote: %s", version, remoteVersion)
253+
}
254+
255+
return nil
256+
})
257+
}
258+
259+
func (s *Syncer) doFetch(ctx context.Context, sub *network.Subscription) {
264260
ch := sub.Start(ctx, true)
265261
for {
266262
select {
267-
case <-ctx.Done():
268-
s.logger.Debug().Msg("Syncer is terminated")
269-
return nil
270263
case msg := <-ch:
271264
saved, err := s.processTopicTransaction(ctx, msg.Data)
272265
if err != nil {
@@ -287,6 +280,9 @@ func (s *Syncer) Run(ctx context.Context) error {
287280
s.logger.Warn().Msgf("No new block in the topic for %s, pulling blocks actively", s.config.Timeout)
288281

289282
s.fetchBlocks(ctx)
283+
case <-ctx.Done():
284+
s.logger.Info().Msg("Syncer is terminated")
285+
return
290286
}
291287
}
292288
}

nil/internal/collate/version.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package collate
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/NilFoundation/nil/nil/common"
9+
"github.com/NilFoundation/nil/nil/common/check"
10+
"github.com/NilFoundation/nil/nil/internal/db"
11+
"github.com/NilFoundation/nil/nil/internal/network"
12+
"github.com/NilFoundation/nil/nil/internal/types"
13+
)
14+
15+
const topicVersion = "/nil/version"
16+
17+
type NodeVersion struct {
18+
ProtocolVersion string
19+
GenesisBlockHash common.Hash
20+
}
21+
22+
func SetVersionHandler(ctx context.Context, nm network.Manager, fabric db.DB) error {
23+
tx, err := fabric.CreateRoTx(ctx)
24+
if err != nil {
25+
return fmt.Errorf("failed to create transaction: %w", err)
26+
}
27+
defer tx.Rollback()
28+
29+
// The genesis block must have been initialized before this method is called.
30+
version, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
31+
if err != nil {
32+
return fmt.Errorf("failed to read genesis block hash: %w", err)
33+
}
34+
check.PanicIfNot(!version.Empty())
35+
36+
resp, err := version.MarshalSSZ()
37+
if err != nil {
38+
return fmt.Errorf("failed to marshal genesis block hash: %w", err)
39+
}
40+
41+
nm.SetRequestHandler(ctx, topicVersion, func(ctx context.Context, _ []byte) ([]byte, error) {
42+
return resp, nil
43+
})
44+
45+
return nil
46+
}
47+
48+
func GetLocalVersion(ctx context.Context, nm network.Manager, fabric db.DB) (NodeVersion, error) {
49+
protocolVersion := nm.ProtocolVersion()
50+
51+
tx, err := fabric.CreateRoTx(ctx)
52+
if err != nil {
53+
return NodeVersion{}, err
54+
}
55+
defer tx.Rollback()
56+
57+
res, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
58+
if err != nil {
59+
if errors.Is(err, db.ErrKeyNotFound) {
60+
return NodeVersion{protocolVersion, common.EmptyHash}, nil
61+
}
62+
return NodeVersion{}, err
63+
}
64+
return NodeVersion{protocolVersion, res}, err
65+
}
66+
67+
func FetchRemoteVersion(ctx context.Context, nm network.Manager, peers network.AddrInfoSlice) (NodeVersion, error) {
68+
var err error
69+
for _, peer := range peers {
70+
var peerId network.PeerID
71+
peerId, err = nm.Connect(ctx, network.AddrInfo(peer))
72+
if err != nil {
73+
continue
74+
}
75+
76+
var protocolVersion string
77+
protocolVersion, err = nm.GetPeerProtocolVersion(peerId)
78+
if err != nil {
79+
continue
80+
}
81+
82+
var res common.Hash
83+
res, err = fetchGenesisBlockHash(ctx, nm, peerId)
84+
if err == nil {
85+
return NodeVersion{protocolVersion, res}, nil
86+
}
87+
}
88+
return NodeVersion{}, fmt.Errorf("failed to fetch version from all peers; last error: %w", err)
89+
}
90+
91+
func fetchGenesisBlockHash(ctx context.Context, nm network.Manager, peerId network.PeerID) (common.Hash, error) {
92+
resp, err := nm.SendRequestAndGetResponse(ctx, peerId, topicVersion, nil)
93+
if err != nil {
94+
return common.EmptyHash, fmt.Errorf("failed to fetch genesis block hash: %w", err)
95+
}
96+
97+
var res common.Hash
98+
if err := res.UnmarshalSSZ(resp); err != nil {
99+
return common.EmptyHash, fmt.Errorf("failed to unmarshal genesis block hash: %w", err)
100+
}
101+
102+
return res, nil
103+
}

0 commit comments

Comments
 (0)