Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Commit fdf3430

Browse files
authored
Add sum/max/min reducers in Scala (#3133)
1 parent 1a0d198 commit fdf3430

File tree

4 files changed

+109
-6
lines changed

4 files changed

+109
-6
lines changed

examples/src/scala/org/apache/heron/examples/streamlet/scala/ScalaWindowedWordCountTopology.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.util.Random
2424

2525
import org.apache.heron.examples.streamlet.scala.common.ScalaTopologyExampleUtils
2626
import org.apache.heron.streamlet.{Config, KeyValue, KeyedWindow, WindowConfig}
27-
import org.apache.heron.streamlet.scala.{Builder, Runner}
27+
import org.apache.heron.streamlet.scala.{Builder, Runner, StreamletReducers}
2828

2929
/**
3030
* This topology is an implementation of the classic word count example
@@ -62,7 +62,7 @@ object ScalaWindowedWordCountTopology {
6262
.reduceByKeyAndWindow[String, Int]((word: String) => word,
6363
(x: String) => 1,
6464
WindowConfig.TumblingCountWindow(50),
65-
(x: Int, y: Int) => x + y)
65+
StreamletReducers.sum(_: Int, _: Int))
6666
.setName("reduce-operation")
6767
.consume((kv: KeyValue[KeyedWindow[String], Int]) =>
6868
log.info(s"word: ${kv.getKey.getKey} - count: ${kv.getValue}"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.heron.streamlet.scala
20+
21+
/**
22+
* This class contains a few standard reduces that can be used with
23+
* Streamlet reduce functions such as reduceByKeyAndWindow.
24+
* Example, assuming s is a Stringlet<T> object and each tuple has these functions:
25+
* - Integer getKey() and
26+
* - Double getValue()
27+
* To get streams of sum, min and max of all values upto the current one:
28+
* s.reduceByKey(T::getKey, T::getValue, StreamletReducers::sum);
29+
* s.reduceByKey(T::getKey, T::getValue, StreamletReducers::min);
30+
* s.reduceByKey(T::getKey, T::getValue, StreamletReducers::max);
31+
*/
32+
object StreamletReducers {
33+
34+
def sum(a: Int, b: Int): Int = a + b
35+
def sum(a: Long, b: Long): Long = a + b
36+
def sum(a: Float, b: Float): Float = a + b
37+
def sum(a: Double, b: Double): Double = a + b
38+
39+
def max(a: Int, b: Int): Int = math.max(a, b)
40+
def max(a: Long, b: Long): Long = math.max(a, b)
41+
def max(a: Float, b: Float): Float = math.max(a, b)
42+
def max(a: Double, b: Double): Double = math.max(a, b)
43+
44+
def min(a: Int, b: Int): Int = math.min(a, b)
45+
def min(a: Long, b: Long): Long = math.min(a, b)
46+
def min(a: Float, b: Float): Float = math.min(a, b)
47+
def min(a: Double, b: Double): Double = math.min(a, b)
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.heron.streamlet.scala
20+
21+
import org.junit.Assert.assertEquals
22+
23+
import org.apache.heron.streamlet.scala.common.BaseFunSuite
24+
25+
class StreamletReducersTest extends BaseFunSuite {
26+
27+
test("Sum should work correctly") {
28+
assertEquals(StreamletReducers.sum(1, 2), 3)
29+
assertEquals(StreamletReducers.sum(1L, 2L), 3L)
30+
assertEquals(StreamletReducers.sum(1.0f, 2.0f), 3.0f, 0.01f)
31+
assertEquals(StreamletReducers.sum(1.0, 2.0), 3.0, 0.01)
32+
}
33+
34+
test("Max should work correctly") {
35+
assertEquals(StreamletReducers.max(1, 2), 2)
36+
assertEquals(StreamletReducers.max(2, 1), 2)
37+
assertEquals(StreamletReducers.max(1L, 2L), 2L)
38+
assertEquals(StreamletReducers.max(2L, 1L), 2L)
39+
assertEquals(StreamletReducers.max(1.0f, 2.0f), 2.0f, 0.01f)
40+
assertEquals(StreamletReducers.max(2.0f, 1.0f), 2.0f, 0.01f)
41+
assertEquals(StreamletReducers.max(1.0, 2.0), 2.0, 0.01)
42+
assertEquals(StreamletReducers.max(2.0, 1.0), 2.0, 0.01)
43+
}
44+
45+
test("Min should work correctly") {
46+
assertEquals(StreamletReducers.min(1, 2), 1)
47+
assertEquals(StreamletReducers.min(2, 1), 1)
48+
assertEquals(StreamletReducers.min(1L, 2L), 1L)
49+
assertEquals(StreamletReducers.min(2L, 1L), 1L)
50+
assertEquals(StreamletReducers.min(1.0f, 2.0f), 1.0f, 0.01f)
51+
assertEquals(StreamletReducers.min(2.0f, 1.0f), 1.0f, 0.01f)
52+
assertEquals(StreamletReducers.min(1.0, 2.0), 1.0, 0.01)
53+
assertEquals(StreamletReducers.min(2.0, 1.0), 1.0, 0.01)
54+
}
55+
}

heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import org.apache.heron.streamlet.impl.streamlets.{
5454
UnionStreamlet
5555
}
5656

57-
import org.apache.heron.streamlet.scala.{Builder, Streamlet}
57+
import org.apache.heron.streamlet.scala.{Builder, Streamlet, StreamletReducers}
5858
import org.apache.heron.streamlet.scala.common.{
5959
BaseFunSuite,
6060
TestIncrementSerializableTransformer,
@@ -623,7 +623,7 @@ class StreamletImplTest extends BaseFunSuite {
623623
supplierStreamlet
624624
.reduceByKey[Int, Int]((x: Int) => x * 100,
625625
(x: Int) => x,
626-
(x: Int, y: Int) => x + y) // sum operation
626+
StreamletReducers.sum(_: Int, _: Int))
627627
.setName("Reduce_Streamlet_1")
628628
.setNumPartitions(5)
629629

@@ -651,7 +651,7 @@ class StreamletImplTest extends BaseFunSuite {
651651
supplierStreamlet
652652
.reduceByKey[Int, Int]((key: Int) => key * 100,
653653
0,
654-
(x: Int, y: Int) => x + y) // sum operation
654+
StreamletReducers.sum(_: Int, _: Int))
655655
.setName("Reduce_Streamlet_1")
656656
.setNumPartitions(5)
657657

@@ -680,7 +680,7 @@ class StreamletImplTest extends BaseFunSuite {
680680
.reduceByKeyAndWindow[Int, Int]((key: Int) => key * 100,
681681
(value: Int) => 1,
682682
WindowConfig.TumblingCountWindow(10),
683-
(x: Int, y: Int) => x + y)
683+
StreamletReducers.sum(_: Int, _: Int))
684684
.setName("Reduce_Streamlet_1")
685685
.setNumPartitions(5)
686686

0 commit comments

Comments
 (0)