Skip to content
This repository was archived by the owner on Dec 5, 2024. It is now read-only.

Commit bb0d8b6

Browse files
authored
Merge pull request #1074 from ethereum/fix/remove-headers-db
Several fixes for FastSync
2 parents 99e22a5 + 58f0322 commit bb0d8b6

20 files changed

+256
-67
lines changed

ethereumj-core/src/main/java/org/ethereum/datasource/Serializers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,12 @@ public DataWord deserialize(byte[] stream) {
9696
public final static Serializer<Value, byte[]> TrieNodeSerializer = new Serializer<Value, byte[]>() {
9797
@Override
9898
public byte[] serialize(Value object) {
99-
return object.encode();
99+
return object.asBytes();
100100
}
101101

102102
@Override
103103
public Value deserialize(byte[] stream) {
104-
return Value.fromRlpEncoded(stream);
104+
return new Value(stream);
105105
}
106106
};
107107

ethereumj-core/src/main/java/org/ethereum/db/migrate/MigrateHeaderSourceTotalDiff.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@
4343
/**
4444
* @deprecated
4545
* TODO: Remove after a few versions (current: 1.7.3) or with DB version update
46+
* TODO: Make {@link FastSyncManager#removeHeadersDb(Logger)} private after removing
4647
* Also remove CommonConfig.headerSource with it as no more used
4748
*
4849
* - Repairs Headers DB after FastSync with skipHistory to be usable
4950
* a) Updates incorrect total difficulty
5051
* b) Migrates headers without index to usable scheme with index
5152
* - Removes headers DB otherwise as it's not needed
52-
* TODO: move DB removal to main logic. Not done yet to prevent any conflicts
5353
*/
5454
@Deprecated
5555
public class MigrateHeaderSourceTotalDiff implements Runnable {
@@ -85,19 +85,12 @@ public void run() {
8585
}
8686

8787
logger.info("Fast Sync was used. Checking if migration required.");
88-
if (blockStore.getBestBlock().getNumber() > 0 &&
89-
blockStore.getChainBlockByNumber(1) != null) {
90-
// Everything is cool but maybe we could remove unused DB?
91-
Path headersDbPath = Paths.get(systemProperties.databaseDir(), "headers");
92-
if (Files.exists(headersDbPath)) {
93-
logger.info("Headers DB was used during FastSync but not required any more. Removing.");
94-
FileUtil.recursiveDelete(headersDbPath.toString());
95-
logger.info("Headers DB removed. Migration is over");
96-
} else {
97-
logger.info("No migration required.");
98-
return;
99-
}
100-
} else if (blockStore.getBestBlock().getNumber() > 0) {
88+
boolean dbRemoved = fastSyncManager.removeHeadersDb(logger);
89+
if (dbRemoved) {
90+
logger.info("Migration finished.");
91+
return;
92+
}
93+
if (blockStore.getBestBlock().getNumber() > 0 && blockStore.getChainBlockByNumber(1) == null) {
10194
// Maybe migration of headerStore and totalDifficulty is required?
10295
HeaderStore headerStore = ctx.getBean(HeaderStore.class);
10396
if (headerStore.getHeaderByNumber(1) != null) {
@@ -138,6 +131,8 @@ public void run() {
138131
flushManager.commit();
139132
flushManager.flush();
140133
logger.info("headerStore migration finished. No more migrations required");
134+
} else {
135+
logger.info("No migration required.");
141136
}
142137
}
143138
}

ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth62.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,11 +878,12 @@ public String getSyncStats() {
878878
int waitResp = lastReqSentTime > 0 ? (int) (System.currentTimeMillis() - lastReqSentTime) / 1000 : 0;
879879
long lifeTime = System.currentTimeMillis() - connectedTime;
880880
return String.format(
881-
"Peer %s: [ %s, %18s, ping %6s ms, difficulty %s, best block %s%s]: (idle %s of %s) %s",
881+
"Peer %s: [ %s, %18s, ping %6s ms, rep: %s, difficulty %s, best block %s%s]: (idle %s of %s) %s",
882882
getVersion(),
883883
channel.getPeerIdShort(),
884884
peerState,
885885
(int)channel.getPeerStats().getAvgLatency(),
886+
channel.getNodeStatistics().getReputation(),
886887
getTotalDifficulty(),
887888
getBestKnownBlock().getNumber(),
888889
waitResp > 5 ? ", wait " + waitResp + "s" : " ",

ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth63.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.ethereum.net.eth.message.NodeDataMessage;
3434
import org.ethereum.net.eth.message.ReceiptsMessage;
3535

36+
import org.ethereum.net.message.ReasonCode;
3637
import org.ethereum.sync.PeerState;
3738
import org.ethereum.util.ByteArraySet;
3839
import org.ethereum.util.Value;
@@ -45,6 +46,7 @@
4546
import java.util.List;
4647
import java.util.Set;
4748

49+
import static org.ethereum.crypto.HashUtil.sha3;
4850
import static org.ethereum.net.eth.EthVersion.V63;
4951
import static org.ethereum.util.ByteUtil.toHexString;
5052

@@ -186,19 +188,22 @@ protected synchronized void processNodeData(NodeDataMessage msg) {
186188

187189
List<Pair<byte[], byte[]>> ret = new ArrayList<>();
188190
if(msg.getDataList().isEmpty()) {
189-
String err = "Received NodeDataMessage contains empty node data. Dropping peer " + channel;
190-
dropUselessPeer(err);
191+
String err = String.format("Received NodeDataMessage contains empty node data. Dropping peer %s", channel);
192+
logger.debug(err);
193+
requestNodesFuture.setException(new RuntimeException(err));
194+
// Not fatal but let us touch it later
195+
channel.getChannelManager().disconnect(channel, ReasonCode.TOO_MANY_PEERS);
191196
return;
192197
}
193198

194199
for (Value nodeVal : msg.getDataList()) {
195-
byte[] hash = nodeVal.hash();
200+
byte[] hash = sha3(nodeVal.asBytes());
196201
if (!requestedNodes.contains(hash)) {
197202
String err = "Received NodeDataMessage contains non-requested node with hash :" + toHexString(hash) + " . Dropping peer " + channel;
198203
dropUselessPeer(err);
199204
return;
200205
}
201-
ret.add(Pair.of(hash, nodeVal.encode()));
206+
ret.add(Pair.of(hash, nodeVal.asBytes()));
202207
}
203208
requestNodesFuture.set(ret);
204209

ethereumj-core/src/main/java/org/ethereum/net/eth/message/NodeDataMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private void parse() {
4949
dataList = new ArrayList<>();
5050
for (int i = 0; i < paramsList.size(); ++i) {
5151
// Need it AS IS
52-
dataList.add(Value.fromRlpEncoded(paramsList.get(i).getRLPData()));
52+
dataList.add(new Value(paramsList.get(i).getRLPData()));
5353
}
5454
parsed = true;
5555
}
@@ -58,7 +58,7 @@ private void encode() {
5858
List<byte[]> dataListRLP = new ArrayList<>();
5959
for (Value value: dataList) {
6060
if (value == null) continue; // Bad sign
61-
dataListRLP.add(value.getData());
61+
dataListRLP.add(RLP.encodeElement(value.asBytes()));
6262
}
6363
byte[][] encodedElementArray = dataListRLP.toArray(new byte[dataListRLP.size()][]);
6464
this.encoded = RLP.encodeList(encodedElementArray);

ethereumj-core/src/main/java/org/ethereum/net/rlpx/MessageCodec.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out)
9696
Frame completeFrame = null;
9797
if (frame.isChunked()) {
9898
if (!supportChunkedFrames && frame.totalFrameSize > 0) {
99-
throw new RuntimeException("Faming is not supported in this configuration.");
99+
throw new RuntimeException("Framing is not supported in this configuration.");
100100
}
101101

102102
Pair<? extends List<Frame>, AtomicInteger> frameParts = incompleteFrames.get(frame.contextId);
@@ -157,15 +157,15 @@ private Message decodeMessage(ChannelHandlerContext ctx, List<Frame> frames) thr
157157
Message msg;
158158
try {
159159
msg = createMessage((byte) frameType, payload);
160+
161+
if (loggerNet.isDebugEnabled())
162+
loggerNet.debug("From: {} Recv: {}", channel, msg.toString());
160163
} catch (Exception ex) {
161-
loggerNet.debug("Incorrectly encoded message from: \t{}, dropping peer", channel);
164+
loggerNet.debug(String.format("Incorrectly encoded message from: \t%s, dropping peer", channel), ex);
162165
channel.disconnect(ReasonCode.BAD_PROTOCOL);
163166
return null;
164167
}
165168

166-
if (loggerNet.isDebugEnabled())
167-
loggerNet.debug("From: {} Recv: {}", channel, msg.toString());
168-
169169
ethereumListener.onRecvMessage(channel, msg);
170170

171171
channel.getNodeStatistics().rlpxInMessages.add();

ethereumj-core/src/main/java/org/ethereum/net/rlpx/SnappyCodec.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ protected void encode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
5959
// stay consistent with decoding party
6060
if (msg.size > MAX_SIZE) {
6161
logger.info("{}: outgoing frame size exceeds the limit ({} bytes), disconnect", channel, msg.size);
62-
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.USELESS_PEER);
6362
channel.disconnect(ReasonCode.USELESS_PEER);
6463
return;
6564
}
@@ -81,7 +80,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
8180
long uncompressedLength = Snappy.uncompressedLength(in) & 0xFFFFFFFFL;
8281
if (uncompressedLength > MAX_SIZE) {
8382
logger.info("{}: uncompressed frame size exceeds the limit ({} bytes), drop the peer", channel, uncompressedLength);
84-
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL);
8583
channel.disconnect(ReasonCode.BAD_PROTOCOL);
8684
return;
8785
}
@@ -94,7 +92,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
9492
// 5 - error code for framed snappy
9593
if (detailMessage.startsWith("FAILED_TO_UNCOMPRESS") && detailMessage.contains("5")) {
9694
logger.info("{}: Snappy frames are not allowed in DEVp2p protocol, drop the peer", channel);
97-
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL);
9895
channel.disconnect(ReasonCode.BAD_PROTOCOL);
9996
return;
10097
} else {

ethereumj-core/src/main/java/org/ethereum/net/rlpx/discover/NodeManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,16 @@ public NodeStatistics getNodeStatistics(Node n) {
240240
return getNodeHandler(n).getNodeStatistics();
241241
}
242242

243+
/**
244+
* Checks whether peers with such InetSocketAddress has penalize disconnect record
245+
* @param addr Peer address
246+
* @return true if penalized, false if not or no records
247+
*/
248+
public boolean isReputationPenalized(InetSocketAddress addr) {
249+
return getNodeStatistics(new Node(new byte[0], addr.getHostString(),
250+
addr.getPort())).isReputationPenalized();
251+
}
252+
243253
@Override
244254
public void accept(DiscoveryEvent discoveryEvent) {
245255
handleInbound(discoveryEvent);

ethereumj-core/src/main/java/org/ethereum/net/rlpx/discover/NodeStatistics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.atomic.AtomicLong;
3030

3131
import static java.lang.Math.min;
32+
import static org.ethereum.net.server.ChannelManager.INBOUND_CONNECTION_BAN_TIMEOUT;
3233

3334
/**
3435
* Handles all possible statistics related to a Node
@@ -42,7 +43,6 @@ public class NodeStatistics {
4243
public final static int REPUTATION_HANDSHAKE = 3000;
4344
public final static int REPUTATION_AUTH = 1000;
4445
public final static int REPUTATION_DISCOVER_PING = 1;
45-
public final static long TOO_MANY_PEERS_PENALIZE_TIMEOUT = 10 * 1000;
4646

4747
public class StatHandler {
4848
AtomicLong count = new AtomicLong(0);
@@ -145,14 +145,14 @@ public int getReputation() {
145145
return isReputationPenalized() ? 0 : persistedReputation / 2 + getSessionReputation();
146146
}
147147

148-
private boolean isReputationPenalized() {
148+
public boolean isReputationPenalized() {
149149
if (wrongFork) return true;
150150
if (wasDisconnected() && rlpxLastRemoteDisconnectReason == ReasonCode.TOO_MANY_PEERS &&
151-
System.currentTimeMillis() - lastDisconnectedTime < TOO_MANY_PEERS_PENALIZE_TIMEOUT) {
151+
System.currentTimeMillis() - lastDisconnectedTime < INBOUND_CONNECTION_BAN_TIMEOUT) {
152152
return true;
153153
}
154154
if (wasDisconnected() && rlpxLastRemoteDisconnectReason == ReasonCode.DUPLICATE_PEER &&
155-
System.currentTimeMillis() - lastDisconnectedTime < TOO_MANY_PEERS_PENALIZE_TIMEOUT) {
155+
System.currentTimeMillis() - lastDisconnectedTime < INBOUND_CONNECTION_BAN_TIMEOUT) {
156156
return true;
157157
}
158158
return rlpxLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY ||

ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ public ByteArrayWrapper getNodeIdWrapper() {
314314
}
315315

316316
public void disconnect(ReasonCode reason) {
317+
getNodeStatistics().nodeDisconnectedLocal(reason);
317318
msgQueue.disconnect(reason);
318319
}
319320

ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ public class ChannelManager {
5454
// If the inbound peer connection was dropped by us with a reason message
5555
// then we ban that peer IP on any connections for some time to protect from
5656
// too active peers
57-
private static final int inboundConnectionBanTimeout = 10 * 1000;
57+
public static final int INBOUND_CONNECTION_BAN_TIMEOUT = 120 * 1000;
5858

5959
private List<Channel> newPeers = new CopyOnWriteArrayList<>();
60+
// Limiting number of new peers to avoid delays in processing
61+
private static final int MAX_NEW_PEERS = 128;
6062
private final Map<ByteArrayWrapper, Channel> activePeers = new ConcurrentHashMap<>();
6163

6264
private ScheduledExecutorService mainWorker = Executors.newSingleThreadScheduledExecutor();
@@ -192,7 +194,7 @@ private void processNewPeers() {
192194
newPeers.removeAll(processed);
193195
}
194196

195-
private void disconnect(Channel peer, ReasonCode reason) {
197+
public void disconnect(Channel peer, ReasonCode reason) {
196198
logger.debug("Disconnecting peer with reason " + reason + ": " + peer);
197199
peer.disconnect(reason);
198200
recentlyDisconnected.put(peer.getInetSocketAddress().getAddress(), new Date());
@@ -201,7 +203,7 @@ private void disconnect(Channel peer, ReasonCode reason) {
201203
public boolean isRecentlyDisconnected(InetAddress peerAddr) {
202204
Date disconnectTime = recentlyDisconnected.get(peerAddr);
203205
if (disconnectTime != null &&
204-
System.currentTimeMillis() - disconnectTime.getTime() < inboundConnectionBanTimeout) {
206+
System.currentTimeMillis() - disconnectTime.getTime() < INBOUND_CONNECTION_BAN_TIMEOUT) {
205207
return true;
206208
} else {
207209
recentlyDisconnected.remove(peerAddr);
@@ -343,10 +345,23 @@ public Collection<Channel> getActivePeers() {
343345
return new ArrayList<>(activePeers.values());
344346
}
345347

348+
/**
349+
* Checks whether newPeers is not full
350+
* newPeers are used to fill up active peers
351+
* @return True if there are free slots for new peers
352+
*/
353+
public boolean acceptingNewPeers() {
354+
return newPeers.size() < Math.max(config.maxActivePeers(), MAX_NEW_PEERS);
355+
}
356+
346357
public Channel getActivePeer(byte[] nodeId) {
347358
return activePeers.get(new ByteArrayWrapper(nodeId));
348359
}
349360

361+
public SyncManager getSyncManager() {
362+
return syncManager;
363+
}
364+
350365
public void close() {
351366
try {
352367
logger.info("Shutting down block and tx distribute threads...");

ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import io.netty.channel.*;
2121
import io.netty.channel.socket.nio.NioSocketChannel;
22+
import org.ethereum.net.rlpx.Node;
23+
import org.ethereum.net.rlpx.discover.NodeManager;
2224
import org.slf4j.Logger;
2325
import org.slf4j.LoggerFactory;
2426
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,6 +44,9 @@ public class EthereumChannelInitializer extends ChannelInitializer<NioSocketChan
4244
@Autowired
4345
ChannelManager channelManager;
4446

47+
@Autowired
48+
NodeManager nodeManager;
49+
4550
private String remoteId;
4651

4752
private boolean peerDiscoveryMode = false;
@@ -57,11 +62,34 @@ public void initChannel(NioSocketChannel ch) throws Exception {
5762
logger.debug("Open {} connection, channel: {}", isInbound() ? "inbound" : "outbound", ch.toString());
5863
}
5964

60-
if (isInbound() && channelManager.isRecentlyDisconnected(ch.remoteAddress().getAddress())) {
61-
// avoid too frequent connection attempts
62-
logger.debug("Drop connection - the same IP was disconnected recently, channel: {}", ch.toString());
63-
ch.disconnect();
64-
return;
65+
// For incoming connection drop if..
66+
if (isInbound()) {
67+
boolean needToDrop = false;
68+
// Bad remote address
69+
if (ch.remoteAddress() == null) {
70+
logger.debug("Drop connection - bad remote address, channel: {}", ch.toString());
71+
needToDrop = true;
72+
}
73+
// Avoid too frequent connection attempts
74+
if (!needToDrop && channelManager.isRecentlyDisconnected(ch.remoteAddress().getAddress())) {
75+
logger.debug("Drop connection - the same IP was disconnected recently, channel: {}", ch.toString());
76+
needToDrop = true;
77+
}
78+
// Drop bad peers before creating channel
79+
if (!needToDrop && nodeManager.isReputationPenalized(ch.remoteAddress())) {
80+
logger.debug("Drop connection - bad peer, channel: {}", ch.toString());
81+
needToDrop = true;
82+
}
83+
// Drop if we have long waiting queue already
84+
if (!needToDrop && !channelManager.acceptingNewPeers()) {
85+
logger.debug("Drop connection - many new peers are not processed, channel: {}", ch.toString());
86+
needToDrop = true;
87+
}
88+
89+
if (needToDrop) {
90+
ch.disconnect();
91+
return;
92+
}
6593
}
6694

6795
final Channel channel = ctx.getBean(Channel.class);

ethereumj-core/src/main/java/org/ethereum/sync/BlockDownloader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private void headerRetrieveLoop() {
178178
logger.debug("{} headerRetrieveLoop: No IDLE peers found", name);
179179
break;
180180
} else {
181-
logger.debug("{} headerRetrieveLoop: request headers (" + headersRequest.getStart() + ") from " + any.getNode(), name);
181+
logger.debug("{} headerRetrieveLoop: request headers (" + headersRequest.toString() + ") from " + any.getNode(), name);
182182
ListenableFuture<List<BlockHeader>> futureHeaders = headersRequest.getHash() == null ?
183183
any.getEthHandler().sendGetBlockHeaders(headersRequest.getStart(), headersRequest.getCount(), headersRequest.isReverse()) :
184184
any.getEthHandler().sendGetBlockHeaders(headersRequest.getHash(), headersRequest.getCount(), headersRequest.getStep(), headersRequest.isReverse());

0 commit comments

Comments
 (0)