Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Commit ed6fd81

Browse files
authored
Add an example with Streamlet API using component config (#3496)
An example Streamlet topology demonstrating how to set cpu and memory limits per component.
1 parent 0f27ef4 commit ed6fd81

File tree

1 file changed

+95
-0
lines changed

1 file changed

+95
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
21+
package org.apache.heron.examples.streamlet;
22+
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.logging.Logger;
26+
27+
import org.apache.heron.common.basics.ByteAmount;
28+
import org.apache.heron.examples.streamlet.utils.StreamletUtils;
29+
import org.apache.heron.streamlet.Builder;
30+
import org.apache.heron.streamlet.Config;
31+
import org.apache.heron.streamlet.Runner;
32+
33+
/**
34+
* This topology is an implementation of a simple topology using the
35+
* Streamlet API. CPU and ram are configured for each component.
36+
*/
37+
public final class ComponentConfigTopology {
38+
private ComponentConfigTopology() {
39+
}
40+
41+
private static final Logger LOG =
42+
Logger.getLogger(ComponentConfigTopology.class.getName());
43+
44+
private static final List<String> SENTENCES = Arrays.asList(
45+
"I have nothing to declare but my genius",
46+
"You can even",
47+
"Compassion is an action word with no boundaries",
48+
"To thine own self be true"
49+
);
50+
51+
public static void main(String[] args) throws Exception {
52+
Builder processingGraphBuilder = Builder.newBuilder();
53+
54+
processingGraphBuilder
55+
// The origin of the processing graph: an indefinite series of sentences chosen
56+
// from the list
57+
.newSource(() -> StreamletUtils.randomFromList(SENTENCES))
58+
.setName("random-sentences-source")
59+
// Each sentence is "flattened" into a Streamlet<String> of individual words
60+
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
61+
.setName("flatten-into-individual-words")
62+
// The final output is logged using a user-supplied format
63+
.consume(w -> {
64+
String logMessage = String.format("(word: %s)", w);
65+
LOG.info(logMessage);
66+
})
67+
.setName("consumer");
68+
69+
// The topology's parallelism (the number of containers across which the topology's
70+
// processing instance will be split) can be defined via the second command-line
71+
// argument (or else the default of 2 will be used).
72+
int topologyParallelism = StreamletUtils.getParallelism(args, 2);
73+
74+
Config config = Config.newBuilder()
75+
.setNumContainers(topologyParallelism)
76+
.setPerContainerCpu(1)
77+
.build();
78+
79+
config.getHeronConfig().setComponentCpu("random-sentences-source", 0.3);
80+
config.getHeronConfig().setComponentRam("random-sentences-source",
81+
ByteAmount.fromMegabytes(300));
82+
config.getHeronConfig().setComponentCpu("flatten-into-individual-words", 0.3);
83+
config.getHeronConfig().setComponentRam("flatten-into-individual-words",
84+
ByteAmount.fromMegabytes(300));
85+
config.getHeronConfig().setComponentCpu("consumer", 0.2);
86+
config.getHeronConfig().setComponentRam("consumer", ByteAmount.fromMegabytes(200));
87+
88+
// Fetches the topology name from the first command-line argument
89+
String topologyName = StreamletUtils.getTopologyName(args);
90+
91+
// Finally, the processing graph and configuration are passed to the Runner, which converts
92+
// the graph into a Heron topology that can be run in a Heron cluster.
93+
new Runner().run(topologyName, config, processingGraphBuilder);
94+
}
95+
}

0 commit comments

Comments
 (0)