Skip to content

Commit a1b761f

Browse files
xiaoyao1991huijunwu
authored andcommitted
Validate resource constraint (RAM and CPU) in RoundRobinPacking (apache#3142)
* init * general resource constraint validation * pass existing unit tests * add more tests * rename * rename * generic ResourceMeasure * fixed wc example * even more general generics * address comments * address comments by putting more tests * set safe amount of cpu * meaningful constants in ExampleResource
1 parent 5270725 commit a1b761f

File tree

8 files changed

+590
-152
lines changed

8 files changed

+590
-152
lines changed

examples/src/java/org/apache/heron/examples/api/ExampleResources.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
public final class ExampleResources {
2525

26+
static final long DEFAULT_RAM_PADDING_PER_CONTAINER = 2 * 1024;
2627
static final long COMPONENT_RAM_MB = 1024;
28+
static final double DEFAULT_CPU_PADDING_PER_CONTAINER = 1.0;
2729

2830
static ByteAmount getComponentRam() {
2931
return ByteAmount.fromMegabytes(COMPONENT_RAM_MB);
@@ -35,7 +37,13 @@ static ByteAmount getContainerDisk(int components, int containers) {
3537

3638
static ByteAmount getContainerRam(int components, int containers) {
3739
final int componentsPerContainer = Math.max(components / containers, 1);
38-
return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer);
40+
return ByteAmount.fromMegabytes(COMPONENT_RAM_MB * componentsPerContainer
41+
+ DEFAULT_RAM_PADDING_PER_CONTAINER);
42+
}
43+
44+
static double getContainerCpu(int components, int containers) {
45+
final int componentsPerContainer = Math.max(components / containers, 1);
46+
return componentsPerContainer + DEFAULT_CPU_PADDING_PER_CONTAINER;
3947
}
4048

4149
private ExampleResources() {

examples/src/java/org/apache/heron/examples/api/WordCountTopology.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,16 @@ public static void main(String[] args) throws AlreadyAliveException, InvalidTopo
188188
conf.setComponentRam("consumer",
189189
ByteAmount.fromMegabytes(ExampleResources.COMPONENT_RAM_MB));
190190

191+
conf.setComponentCpu("word", 1.0);
192+
conf.setComponentCpu("consumer", 1.0);
193+
191194
// configure container resources
192195
conf.setContainerDiskRequested(
193196
ExampleResources.getContainerDisk(2 * parallelism, parallelism));
194197
conf.setContainerRamRequested(
195198
ExampleResources.getContainerRam(2 * parallelism, parallelism));
196-
conf.setContainerCpuRequested(2);
199+
conf.setContainerCpuRequested(
200+
ExampleResources.getContainerCpu(2 * parallelism, parallelism));
197201

198202
HeronSubmitter.submitTopology(args[0], conf, builder.createTopology());
199203
}

heron/common/src/java/org/apache/heron/common/basics/ByteAmount.java

+37-79
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
/**
2323
* Class that encapsulates number of bytes, with helpers to handle units properly.
2424
*/
25-
public final class ByteAmount implements Comparable<ByteAmount> {
25+
public final class ByteAmount extends ResourceMeasure<Long> {
2626
private static final long KB = 1024L;
2727
private static final long MB = KB * 1024;
2828
private static final long GB = MB * 1024;
@@ -34,10 +34,9 @@ public final class ByteAmount implements Comparable<ByteAmount> {
3434
private static final long MAX_KB = Math.round(Long.MAX_VALUE / KB);
3535

3636
public static final ByteAmount ZERO = ByteAmount.fromBytes(0);
37-
private final long bytes;
3837

39-
private ByteAmount(long bytes) {
40-
this.bytes = bytes;
38+
private ByteAmount(Long value) {
39+
super(value);
4140
}
4241

4342
/**
@@ -67,7 +66,8 @@ public static ByteAmount fromMegabytes(long megabytes) {
6766

6867
/**
6968
* Creates a ByteAmount value in gigabytes. If the gigabytes value
70-
* is &gt;= Long.MAX_VALUE / 1024 / 1024 / 1024, the byte representation is capped at Long.MAX_VALUE.
69+
* is &gt;= Long.MAX_VALUE / 1024 / 1024 / 1024,
70+
* the byte representation is capped at Long.MAX_VALUE.
7171
*
7272
* @param gigabytes value in gigabytes to represent
7373
* @return a ByteAmount object repressing the number of GBs passed
@@ -85,7 +85,7 @@ public static ByteAmount fromGigabytes(long gigabytes) {
8585
* @return number of bytes
8686
*/
8787
public long asBytes() {
88-
return bytes;
88+
return super.getValue();
8989
}
9090

9191
/**
@@ -97,7 +97,7 @@ public long asBytes() {
9797
* @return returns the ByteValue in MBs or 0 if the value is &lt; (1024 * 1024) / 2
9898
*/
9999
public long asMegabytes() {
100-
return Math.round((double) bytes / MB);
100+
return Math.round(value.doubleValue() / MB);
101101
}
102102

103103
/**
@@ -109,7 +109,7 @@ public long asMegabytes() {
109109
* @return returns the ByteValue in KBs or 0 if the value is &lt; (1024) / 2
110110
*/
111111
public long asKilobytes() {
112-
return Math.round((double) bytes / KB);
112+
return Math.round(value.doubleValue() / KB);
113113
}
114114

115115
/**
@@ -121,15 +121,7 @@ public long asKilobytes() {
121121
* @return returns the ByteValue in GBs or 0 if the value is &lt; (1024 * 1024 * 1024) / 2
122122
*/
123123
public long asGigabytes() {
124-
return Math.round((double) bytes / GB);
125-
}
126-
127-
/**
128-
* Convenience methdod to determine if byte value is zero
129-
* @return true if the byte value is 0
130-
*/
131-
public boolean isZero() {
132-
return ZERO.equals(this);
124+
return Math.round(value.doubleValue() / GB);
133125
}
134126

135127
/**
@@ -138,10 +130,11 @@ public boolean isZero() {
138130
* @return a new ByteValue of this minus other ByteValue
139131
* @throws IllegalArgumentException if subtraction would overshoot Long.MIN_VALUE
140132
*/
141-
public ByteAmount minus(ByteAmount other) {
142-
checkArgument(Long.MIN_VALUE + other.asBytes() <= asBytes(), String.format(
143-
"Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
144-
return ByteAmount.fromBytes(asBytes() - other.asBytes());
133+
@Override
134+
public ByteAmount minus(ResourceMeasure<Long> other) {
135+
checkArgument(Long.MIN_VALUE + other.value <= value,
136+
String.format("Subtracting %s from %s would overshoot Long.MIN_LONG", other, this));
137+
return ByteAmount.fromBytes(value - other.value);
145138
}
146139

147140
/**
@@ -150,10 +143,11 @@ public ByteAmount minus(ByteAmount other) {
150143
* @return a new ByteValue of this plus other ByteValue
151144
* @throws IllegalArgumentException if addition would exceed Long.MAX_VALUE
152145
*/
153-
public ByteAmount plus(ByteAmount other) {
154-
checkArgument(Long.MAX_VALUE - asBytes() >= other.asBytes(), String.format(
155-
"Adding %s to %s would exceed Long.MAX_LONG", other, this));
156-
return ByteAmount.fromBytes(asBytes() + other.asBytes());
146+
@Override
147+
public ByteAmount plus(ResourceMeasure<Long> other) {
148+
checkArgument(Long.MAX_VALUE - value >= other.value,
149+
String.format("Adding %s to %s would exceed Long.MAX_LONG", other, this));
150+
return ByteAmount.fromBytes(value + other.value);
157151
}
158152

159153
/**
@@ -162,10 +156,11 @@ public ByteAmount plus(ByteAmount other) {
162156
* @return a new ByteValue of this ByteValue multiplied by factor
163157
* @throws IllegalArgumentException if multiplication would exceed Long.MAX_VALUE
164158
*/
159+
@Override
165160
public ByteAmount multiply(int factor) {
166-
checkArgument(asBytes() <= Long.MAX_VALUE / factor, String.format(
167-
"Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
168-
return ByteAmount.fromBytes(asBytes() * factor);
161+
checkArgument(value <= Long.MAX_VALUE / factor,
162+
String.format("Multiplying %s by %d would exceed Long.MAX_LONG", this, factor));
163+
return ByteAmount.fromBytes(value * factor);
169164
}
170165

171166
/**
@@ -175,9 +170,10 @@ public ByteAmount multiply(int factor) {
175170
* @param factor value to divide by
176171
* @return a new ByteValue of this ByteValue divided by factor
177172
*/
173+
@Override
178174
public ByteAmount divide(int factor) {
179175
checkArgument(factor != 0, String.format("Can not divide %s by 0", this));
180-
return ByteAmount.fromBytes(Math.round((double) this.asBytes() / (double) factor));
176+
return ByteAmount.fromBytes(Math.round(value.doubleValue() / factor));
181177
}
182178

183179
/**
@@ -187,30 +183,15 @@ public ByteAmount divide(int factor) {
187183
* @return a new ByteValue of this ByteValue increased by percentage
188184
* @throws IllegalArgumentException if increase would exceed Long.MAX_VALUE
189185
*/
186+
@Override
190187
public ByteAmount increaseBy(int percentage) {
191-
checkArgument(percentage >= 0, String.format(
192-
"Increasing by negative percent (%d) not supported", percentage));
188+
checkArgument(percentage >= 0,
189+
String.format("Increasing by negative percent (%d) not supported", percentage));
193190
double factor = 1.0 + ((double) percentage / 100);
194191
long max = Math.round(Long.MAX_VALUE / factor);
195-
checkArgument(asBytes() <= max, String.format(
196-
"Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
197-
return ByteAmount.fromBytes(Math.round((double) asBytes() * factor));
198-
}
199-
200-
public boolean greaterThan(ByteAmount other) {
201-
return this.asBytes() > other.asBytes();
202-
}
203-
204-
public boolean greaterOrEqual(ByteAmount other) {
205-
return this.asBytes() >= other.asBytes();
206-
}
207-
208-
public boolean lessThan(ByteAmount other) {
209-
return this.asBytes() < other.asBytes();
210-
}
211-
212-
public boolean lessOrEqual(ByteAmount other) {
213-
return this.asBytes() <= other.asBytes();
192+
checkArgument(value <= max,
193+
String.format("Increasing %s by %d percent would exceed Long.MAX_LONG", this, percentage));
194+
return ByteAmount.fromBytes(Math.round(value.doubleValue() * factor));
214195
}
215196

216197
public ByteAmount max(ByteAmount other) {
@@ -221,42 +202,19 @@ public ByteAmount max(ByteAmount other) {
221202
}
222203
}
223204

224-
@Override
225-
public int compareTo(ByteAmount other) {
226-
return Long.compare(asBytes(), other.asBytes());
227-
}
228-
229-
@Override
230-
public boolean equals(Object other) {
231-
if (this == other) {
232-
return true;
233-
}
234-
if (other == null || getClass() != other.getClass()) {
235-
return false;
236-
}
237-
238-
ByteAmount that = (ByteAmount) other;
239-
return bytes == that.bytes;
240-
}
241-
242-
@Override
243-
public int hashCode() {
244-
return (int) (bytes ^ (bytes >>> 32));
245-
}
246-
247205
@Override
248206
public String toString() {
249-
String value;
207+
String str;
250208
if (asGigabytes() > 0) {
251-
value = String.format("%.1f GB (%d bytes)", (double) asBytes() / GB, asBytes());
209+
str = String.format("%.1f GB (%d bytes)", value.doubleValue() / GB, value);
252210
} else if (asMegabytes() > 0) {
253-
value = String.format("%.1f MB (%d bytes)", (double) asBytes() / MB, asBytes());
211+
str = String.format("%.1f MB (%d bytes)", value.doubleValue() / MB, value);
254212
} else if (asKilobytes() > 0) {
255-
value = String.format("%.1f KB (%d bytes)", (double) asBytes() / KB, asBytes());
213+
str = String.format("%.1f KB (%d bytes)", value.doubleValue() / KB, value);
256214
} else {
257-
value = bytes + " bytes";
215+
str = value + " bytes";
258216
}
259-
return String.format("ByteAmount{%s}", value);
217+
return String.format("ByteAmount{%s}", str);
260218
}
261219

262220
private void checkArgument(boolean condition, String errorMessage) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.heron.common.basics;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
public final class CPUShare extends ResourceMeasure<Double> {
26+
27+
private CPUShare(Double value) {
28+
super(value);
29+
}
30+
31+
public static CPUShare fromDouble(double value) {
32+
return new CPUShare(value);
33+
}
34+
35+
@Override
36+
public CPUShare minus(ResourceMeasure<Double> other) {
37+
return new CPUShare(value - other.value);
38+
}
39+
40+
@Override
41+
public CPUShare plus(ResourceMeasure<Double> other) {
42+
return new CPUShare(value + other.value);
43+
}
44+
45+
@Override
46+
public CPUShare multiply(int factor) {
47+
return new CPUShare(value * factor);
48+
}
49+
50+
@Override
51+
public CPUShare divide(int factor) {
52+
return new CPUShare(value / factor);
53+
}
54+
55+
@Override
56+
public CPUShare increaseBy(int percentage) {
57+
return new CPUShare(value * (1.0 + percentage / 100.0));
58+
}
59+
60+
public static Map<String, CPUShare> convertDoubleMapToCpuShareMap(Map<String, Double> doubleMap) {
61+
Map<String, CPUShare> retval = new HashMap<>();
62+
for (Map.Entry<String, Double> entry : doubleMap.entrySet()) {
63+
retval.put(entry.getKey(), new CPUShare(entry.getValue()));
64+
}
65+
return retval;
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return String.format("CPUShare{%.3f}", value);
71+
}
72+
}

0 commit comments

Comments
 (0)