Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Commit f0b6e1b

Browse files
authored
Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)
1 parent 1c59862 commit f0b6e1b

File tree

17 files changed

+472
-209
lines changed

17 files changed

+472
-209
lines changed

examples/src/java/org/apache/heron/examples/streamlet/StreamletCloneTopology.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,15 @@ public static void main(String[] args) throws Exception {
144144
* Elements in the first cloned streamlet go to the database sink.
145145
*/
146146
splitGameScoreStreamlet.get(0)
147-
.toSink(new DatabaseSink());
147+
.toSink(new DatabaseSink())
148+
.setName("sink0");
148149

149150
/**
150151
* Elements in the second cloned streamlet go to the logging sink.
151152
*/
152153
splitGameScoreStreamlet.get(1)
153-
.toSink(new FormattedLogSink());
154+
.toSink(new FormattedLogSink())
155+
.setName("sink1");
154156

155157
Config config = Config.defaultConfig();
156158

examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaClassicalMusicTopology.scala

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ object ScalaClassicalMusicTopology {
9292
)
9393
.setName("joined-classical-musics-by-year")
9494
.log()
95+
.setName("log")
9596

9697
val config = Config.defaultConfig()
9798

examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaIntegerProcessingTopology.scala

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ object ScalaIntegerProcessingTopology {
5858
.union(zeroes)
5959
.setName("union-of-numbers")
6060
.log()
61+
.setName("log")
62+
.setNumPartitions(1)
6163

6264
val config = Config.newBuilder
6365
.setNumContainers(NUM_CONTAINERS)

heron/api/src/java/org/apache/heron/streamlet/Streamlet.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* Streamlet before doing the transformation.
4646
*/
4747
@InterfaceStability.Evolving
48-
public interface Streamlet<R> {
48+
public interface Streamlet<R> extends StreamletBase<R> {
4949

5050
/**
5151
* Sets the name of the BaseStreamlet.
@@ -299,21 +299,21 @@ <K> KVStreamlet<KeyedWindow<K>, Long> countByKeyAndWindow(
299299
* Logs every element of the streamlet using String.valueOf function
300300
* This is one of the sink functions: the returned StreamletBase only allows setting the name and partition count
301301
*/
302-
void log();
302+
StreamletBase<R> log();
303303

304304
/**
305305
* Applies the consumer function to every element of the stream
306306
* This is a sink function; it returns a StreamletBase on which the name and partition count can be set.
307307
* @param consumer The user supplied consumer function that is invoked for each element
308308
* of this streamlet.
309309
*/
310-
void consume(SerializableConsumer<R> consumer);
310+
StreamletBase<R> consume(SerializableConsumer<R> consumer);
311311

312312
/**
313313
* Applies the sink's put function to every element of the stream
314314
* This is a sink function; it returns a StreamletBase on which the name and partition count can be set.
315315
* @param sink The Sink whose put method consumes each element
316316
* of this streamlet.
317317
*/
318-
void toSink(Sink<R> sink);
318+
StreamletBase<R> toSink(Sink<R> sink);
319319
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.heron.streamlet;
21+
22+
/**
23+
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
24+
* The StreamletBase interface contains basic information of a Streamlet
25+
* such as name and partition count without the connection functions
26+
* such as map() and filter().
27+
*/
28+
public interface StreamletBase<R> {
29+
30+
/**
31+
* Sets the name of the BaseStreamlet.
32+
* @param sName The name given by the user for this BaseStreamlet
33+
* @return Returns back the Streamlet with changed name
34+
*/
35+
StreamletBase<R> setName(String sName);
36+
37+
/**
38+
* Gets the name of the Streamlet.
39+
* @return Returns the name of the Streamlet
40+
*/
41+
String getName();
42+
43+
/**
44+
* Sets the number of partitions of the streamlet
45+
* @param numPartitions The user assigned number of partitions
46+
* @return Returns back the Streamlet with changed number of partitions
47+
*/
48+
StreamletBase<R> setNumPartitions(int numPartitions);
49+
50+
/**
51+
* Gets the number of partitions of this Streamlet.
52+
* @return the number of partitions of this Streamlet
53+
*/
54+
int getNumPartitions();
55+
56+
// This is the main interface that every Streamlet implementation should implement
57+
// The main tasks are generally to make sure that appropriate names/partitions are
58+
// computed and add a spout/bolt to the TopologyBuilder
59+
// void build(TopologyBuilder bldr, Set<String> stageNames);
60+
}

heron/api/src/java/org/apache/heron/streamlet/impl/BuilderImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public TopologyBuilder build(TopologyBuilder builder) {
9191
streamlet.build(builder, stageNames);
9292
}
9393
for (StreamletImpl<?> streamlet : sources) {
94-
if (!streamlet.allBuilt()) {
94+
if (!streamlet.isFullyBuilt()) {
9595
throw new RuntimeException("Topology cannot be fully built! Are all sources added?");
9696
}
9797
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.heron.streamlet.impl;
20+
21+
import java.util.LinkedList;
22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.logging.Logger;
25+
26+
import org.apache.heron.api.topology.TopologyBuilder;
27+
import org.apache.heron.streamlet.StreamletBase;
28+
29+
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.checkNotBlank;
30+
import static org.apache.heron.streamlet.impl.utils.StreamletUtils.require;
31+
32+
/**
33+
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
34+
* Streamlets originate from pub/sub systems (such as Pulsar/Kafka), or from
35+
* static data(such as csv files, HDFS files), or for that matter any other
36+
* source. They are also created by transforming existing Streamlets using
37+
* operations such as map/flatMap, etc.
38+
* Besides the tuples, a Streamlet has the following properties associated with it
39+
* a) name. User assigned or system generated name to refer to the streamlet
40+
* b) nPartitions. Number of partitions that the streamlet is composed of. Thus the
41+
* ordering of the tuples in a Streamlet is wrt the tuples within a partition.
42+
* This allows the system to distribute each partition to different nodes across the cluster.
43+
* A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each
44+
* of these transformations operates on every tuple of the Streamlet and produces a new
45+
* Streamlet. One can think of a transformation attaching itself to the stream and processing
46+
* each tuple as they go by. Thus the parallelism of any operator is implicitly determined
47+
* by the number of partitions of the stream that it is operating on. If a particular
48+
* transformation wants to operate at a different parallelism, one can repartition the
49+
* Streamlet before doing the transformation.
50+
*/
51+
public abstract class StreamletBaseImpl<R> implements StreamletBase<R> {
52+
private static final Logger LOG = Logger.getLogger(StreamletBaseImpl.class.getName());
53+
protected String name;
54+
protected int nPartitions;
55+
private List<StreamletBaseImpl<?>> children;
56+
private boolean built;
57+
58+
/**
59+
* Only used by the implementors
60+
*/
61+
protected StreamletBaseImpl() {
62+
this.name = null;
63+
this.nPartitions = -1;
64+
this.children = new LinkedList<>();
65+
this.built = false;
66+
}
67+
68+
protected enum StreamletNamePrefix {
69+
CONSUMER("consumer"),
70+
COUNT("count"),
71+
CUSTOM("custom"),
72+
CUSTOM_BASIC("customBasic"),
73+
CUSTOM_WINDOW("customWindow"),
74+
FILTER("filter"),
75+
FLATMAP("flatmap"),
76+
JOIN("join"),
77+
KEYBY("keyBy"),
78+
LOGGER("logger"),
79+
MAP("map"),
80+
SOURCE("generator"),
81+
REDUCE("reduce"),
82+
REMAP("remap"),
83+
SINK("sink"),
84+
SPLIT("split"),
85+
SPOUT("spout"),
86+
SUPPLIER("supplier"),
87+
TRANSFORM("transform"),
88+
UNION("union");
89+
90+
private final String prefix;
91+
92+
StreamletNamePrefix(final String prefix) {
93+
this.prefix = prefix;
94+
}
95+
96+
@Override
97+
public String toString() {
98+
return prefix;
99+
}
100+
}
101+
102+
/**
103+
* Sets the name of the Streamlet.
104+
* @param sName The name given by the user for this streamlet
105+
* @return Returns back the Streamlet with changed name
106+
*/
107+
@Override
108+
public StreamletBase<R> setName(String sName) {
109+
checkNotBlank(sName, "Streamlet name cannot be null/blank");
110+
111+
this.name = sName;
112+
return this;
113+
}
114+
115+
/**
116+
* Gets the name of the Streamlet.
117+
* @return Returns the name of the Streamlet
118+
*/
119+
@Override
120+
public String getName() {
121+
return name;
122+
}
123+
124+
private String defaultNameCalculator(StreamletNamePrefix prefix, Set<String> stageNames) {
125+
int index = 1;
126+
String calculatedName;
127+
while (true) {
128+
calculatedName = new StringBuilder(prefix.toString()).append(index).toString();
129+
if (!stageNames.contains(calculatedName)) {
130+
break;
131+
}
132+
index++;
133+
}
134+
LOG.info("Calculated stage Name as " + calculatedName);
135+
return calculatedName;
136+
}
137+
138+
/**
139+
* Sets a default unique name to the Streamlet by type if it is not set.
140+
* Otherwise, just checks its uniqueness.
141+
* @param prefix The name prefix of this streamlet
142+
* @param stageNames The collections of created streamlet/stage names
143+
*/
144+
protected void setDefaultNameIfNone(StreamletNamePrefix prefix, Set<String> stageNames) {
145+
if (getName() == null) {
146+
setName(defaultNameCalculator(prefix, stageNames));
147+
}
148+
if (stageNames.contains(getName())) {
149+
throw new RuntimeException(String.format(
150+
"The stage name %s is used multiple times in the same topology", getName()));
151+
}
152+
stageNames.add(getName());
153+
}
154+
155+
/**
156+
* Sets the number of partitions of the streamlet
157+
* @param numPartitions The user assigned number of partitions
158+
* @return Returns back the Streamlet with changed number of partitions
159+
*/
160+
@Override
161+
public StreamletBase<R> setNumPartitions(int numPartitions) {
162+
require(numPartitions > 0, "Streamlet's partitions number should be > 0");
163+
164+
this.nPartitions = numPartitions;
165+
return this;
166+
}
167+
168+
/**
169+
* Gets the number of partitions of this Streamlet.
170+
* @return the number of partitions of this Streamlet
171+
*/
172+
@Override
173+
public int getNumPartitions() {
174+
return nPartitions;
175+
}
176+
177+
public <T> void addChild(StreamletBaseImpl<T> child) {
178+
children.add(child);
179+
}
180+
181+
/**
182+
* Gets all the children of this streamlet.
183+
* Children of a streamlet are streamlets that are resulting from transformations of elements of
184+
* this and potentially other streamlets.
185+
* @return The child streamlets
186+
*/
187+
public List<StreamletBaseImpl<?>> getChildren() {
188+
return children;
189+
}
190+
191+
public void build(TopologyBuilder bldr, Set<String> stageNames) {
192+
if (built) {
193+
throw new RuntimeException("Logic Error While building " + getName());
194+
}
195+
196+
if (doBuild(bldr, stageNames)) {
197+
built = true;
198+
for (StreamletBaseImpl<?> streamlet : getChildren()) {
199+
streamlet.build(bldr, stageNames);
200+
}
201+
}
202+
}
203+
204+
public boolean isBuilt() {
205+
return built;
206+
}
207+
208+
public boolean isFullyBuilt() {
209+
if (!isBuilt()) {
210+
return false;
211+
}
212+
for (StreamletBaseImpl<?> child : children) {
213+
if (!child.isFullyBuilt()) {
214+
return false;
215+
}
216+
}
217+
return true;
218+
}
219+
220+
// This is the main interface that every Streamlet implementation should implement
221+
// The main tasks are generally to make sure that appropriate names/partitions are
222+
// computed and add a spout/bolt to the TopologyBuilder
223+
protected abstract boolean doBuild(TopologyBuilder bldr, Set<String> stageNames);
224+
}

0 commit comments

Comments
 (0)