@@ -22,6 +22,15 @@ import (
22
22
"google.golang.org/protobuf/proto"
23
23
)
24
24
25
+ type ProtocolVersionMismatchError struct {
26
+ LocalVersion string
27
+ RemoteVersion string
28
+ }
29
+
30
+ func (e * ProtocolVersionMismatchError ) Error () string {
31
+ return fmt .Sprintf ("protocol version mismatch; local: %s, remote: %s" , e .LocalVersion , e .RemoteVersion )
32
+ }
33
+
25
34
type SyncerConfig struct {
26
35
execution.BlockGeneratorParams
27
36
@@ -75,34 +84,67 @@ func (s *Syncer) shardIsEmpty(ctx context.Context) (bool, error) {
75
84
return block == nil , nil
76
85
}
77
86
78
- func (s * Syncer ) WaitComplete () {
79
- s .waitForSync .Wait ()
87
+ func (s * Syncer ) WaitComplete (ctx context.Context ) error {
88
+ c := make (chan struct {}, 1 )
89
+ go func () {
90
+ defer close (c )
91
+ s .waitForSync .Wait ()
92
+ }()
93
+ select {
94
+ case <- ctx .Done ():
95
+ return ctx .Err ()
96
+ case <- c :
97
+ return nil
98
+ }
80
99
}
81
100
82
- func (s * Syncer ) readLocalVersion (ctx context.Context ) (common.Hash , error ) {
101
+ func (s * Syncer ) getLocalVersion (ctx context.Context ) (* NodeVersion , error ) {
102
+ protocolVersion := s .networkManager .ProtocolVersion ()
103
+
83
104
rotx , err := s .db .CreateRoTx (ctx )
84
105
if err != nil {
85
- return common . EmptyHash , err
106
+ return nil , err
86
107
}
87
108
defer rotx .Rollback ()
88
109
89
110
res , err := db .ReadBlockHashByNumber (rotx , types .MainShardId , 0 )
90
- if errors .Is (err , db .ErrKeyNotFound ) {
91
- return common .EmptyHash , nil
111
+ if err != nil {
112
+ if errors .Is (err , db .ErrKeyNotFound ) {
113
+ return & NodeVersion {protocolVersion , common .EmptyHash }, nil
114
+ } else {
115
+ return nil , err
116
+ }
92
117
}
93
- return res , err
118
+ return & NodeVersion { protocolVersion , res } , err
94
119
}
95
120
96
- func (s * Syncer ) fetchRemoteVersion (ctx context.Context ) (common.Hash , error ) {
121
+ type NodeVersion struct {
122
+ ProtocolVersion string
123
+ GenesisBlockHash common.Hash
124
+ }
125
+
126
+ func (s * Syncer ) fetchRemoteVersion (ctx context.Context ) (NodeVersion , error ) {
97
127
var err error
98
128
for _ , peer := range s .config .BootstrapPeers {
129
+ var peerId network.PeerID
130
+ peerId , err = s .networkManager .Connect (ctx , peer )
131
+ if err != nil {
132
+ continue
133
+ }
134
+
135
+ var protocolVersion string
136
+ protocolVersion , err = s .networkManager .GetPeerProtocolVersion (peerId )
137
+ if err != nil {
138
+ continue
139
+ }
140
+
99
141
var res common.Hash
100
- res , err = fetchGenesisBlockHash (ctx , s .networkManager , peer )
142
+ res , err = fetchGenesisBlockHash (ctx , s .networkManager , peerId )
101
143
if err == nil {
102
- return res , nil
144
+ return NodeVersion { protocolVersion , res } , nil
103
145
}
104
146
}
105
- return common . EmptyHash , fmt .Errorf ("failed to fetch version from all peers; last error: %w" , err )
147
+ return NodeVersion {} , fmt .Errorf ("failed to fetch version from all peers; last error: %w" , err )
106
148
}
107
149
108
150
func (s * Syncer ) fetchSnapshot (ctx context.Context ) error {
@@ -126,14 +168,10 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
126
168
return nil
127
169
}
128
170
129
- version , err := s .readLocalVersion (ctx )
171
+ version , err := s .getLocalVersion (ctx )
130
172
if err != nil {
131
173
return err
132
174
}
133
- if version .Empty () {
134
- s .logger .Info ().Msg ("Local version is empty. Fetching snapshot..." )
135
- return s .fetchSnapshot (ctx )
136
- }
137
175
138
176
remoteVersion , err := s .fetchRemoteVersion (ctx )
139
177
if err != nil {
@@ -142,7 +180,20 @@ func (s *Syncer) Init(ctx context.Context, allowDbDrop bool) error {
142
180
"Failed to fetch remote version. For now we assume that local version %s is up to date" , version )
143
181
return nil
144
182
}
145
- if version == remoteVersion {
183
+
184
+ if version .ProtocolVersion != remoteVersion .ProtocolVersion {
185
+ return & ProtocolVersionMismatchError {
186
+ version .ProtocolVersion ,
187
+ remoteVersion .ProtocolVersion ,
188
+ }
189
+ }
190
+
191
+ if version .GenesisBlockHash .Empty () {
192
+ s .logger .Info ().Msg ("Local version is empty. Fetching snapshot..." )
193
+ return s .fetchSnapshot (ctx )
194
+ }
195
+
196
+ if version .GenesisBlockHash == remoteVersion .GenesisBlockHash {
146
197
s .logger .Info ().Msgf ("Local version %s is up to date. Finished initialization" , version )
147
198
return nil
148
199
}
@@ -338,7 +389,7 @@ func (s *Syncer) saveBlock(ctx context.Context, block *types.BlockWithExtractedD
338
389
return nil
339
390
}
340
391
341
- func (s * Syncer ) GenerateZerostate (ctx context.Context ) error {
392
+ func (s * Syncer ) GenerateZerostateIfShardIsEmpty (ctx context.Context ) error {
342
393
ctx , cancel := context .WithTimeout (ctx , s .config .Timeout )
343
394
defer cancel ()
344
395
0 commit comments