Skip to content

Commit fd5acb4

Browse files
committed
Improve concurrency for needed parts. (apache#3107)
* Change concurrent Map * Change concurrent Map * HashMap changes for unneeded parts. * HashMap changes for unneeded parts. * Review changes * Changes HashMap for unneeded parts. * Improve concurrency for needed parts. * Remove unused imports. * Remove unused imports. * Remove unused imports. * Fix NPE (cherry picked from commit 545d381) * Fix WhitespaceAround * Add dummy Object * Fix ConstantName (cherry picked from commit 8d6d506)
1 parent e8d0bdc commit fd5acb4

File tree

13 files changed

+38
-27
lines changed

13 files changed

+38
-27
lines changed

heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
public class MultiAssignableMetric<T extends Number> implements IMetric<Map<String, T>> {
26-
private final Map<String, AssignableMetric<T>> value = new HashMap<>();
27+
private final Map<String, AssignableMetric<T>> value = new ConcurrentHashMap<>();
2728
private T initialValue;
2829

2930
public MultiAssignableMetric(T initialValue) {

heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
public class MultiCountMetric implements IMetric<Map<String, Long>> {
26-
private Map<String, CountMetric> value = new HashMap<>();
27+
private Map<String, CountMetric> value = new ConcurrentHashMap<>();
2728

2829
public MultiCountMetric() {
2930
}

heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
/*
2627
* A reduce metric that can hold multiple scoped values.
@@ -29,7 +30,7 @@
2930
* @param <V> type of reduced value
3031
*/
3132
public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
32-
private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
33+
private Map<String, ReducedMetric<T, U, V>> value = new ConcurrentHashMap<>();
3334
private IReducer<T, U, V> reducer;
3435

3536
public MultiReducedMetric(IReducer<T, U, V> reducer) {

heron/api/src/java/org/apache/heron/api/tuple/Fields.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222
import java.io.Serializable;
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
25-
import java.util.HashMap;
2625
import java.util.Iterator;
2726
import java.util.List;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
2929

3030
public class Fields implements Iterable<String>, Serializable {
3131
private static final long serialVersionUID = -1045737418722082345L;
3232

3333
private List<String> fields;
34-
private Map<String, Integer> mIndex = new HashMap<String, Integer>();
34+
private Map<String, Integer> mIndex = new ConcurrentHashMap<String, Integer>();
3535

3636
public Fields(String... pFields) {
3737
this(Arrays.asList(pFields));

heron/common/src/java/org/apache/heron/common/network/HeronClient.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import java.nio.channels.SelectableChannel;
2525
import java.nio.channels.SocketChannel;
2626
import java.time.Duration;
27-
import java.util.HashMap;
2827
import java.util.List;
2928
import java.util.Map;
29+
import java.util.Objects;
30+
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.logging.Level;
3132
import java.util.logging.Logger;
3233

@@ -57,6 +58,7 @@
5758
*/
5859
public abstract class HeronClient implements ISelectHandler {
5960
private static final Logger LOG = Logger.getLogger(HeronClient.class.getName());
61+
private static final Object DUMMY = new Object();
6062

6163
// When we send a request, we need to:
6264
// record the the context for this particular RID, and prepare the response for that RID
@@ -99,9 +101,9 @@ public HeronClient(NIOLooper s, String host, int port, HeronSocketOptions option
99101
socketOptions = options;
100102

101103
isConnected = false;
102-
contextMap = new HashMap<REQID, Object>();
103-
responseMessageMap = new HashMap<REQID, Message.Builder>();
104-
messageMap = new HashMap<String, Message.Builder>();
104+
contextMap = new ConcurrentHashMap<REQID, Object>();
105+
responseMessageMap = new ConcurrentHashMap<REQID, Message.Builder>();
106+
messageMap = new ConcurrentHashMap<String, Message.Builder>();
105107
}
106108

107109
// Register the protobuf Message's name with protobuf Message
@@ -193,7 +195,7 @@ public void sendRequest(Message request, Object context, Message.Builder respons
193195
Duration timeout) {
194196
// Pack it as a no-timeout request and send it!
195197
final REQID rid = REQID.generate();
196-
contextMap.put(rid, context);
198+
contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE
197199
responseMessageMap.put(rid, responseBuilder);
198200

199201
// Add timeout for this request if necessary
@@ -402,15 +404,15 @@ public void forceFlushWithBestEffort() {
402404
// Following protected methods are just used for testing
403405
/////////////////////////////////////////////////////////
404406
protected Map<String, Message.Builder> getMessageMap() {
405-
return new HashMap<String, Message.Builder>(messageMap);
407+
return new ConcurrentHashMap<String, Message.Builder>(messageMap);
406408
}
407409

408410
protected Map<REQID, Message.Builder> getResponseMessageMap() {
409-
return new HashMap<REQID, Message.Builder>(responseMessageMap);
411+
return new ConcurrentHashMap<REQID, Message.Builder>(responseMessageMap);
410412
}
411413

412414
protected Map<REQID, Object> getContextMap() {
413-
return new HashMap<REQID, Object>(contextMap);
415+
return new ConcurrentHashMap<>(contextMap);
414416
}
415417

416418
protected SocketChannelHelper getSocketChannelHelper() {

heron/common/src/java/org/apache/heron/common/network/HeronServer.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import java.nio.channels.ServerSocketChannel;
2727
import java.nio.channels.SocketChannel;
2828
import java.time.Duration;
29-
import java.util.HashMap;
3029
import java.util.List;
3130
import java.util.Map;
31+
import java.util.concurrent.ConcurrentHashMap;
3232
import java.util.logging.Level;
3333
import java.util.logging.Logger;
3434

@@ -71,9 +71,9 @@ public HeronServer(NIOLooper s, String host, int port, HeronSocketOptions option
7171
nioLooper = s;
7272
endpoint = new InetSocketAddress(host, port);
7373
socketOptions = options;
74-
requestMap = new HashMap<String, Message.Builder>();
75-
messageMap = new HashMap<String, Message.Builder>();
76-
activeConnections = new HashMap<SocketChannel, SocketChannelHelper>();
74+
requestMap = new ConcurrentHashMap<String, Message.Builder>();
75+
messageMap = new ConcurrentHashMap<String, Message.Builder>();
76+
activeConnections = new ConcurrentHashMap<SocketChannel, SocketChannelHelper>();
7777
}
7878

7979
public InetSocketAddress getEndpoint() {

heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222

2323
import java.time.Duration;
2424
import java.util.Collection;
25-
import java.util.HashMap;
2625
import java.util.LinkedList;
2726
import java.util.List;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.logging.Logger;
3030

3131
import org.apache.heron.api.metric.CumulativeCountMetric;
@@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister {
5353

5454
public MetricsCollector(WakeableLooper runnableToGatherMetrics,
5555
Communicator<Metrics.MetricPublisherPublishMessage> queue) {
56-
metrics = new HashMap<>();
57-
timeBucketToMetricNames = new HashMap<>();
56+
metrics = new ConcurrentHashMap<>();
57+
timeBucketToMetricNames = new ConcurrentHashMap<>();
5858
this.queue = queue;
5959
this.runnableToGatherMetrics = runnableToGatherMetrics;
6060
metricCollectionCount = new CumulativeCountMetric();

heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.LinkedList;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.logging.Logger;
3031

3132
import org.apache.heron.api.Config;
@@ -101,7 +102,7 @@ public PhysicalPlanHelper(PhysicalPlans.PhysicalPlan pplan, String instanceId) {
101102
}
102103

103104
// setup outputSchema
104-
outputSchema = new HashMap<String, Integer>();
105+
outputSchema = new ConcurrentHashMap<String, Integer>();
105106
List<TopologyAPI.OutputStream> outputs;
106107
if (mySpout != null) {
107108
outputs = mySpout.getOutputsList();

heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.ConcurrentHashMap;
2930

3031
import org.apache.heron.api.Config;
3132
import org.apache.heron.api.generated.TopologyAPI;
@@ -63,8 +64,8 @@ public GeneralTopologyContextImpl(Map<String, Object> clusterConfig,
6364
this.topology = topology;
6465
this.topologyConfig = new HashMap<>(clusterConfig);
6566
this.taskToComponentMap = taskToComponentMap;
66-
this.inputs = new HashMap<>();
67-
this.outputs = new HashMap<>();
67+
this.inputs = new ConcurrentHashMap<>();
68+
this.outputs = new ConcurrentHashMap<>();
6869
this.componentsOutputFields = new HashMap<>();
6970

7071
for (int i = 0; i < this.topology.getSpoutsCount(); ++i) {

storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
2324

2425
public class MultiCountMetric implements IMetric {
25-
private Map<String, CountMetric> value = new HashMap<>();
26+
private Map<String, CountMetric> value = new ConcurrentHashMap<>();
2627

2728
public MultiCountMetric() {
2829
}

storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
2324

2425
@SuppressWarnings("rawtypes")
2526
public class MultiReducedMetric implements IMetric {
26-
private Map<String, ReducedMetric> value = new HashMap<>();
27+
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
2728
private IReducer reducer;
2829

2930
public MultiReducedMetric(IReducer reducer) {

storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
2324

2425
public class MultiCountMetric implements IMetric {
25-
private Map<String, CountMetric> value = new HashMap<>();
26+
private Map<String, CountMetric> value = new ConcurrentHashMap<>();
2627

2728
public MultiCountMetric() {
2829
}

storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
2324

2425
@SuppressWarnings("rawtypes")
2526
public class MultiReducedMetric implements IMetric {
26-
private Map<String, ReducedMetric> value = new HashMap<>();
27+
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
2728
private IReducer reducer;
2829

2930
public MultiReducedMetric(IReducer reducer) {

0 commit comments

Comments
 (0)