Skip to content

Commit 2b88a78

Browse files
committed
Merge branch 'master' into feature/fix-prometheus-metrics
* master: Improve concurrency for needed parts. (apache#3107) Add documents for setting up a docker based development environment (apache#3475) Patch to fix cppcheck with newer glibc (apache#3471) Make log/sink/consume Streamlet component support setName and setNumPartitions (apache#3459)
2 parents 249c66d + ee1c44a commit 2b88a78

File tree

36 files changed

+572
-250
lines changed

36 files changed

+572
-250
lines changed

WORKSPACE

+5-3
Original file line numberDiff line numberDiff line change
@@ -930,10 +930,12 @@ http_archive(
930930

931931
http_archive(
932932
name = "com_github_danmar_cppcheck",
933-
urls = ["https://github.com/danmar/cppcheck/archive/1.87.zip"],
934-
strip_prefix = "cppcheck-1.87",
935933
build_file = "@//:third_party/cppcheck/cppcheck.BUILD",
936-
sha256 = "b3de7fbdc1a23d7341b55f7f88877e106a76847bd5a07fa721c07310b625318b",
934+
patch_args = ["-p2"],
935+
patches = ["//third_party/cppcheck:cppcheck-readdir-fix.patch"],
936+
sha256 = "cb0e66cbe2d6b655fce430cfaaa74b83ad11c91f221e3926f1ca3211bb7c906b",
937+
strip_prefix = "cppcheck-1.90",
938+
urls = ["https://github.com/danmar/cppcheck/archive/1.90.zip"],
937939
)
938940

939941
http_archive(

examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,15 @@ public static void main(String[] args) throws Exception {
144144
* Elements in the first cloned streamlet go to the database sink.
145145
*/
146146
splitGameScoreStreamlet.get(0)
147-
.toSink(new DatabaseSink());
147+
.toSink(new DatabaseSink())
148+
.setName("sink0");
148149

149150
/**
150151
* Elements in the second cloned streamlet go to the logging sink.
151152
*/
152153
splitGameScoreStreamlet.get(1)
153-
.toSink(new FormattedLogSink());
154+
.toSink(new FormattedLogSink())
155+
.setName("sink1");
154156

155157
Config config = Config.defaultConfig();
156158

examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ object ScalaClassicalMusicTopology {
9292
)
9393
.setName("joined-classical-musics-by-year")
9494
.log()
95+
.setName("log")
9596

9697
val config = Config.defaultConfig()
9798

examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ object ScalaIntegerProcessingTopology {
5858
.union(zeroes)
5959
.setName("union-of-numbers")
6060
.log()
61+
.setName("log")
62+
.setNumPartitions(1)
6163

6264
val config = Config.newBuilder
6365
.setNumContainers(NUM_CONTAINERS)

heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
public class MultiAssignableMetric<T extends Number> implements IMetric<Map<String, T>> {
26-
private final Map<String, AssignableMetric<T>> value = new HashMap<>();
27+
private final Map<String, AssignableMetric<T>> value = new ConcurrentHashMap<>();
2728
private T initialValue;
2829

2930
public MultiAssignableMetric(T initialValue) {

heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
public class MultiCountMetric implements IMetric<Map<String, Long>> {
26-
private Map<String, CountMetric> value = new HashMap<>();
27+
private Map<String, CountMetric> value = new ConcurrentHashMap<>();
2728

2829
public MultiCountMetric() {
2930
}

heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
2425

2526
/*
2627
* A reduce metric that can hold multiple scoped values.
@@ -29,7 +30,7 @@
2930
* @param <V> type of reduced value
3031
*/
3132
public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
32-
private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
33+
private Map<String, ReducedMetric<T, U, V>> value = new ConcurrentHashMap<>();
3334
private IReducer<T, U, V> reducer;
3435

3536
public MultiReducedMetric(IReducer<T, U, V> reducer) {

heron/api/src/java/org/apache/heron/api/tuple/Fields.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222
import java.io.Serializable;
2323
import java.util.ArrayList;
2424
import java.util.Arrays;
25-
import java.util.HashMap;
2625
import java.util.Iterator;
2726
import java.util.List;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
2929

3030
public class Fields implements Iterable<String>, Serializable {
3131
private static final long serialVersionUID = -1045737418722082345L;
3232

3333
private List<String> fields;
34-
private Map<String, Integer> mIndex = new HashMap<String, Integer>();
34+
private Map<String, Integer> mIndex = new ConcurrentHashMap<String, Integer>();
3535

3636
public Fields(String... pFields) {
3737
this(Arrays.asList(pFields));

heron/api/src/java/org/apache/heron/streamlet/Streamlet.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* Streamlet before doing the transformation.
4646
*/
4747
@InterfaceStability.Evolving
48-
public interface Streamlet<R> {
48+
public interface Streamlet<R> extends StreamletBase<R> {
4949

5050
/**
5151
* Sets the name of the BaseStreamlet.
@@ -299,21 +299,21 @@ <K> KVStreamlet<KeyedWindow<K>, Long> countByKeyAndWindow(
299299
* Logs every element of the streamlet using String.valueOf function
300300
* This is one of the sink functions in the sense that this operation returns void
301301
*/
302-
void log();
302+
StreamletBase<R> log();
303303

304304
/**
305305
* Applies the consumer function to every element of the stream
306306
* This function does not return anything.
307307
* @param consumer The user supplied consumer function that is invoked for each element
308308
* of this streamlet.
309309
*/
310-
void consume(SerializableConsumer<R> consumer);
310+
StreamletBase<R> consume(SerializableConsumer<R> consumer);
311311

312312
/**
313313
* Applies the sink's put function to every element of the stream
314314
* This function does not return anything.
315315
* @param sink The Sink whose put method consumes each element
316316
* of this streamlet.
317317
*/
318-
void toSink(Sink<R> sink);
318+
StreamletBase<R> toSink(Sink<R> sink);
319319
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.heron.streamlet;
21+
22+
/**
23+
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
24+
* The StreamletBase class contains basic information of a Streamlet
25+
* such as name and partition count without the connection functions
26+
* such as map() and filter().
27+
*/
28+
public interface StreamletBase<R> {
29+
30+
/**
31+
* Sets the name of the BaseStreamlet.
32+
* @param sName The name given by the user for this BaseStreamlet
33+
* @return Returns back the Streamlet with changed name
34+
*/
35+
StreamletBase<R> setName(String sName);
36+
37+
/**
38+
* Gets the name of the Streamlet.
39+
* @return Returns the name of the Streamlet
40+
*/
41+
String getName();
42+
43+
/**
44+
* Sets the number of partitions of the streamlet
45+
* @param numPartitions The user assigned number of partitions
46+
* @return Returns back the Streamlet with changed number of partitions
47+
*/
48+
StreamletBase<R> setNumPartitions(int numPartitions);
49+
50+
/**
51+
* Gets the number of partitions of this Streamlet.
52+
* @return the number of partitions of this Streamlet
53+
*/
54+
int getNumPartitions();
55+
56+
// This is the main interface that every Streamlet implementation should implement
57+
// The main tasks are generally to make sure that appropriate names/partitions are
58+
// computed and add a spout/bolt to the TopologyBuilder
59+
// void build(TopologyBuilder bldr, Set<String> stageNames);
60+
}

heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public TopologyBuilder build(TopologyBuilder builder) {
9191
streamlet.build(builder, stageNames);
9292
}
9393
for (StreamletImpl<?> streamlet : sources) {
94-
if (!streamlet.allBuilt()) {
94+
if (!streamlet.isFullyBuilt()) {
9595
throw new RuntimeException("Topology cannot be fully built! Are all sources added?");
9696
}
9797
}

0 commit comments

Comments
 (0)