20
20
import com .google .common .util .concurrent .FutureCallback ;
21
21
import com .google .common .util .concurrent .Futures ;
22
22
import com .google .common .util .concurrent .ListenableFuture ;
23
+ import com .google .common .util .concurrent .MoreExecutors ;
23
24
import org .apache .commons .collections4 .queue .CircularFifoQueue ;
24
25
import org .ethereum .core .*;
25
26
import org .ethereum .net .server .Channel ;
@@ -196,7 +197,7 @@ public void onFailure(Throwable t) {
196
197
logger .debug ("{}: Error receiving headers. Dropping the peer." , name , t );
197
198
any .getEthHandler ().dropConnection ();
198
199
}
199
- });
200
+ }, MoreExecutors . directExecutor () );
200
201
it .remove ();
201
202
reqHeadersCounter ++;
202
203
}
@@ -252,6 +253,7 @@ public void onFailure(Throwable t) {
252
253
if (blocksToAsk >= MAX_IN_REQUEST ) {
253
254
// SyncQueueIfc.BlocksRequest bReq = syncQueue.requestBlocks(maxBlocks);
254
255
256
+ boolean fewHeadersReqMode = false ;
255
257
if (bReqs .size () == 1 && bReqs .get (0 ).getBlockHeaders ().size () <= 3 ) {
256
258
// new blocks are better to request from the header senders first
257
259
// to get more chances to receive block body promptly
@@ -261,7 +263,9 @@ public void onFailure(Throwable t) {
261
263
ListenableFuture <List <Block >> futureBlocks =
262
264
channel .getEthHandler ().sendGetBlockBodies (singletonList (blockHeaderWrapper ));
263
265
if (futureBlocks != null ) {
264
- Futures .addCallback (futureBlocks , new BlocksCallback (channel ));
266
+ Futures .addCallback (futureBlocks , new BlocksCallback (channel ),
267
+ MoreExecutors .directExecutor ());
268
+ fewHeadersReqMode = true ;
265
269
}
266
270
}
267
271
}
@@ -285,12 +289,21 @@ public void onFailure(Throwable t) {
285
289
any .getEthHandler ().sendGetBlockBodies (blocksRequest .getBlockHeaders ());
286
290
blocksRequested += blocksRequest .getBlockHeaders ().size ();
287
291
if (futureBlocks != null ) {
288
- Futures .addCallback (futureBlocks , new BlocksCallback (any ));
292
+ Futures .addCallback (futureBlocks , new BlocksCallback (any ),
293
+ MoreExecutors .directExecutor ());
289
294
reqBlocksCounter ++;
290
295
it .remove ();
291
296
}
292
297
}
293
298
}
299
+
300
+ // Case when we have requested few headers and was not able
301
+ // to remove request from the list in above cycle because
302
+ // there were no idle peers or whatever
303
+ if (fewHeadersReqMode && !bReqs .isEmpty ()) {
304
+ bReqs .clear ();
305
+ }
306
+
294
307
receivedBlocksLatch = new CountDownLatch (max (reqBlocksCounter - 2 , 1 ));
295
308
receivedBlocksLatch .await (1000 , TimeUnit .MILLISECONDS );
296
309
} else {
0 commit comments