Skip to content
This repository was archived by the owner on Dec 5, 2024. It is now read-only.

Commit a3535b0

Browse files
committed
Outgoing transaction size is limited
1 parent 057dea7 commit a3535b0

File tree

4 files changed

+108
-36
lines changed

4 files changed

+108
-36
lines changed

ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.ethereum.net.shh.ShhMessageFactory;
5050
import org.ethereum.net.swarm.bzz.BzzHandler;
5151
import org.ethereum.net.swarm.bzz.BzzMessageFactory;
52+
import org.ethereum.util.CollectionUtils;
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455
import org.springframework.beans.factory.annotation.Autowired;
@@ -121,6 +122,8 @@ public class Channel {
121122

122123
private PeerStatistics peerStats = new PeerStatistics();
123124

125+
public static final int MAX_SAFE_TXS = 192;
126+
124127
public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode, ChannelManager channelManager) {
125128
this.channelManager = channelManager;
126129
this.remoteId = remoteId;
@@ -372,8 +375,28 @@ public void prohibitTransactionProcessing() {
372375
eth.disableTransactions();
373376
}
374377

375-
public void sendTransaction(List<Transaction> tx) {
376-
eth.sendTransaction(tx);
378+
/**
379+
* Send transactions from input to peer corresponded with channel
380+
* Using {@link #sendTransactionsSafely(List)} is recommended instead
381+
* @param txs Transactions
382+
*/
383+
public void sendTransactions(List<Transaction> txs) {
384+
eth.sendTransaction(txs);
385+
}
386+
387+
/**
388+
* Sames as {@link #sendTransactions(List)} but input list is randomly sliced to
389+
* contain not more than {@link MAX_SAFE_TXS} if needed
390+
* @param txs List of txs to send
391+
*/
392+
public void sendTransactionsSafely(List<Transaction> txs) {
393+
List<Transaction> slicedTxs;
394+
if (txs.size() <= MAX_SAFE_TXS) {
395+
slicedTxs = txs;
396+
} else {
397+
slicedTxs = CollectionUtils.truncateRand(txs, MAX_SAFE_TXS);
398+
}
399+
eth.sendTransaction(slicedTxs);
377400
}
378401

379402
public void sendNewBlock(Block block) {

ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ private void process(Channel peer) {
231231
public void sendTransaction(List<Transaction> txs, Channel receivedFrom) {
232232
for (Channel channel : activePeers.values()) {
233233
if (channel != receivedFrom) {
234-
channel.sendTransaction(txs);
234+
channel.sendTransactionsSafely(txs);
235235
}
236236
}
237237
}
@@ -289,7 +289,7 @@ private void newTxDistributeLoop() {
289289
channel = newActivePeers.take();
290290
List<Transaction> pendingTransactions = pendingState.getPendingTransactions();
291291
if (!pendingTransactions.isEmpty()) {
292-
channel.sendTransaction(pendingTransactions);
292+
channel.sendTransactionsSafely(pendingTransactions);
293293
}
294294
} catch (InterruptedException e) {
295295
break;

ethereumj-core/src/main/java/org/ethereum/util/CollectionUtils.java

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.ethereum.util;
1919

2020
import java.util.*;
21+
import java.util.concurrent.ThreadLocalRandom;
2122
import java.util.function.Function;
2223
import java.util.function.Predicate;
2324

@@ -26,24 +27,7 @@
2627
* @since 14.07.2015
2728
*/
2829
public class CollectionUtils {
29-
30-
public static <K, V> List<V> collectList(Collection<K> items, Function<K, V> collector) {
31-
List<V> collected = new ArrayList<>(items.size());
32-
for(K item : items) {
33-
collected.add(collector.apply(item));
34-
}
35-
return collected;
36-
}
37-
38-
public static <K, V> Set<V> collectSet(Collection<K> items, Function<K, V> collector) {
39-
Set<V> collected = new HashSet<>();
40-
for(K item : items) {
41-
collected.add(collector.apply(item));
42-
}
43-
return collected;
44-
}
45-
46-
public static <T> List<T> truncate(List<T> items, int limit) {
30+
public static <T> List<T> truncate(final List<T> items, int limit) {
4731
if(limit > items.size()) {
4832
return new ArrayList<>(items);
4933
}
@@ -57,23 +41,34 @@ public static <T> List<T> truncate(List<T> items, int limit) {
5741
return truncated;
5842
}
5943

60-
public static <T> List<T> selectList(Collection<T> items, Predicate<T> predicate) {
61-
List<T> selected = new ArrayList<>();
62-
for(T item : items) {
63-
if(predicate.test(item)) {
64-
selected.add(item);
65-
}
44+
public static <T> List<T> truncateRand(final List<T> items, int limit) {
45+
if(limit > items.size()) {
46+
return new ArrayList<>(items);
6647
}
67-
return selected;
68-
}
48+
List<T> truncated = new ArrayList<>(limit);
6949

70-
public static <T> Set<T> selectSet(Collection<T> items, Predicate<T> predicate) {
71-
Set<T> selected = new HashSet<>();
72-
for(T item : items) {
73-
if(predicate.test(item)) {
74-
selected.add(item);
50+
LinkedList<Integer> index = new LinkedList<>();
51+
for (int i = 0; i < items.size(); ++i) {
52+
index.add(i);
53+
}
54+
55+
if (limit * 2 < items.size()) {
56+
// Limit is very small comparing to items.size()
57+
Set<Integer> smallIndex = new HashSet<>();
58+
for (int i = 0; i < limit; ++i) {
59+
int randomNum = ThreadLocalRandom.current().nextInt(0, index.size());
60+
smallIndex.add(index.remove(randomNum));
61+
}
62+
smallIndex.forEach(i -> truncated.add(items.get(i)));
63+
} else {
64+
// Limit is compared to items.size()
65+
for (int i = 0; i < items.size() - limit; ++i) {
66+
int randomNum = ThreadLocalRandom.current().nextInt(0, index.size());
67+
index.remove(randomNum);
7568
}
69+
index.forEach(i -> truncated.add(items.get(i)));
7670
}
77-
return selected;
71+
72+
return truncated;
7873
}
7974
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package org.ethereum.util;
2+
3+
import org.junit.Test;
4+
5+
import java.util.Arrays;
6+
import java.util.HashSet;
7+
import java.util.List;
8+
import java.util.Set;
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
11+
import static org.junit.Assert.assertArrayEquals;
12+
import static org.junit.Assert.assertEquals;
13+
import static org.junit.Assert.assertTrue;
14+
15+
public class CollectionUtilsTest {
16+
17+
@Test
18+
public void test() {
19+
final List<Integer> input = Arrays.asList(2, 4, 6, 8, 10, 12, 14, 16, 18, 20);
20+
assertEquals(10, input.size());
21+
22+
List<Integer> resEqual = CollectionUtils.truncateRand(input, 10);
23+
assertArrayEquals(input.toArray(), resEqual.toArray());
24+
25+
List<Integer> resEqual2 = CollectionUtils.truncateRand(input, 20);
26+
assertArrayEquals(input.toArray(), resEqual2.toArray());
27+
28+
Set<Integer> excluded = new HashSet<>();
29+
for (int i = 0; i < 1000; ++i) {
30+
List<Integer> resMinusOne = CollectionUtils.truncateRand(input, 9);
31+
Set<Integer> resMinusOneSet = new HashSet<>(resMinusOne);
32+
assertEquals(resMinusOne.size(), resMinusOneSet.size());
33+
AtomicInteger exclusionCounter = new AtomicInteger(0);
34+
input.forEach(x -> {
35+
if(!resMinusOneSet.contains(x)) {
36+
excluded.add(x);
37+
exclusionCounter.getAndIncrement();
38+
}
39+
});
40+
assertEquals(1, exclusionCounter.get());
41+
}
42+
assertEquals("Someday I'll fail due to the nature of random", 10, excluded.size());
43+
44+
Set<Integer> included = new HashSet<>();
45+
for (int i = 0; i < 1000; ++i) {
46+
List<Integer> resOne = CollectionUtils.truncateRand(input, 1);
47+
included.add(resOne.get(0));
48+
assertTrue(input.contains(resOne.get(0)));
49+
}
50+
assertEquals("Someday I'll fail due to the nature of random", 10, included.size());
51+
52+
assertEquals(3, CollectionUtils.truncateRand(input, 3).size());
53+
}
54+
}

0 commit comments

Comments
 (0)