Skip to content

[version] Restart services on chain or protocol change #884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions nil/common/concurrent/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package concurrent

import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
Expand All @@ -17,6 +18,9 @@ const (
RootContextNameLabel LabelContextKey = "rootContextName"
)

// ErrStopIteration indicates that the iteration should stop without producing an error.
var ErrStopIteration = errors.New("stop iteration")

type Func = func(context.Context) error

type Task struct {
Expand Down Expand Up @@ -134,6 +138,27 @@ func RunTickerLoop(ctx context.Context, interval time.Duration, onTick func(cont
}
}

// RunTickerLoopWithErr runs a loop that executes a function at regular intervals
// and returns an error if the function returns one.
func RunTickerLoopWithErr(ctx context.Context, interval time.Duration, onTick func(context.Context) error) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := onTick(ctx); err != nil {
if errors.Is(err, ErrStopIteration) {
return nil
}
return err
}
case <-ctx.Done():
return nil
}
}
}

func RunWithRetries[T any](ctx context.Context, interval time.Duration, retries int, f func() (T, error)) (T, error) {
timer := time.NewTimer(interval)
defer timer.Stop()
Expand Down
46 changes: 0 additions & 46 deletions nil/internal/collate/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package collate

import (
"context"
"fmt"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/check"
"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/internal/network"
"github.com/NilFoundation/nil/nil/internal/types"
)

const topicVersion = "/nil/version"

func topicBootstrapShard() network.ProtocolID {
return "/nil/snap"
}
Expand Down Expand Up @@ -53,32 +47,6 @@ func SetBootstrapHandler(ctx context.Context, nm network.Manager, db db.DB) {
logger.Info().Msg("Enabled bootstrap endpoint")
}

func SetVersionHandler(ctx context.Context, nm network.Manager, fabric db.DB) error {
tx, err := fabric.CreateRoTx(ctx)
if err != nil {
return fmt.Errorf("failed to create transaction: %w", err)
}
defer tx.Rollback()

// The genesis block must have been initialized before this method is called.
version, err := db.ReadBlockHashByNumber(tx, types.MainShardId, 0)
if err != nil {
return fmt.Errorf("failed to read genesis block hash: %w", err)
}
check.PanicIfNot(!version.Empty())

resp, err := version.MarshalNil()
if err != nil {
return fmt.Errorf("failed to marshal genesis block hash: %w", err)
}

nm.SetRequestHandler(ctx, topicVersion, func(ctx context.Context, _ []byte) ([]byte, error) {
return resp, nil
})

return nil
}

func fetchShardSnap(
ctx context.Context,
nm network.Manager,
Expand Down Expand Up @@ -119,17 +87,3 @@ func fetchSnapshot(
}
return fetchShardSnap(ctx, nm, peerId, db, logger)
}

func fetchGenesisBlockHash(ctx context.Context, nm network.Manager, peerId network.PeerID) (common.Hash, error) {
resp, err := nm.SendRequestAndGetResponse(ctx, peerId, topicVersion, nil)
if err != nil {
return common.EmptyHash, fmt.Errorf("failed to fetch genesis block hash: %w", err)
}

var res common.Hash
if err := res.UnmarshalNil(resp); err != nil {
return common.EmptyHash, fmt.Errorf("failed to unmarshal genesis block hash: %w", err)
}

return res, nil
}
119 changes: 59 additions & 60 deletions nil/internal/collate/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"github.com/NilFoundation/nil/nil/common"
"github.com/NilFoundation/nil/nil/common/assert"
"github.com/NilFoundation/nil/nil/common/check"
"github.com/NilFoundation/nil/nil/common/concurrent"
"github.com/NilFoundation/nil/nil/common/logging"
cerrors "github.com/NilFoundation/nil/nil/internal/collate/errors"
"github.com/NilFoundation/nil/nil/internal/db"
Expand Down Expand Up @@ -56,7 +56,8 @@ type Syncer struct {

waitForSync *sync.WaitGroup

validator *Validator
validator *Validator
versionChecker *VersionChecker
}

func NewSyncer(cfg *SyncerConfig, validator *Validator, db db.DB, networkManager network.Manager) (*Syncer, error) {
Expand All @@ -68,15 +69,17 @@ func NewSyncer(cfg *SyncerConfig, validator *Validator, db db.DB, networkManager
if networkManager != nil {
loggerCtx = loggerCtx.Stringer(logging.FieldP2PIdentity, networkManager.ID())
}
logger := loggerCtx.Logger()

return &Syncer{
config: cfg,
topic: topicShardBlocks(cfg.ShardId),
db: db,
networkManager: networkManager,
logger: loggerCtx.Logger(),
logger: logger,
waitForSync: &waitForSync,
validator: validator,
versionChecker: NewVersionChecker(networkManager, db, logger),
}, nil
}

Expand All @@ -102,54 +105,6 @@ func (s *Syncer) WaitComplete(ctx context.Context) error {
}
}

func (s *Syncer) getLocalVersion(ctx context.Context) (*NodeVersion, error) {
protocolVersion := s.networkManager.ProtocolVersion()

rotx, err := s.db.CreateRoTx(ctx)
if err != nil {
return nil, err
}
defer rotx.Rollback()

res, err := db.ReadBlockHashByNumber(rotx, types.MainShardId, 0)
if err != nil {
if errors.Is(err, db.ErrKeyNotFound) {
return &NodeVersion{protocolVersion, common.EmptyHash}, nil
}
return nil, err
}
return &NodeVersion{protocolVersion, res}, err
}

type NodeVersion struct {
ProtocolVersion string
GenesisBlockHash common.Hash
}

func (s *Syncer) fetchRemoteVersion(ctx context.Context) (NodeVersion, error) {
var err error
for _, peer := range s.config.BootstrapPeers {
var peerId network.PeerID
peerId, err = s.networkManager.Connect(ctx, network.AddrInfo(peer))
if err != nil {
continue
}

var protocolVersion string
protocolVersion, err = s.networkManager.GetPeerProtocolVersion(peerId)
if err != nil {
continue
}

var res common.Hash
res, err = fetchGenesisBlockHash(ctx, s.networkManager, peerId)
if err == nil {
return NodeVersion{protocolVersion, res}, nil
}
}
return NodeVersion{}, fmt.Errorf("failed to fetch version from all peers; last error: %w", err)
}

func (s *Syncer) fetchSnapshot(ctx context.Context) error {
var err error
for _, peer := range s.config.BootstrapPeers {
Expand All @@ -171,12 +126,12 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
return nil
}

version, err := s.getLocalVersion(ctx)
version, err := s.versionChecker.GetLocalVersion(ctx)
if err != nil {
return err
}

remoteVersion, err := s.fetchRemoteVersion(ctx)
remoteVersion, err := s.versionChecker.FetchRemoteVersion(ctx, s.config.BootstrapPeers)
if err != nil {
// Nodes with allowDbDrop are supposed to be secondary, so they must sync with some reliable peer.
// We need some nodes to start without fetching a remote version
Expand Down Expand Up @@ -220,7 +175,7 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
// SetHandlers sets the handlers for generic (shard-independent) protocols.
// It must be called after the initial sync of ALL shards is completed.
// It should be called once, e.g., by the main shard syncer.
// (Subsequent calls do not have any side effects, but might return an error.)
// (Subsequent calls do not have any side effects but might return an error.)
func (s *Syncer) SetHandlers(ctx context.Context) error {
if s.networkManager == nil {
return nil
Expand All @@ -236,8 +191,11 @@ func (s *Syncer) Run(ctx context.Context) error {
return nil
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if s.config.ShardId.IsMainShard() {
if err := SetVersionHandler(ctx, s.networkManager, s.db); err != nil {
if err := s.versionChecker.SetVersionHandler(ctx); err != nil {
return fmt.Errorf("failed to set version handler: %w", err)
}
}
Expand All @@ -249,24 +207,62 @@ func (s *Syncer) Run(ctx context.Context) error {
s.fetchBlocks(ctx)
s.waitForSync.Done()

s.logger.Info().Msg("Syncer initialization complete")

if ctx.Err() != nil {
return nil
}

s.logger.Info().Msg("Running syncer...")

sub, err := s.networkManager.PubSub().Subscribe(s.topic)
if err != nil {
return fmt.Errorf("failed to subscribe to %s: %w", s.topic, err)
}
defer sub.Close()

if len(s.config.BootstrapPeers) == 0 {
s.logger.Info().Msg("No bootstrap peers. Skipping version check")

s.doFetch(ctx, sub)
return nil
}

go s.doFetch(ctx, sub)

return s.runVersionCheckLoop(ctx)
}

func (s *Syncer) runVersionCheckLoop(ctx context.Context) error {
version, err := s.versionChecker.GetLocalVersion(ctx)
if err != nil {
return err
}

return concurrent.RunTickerLoopWithErr(ctx, 10*time.Second, func(ctx context.Context) error {
remoteVersion, err := s.versionChecker.FetchRemoteVersion(ctx, s.config.BootstrapPeers)
if err != nil {
s.logger.Warn().Err(err).Msg("Failed to fetch remote version")
return nil
}

if version.ProtocolVersion != remoteVersion.ProtocolVersion {
return &ProtocolVersionMismatchError{
version.ProtocolVersion,
remoteVersion.ProtocolVersion,
}
}

if version.GenesisBlockHash != remoteVersion.GenesisBlockHash {
return fmt.Errorf("local version is outdated; local: %s, remote: %s", version, remoteVersion)
}

return nil
})
}

func (s *Syncer) doFetch(ctx context.Context, sub *network.Subscription) {
ch := sub.Start(ctx, true)
for {
select {
case <-ctx.Done():
s.logger.Debug().Msg("Syncer is terminated")
return nil
case msg := <-ch:
saved, err := s.processTopicTransaction(ctx, msg.Data)
if err != nil {
Expand All @@ -287,6 +283,9 @@ func (s *Syncer) Run(ctx context.Context) error {
s.logger.Warn().Msgf("No new block in the topic for %s, pulling blocks actively", s.config.Timeout)

s.fetchBlocks(ctx)
case <-ctx.Done():
s.logger.Info().Msg("Fetch is terminated")
return
}
}
}
Expand Down
Loading