Skip to content

Commit 8220302

Browse files
authored
Merge pull request #2288 from jamesmcclain/feature/timeseries
Time Series
2 parents 1f937c7 + 3025283 commit 8220302

File tree

8 files changed

+568
-10
lines changed

8 files changed

+568
-10
lines changed

spark/src/main/scala/geotrellis/spark/package.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ package object spark
6161
with summary.Implicits
6262
with summary.polygonal.Implicits
6363
with tiling.Implicits
64+
with timeseries.Implicits
6465
with viewshed.Implicits {
6566
type TileLayerRDD[K] = RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]]
6667
object TileLayerRDD {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package geotrellis.spark.timeseries
2+
3+
import geotrellis.spark._
4+
5+
import org.apache.spark.rdd.RDD
6+
7+
8+
/** Importable bundle of the time-series extension methods. */
object Implicits extends Implicits

/** Mixin that enriches `TileLayerRDD[SpaceTimeKey]` with time-series methods. */
trait Implicits {
  // Attaches RDDTimeSeriesMethods (sumSeries, meanSeries, ...) to any
  // space-time tile layer via an implicit wrapper.
  implicit class withRDDTimeSeriesMethods(val self: TileLayerRDD[SpaceTimeKey])
    extends RDDTimeSeriesMethods
}
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
package geotrellis.spark.timeseries
2+
3+
import geotrellis.raster._
4+
import geotrellis.raster.histogram._
5+
import geotrellis.raster.summary.polygonal._
6+
import geotrellis.spark._
7+
import geotrellis.spark.mask.Mask.Options
8+
import geotrellis.util.annotations.experimental
9+
import geotrellis.util.MethodExtensions
10+
import geotrellis.vector._
11+
12+
import java.time.ZonedDateTime
13+
14+
15+
/**
  * Projection and reduction functions used by the time-series extension
  * methods: each projection maps a tile to a summary value, and each
  * reduction combines two summaries of the same kind.
  *
  * @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
  */
@experimental object RDDTimeSeriesFunctions {

  /**
    * $experimental Summarize a tile as a streaming histogram of its cells.
    */
  @experimental def histogramProjection(tile: Tile): StreamingHistogram =
    StreamingHistogram.fromTile(tile)

  /**
    * $experimental Merge two streaming histograms.
    */
  @experimental def histogramReduction(left: StreamingHistogram, right: StreamingHistogram): StreamingHistogram =
    left + right

  /**
    * $experimental Combine two partial mean results.
    */
  @experimental def meanReduction(left: MeanResult, right: MeanResult): MeanResult =
    left + right

  /**
    * $experimental Keep the larger of two values (NaN-propagating, per math.max).
    */
  @experimental def maxReduction(left: Double, right: Double): Double =
    left.max(right)

  /**
    * $experimental Keep the smaller of two values (NaN-propagating, per math.min).
    */
  @experimental def minReduction(left: Double, right: Double): Double =
    left.min(right)

  /**
    * $experimental Add two partial sums.
    */
  @experimental def sumReduction(left: Double, right: Double): Double =
    left + right
}
56+
57+
/**
  * Extension methods on `TileLayerRDD[SpaceTimeKey]` that reduce a
  * space-time layer to a time series: the layer is masked by the given
  * polygon(s), and for each timestamp the surviving cells are collapsed
  * to a single summary (sum, min, max, mean, or histogram).
  *
  * @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
  */
@experimental abstract class RDDTimeSeriesMethods
    extends MethodExtensions[TileLayerRDD[SpaceTimeKey]] {

  /**
    * $experimental Per-timestamp sum of cell values under the polygon.
    *
    * @param polygon the query region
    * @param options masking options
    */
  @experimental def sumSeries(
    polygon: MultiPolygon,
    options: Options
  ): Map[ZonedDateTime, Double] =
    sumSeries(List(polygon), options)

  /**
    * $experimental Per-timestamp sum of cell values under the polygon,
    * using default masking options.
    */
  @experimental def sumSeries(
    polygon: MultiPolygon
  ): Map[ZonedDateTime, Double] =
    sumSeries(List(polygon), Options.DEFAULT)

  /**
    * $experimental Per-timestamp sum of cell values under the polygons,
    * using default masking options.
    */
  @experimental def sumSeries(
    polygons: Traversable[MultiPolygon]
  ): Map[ZonedDateTime, Double] =
    sumSeries(polygons, Options.DEFAULT)

  /**
    * $experimental Per-timestamp sum of cell values under the polygons.
    *
    * @param polygons the query regions
    * @param options  masking options
    */
  @experimental def sumSeries(
    polygons: Traversable[MultiPolygon],
    options: Options
  ): Map[ZonedDateTime, Double] = {
    TimeSeries(
      self,
      SumDoubleSummary.handleFullTile,
      RDDTimeSeriesFunctions.sumReduction,
      polygons,
      options
    )
      .collect()
      .toMap
  }

  /**
    * $experimental Per-timestamp minimum cell value under the polygon.
    *
    * @param polygon the query region
    * @param options masking options
    */
  @experimental def minSeries(
    polygon: MultiPolygon,
    options: Options
  ): Map[ZonedDateTime, Double] =
    minSeries(List(polygon), options)

  /**
    * $experimental Per-timestamp minimum cell value under the polygon,
    * using default masking options.
    */
  @experimental def minSeries(
    polygon: MultiPolygon
  ): Map[ZonedDateTime, Double] =
    minSeries(List(polygon), Options.DEFAULT)

  /**
    * $experimental Per-timestamp minimum cell value under the polygons,
    * using default masking options.
    */
  @experimental def minSeries(
    polygons: Traversable[MultiPolygon]
  ): Map[ZonedDateTime, Double] =
    minSeries(polygons, Options.DEFAULT)

  /**
    * $experimental Per-timestamp minimum cell value under the polygons.
    *
    * @param polygons the query regions
    * @param options  masking options
    */
  @experimental def minSeries(
    polygons: Traversable[MultiPolygon],
    options: Options
  ): Map[ZonedDateTime, Double] = {
    TimeSeries(
      self,
      MinDoubleSummary.handleFullTile,
      RDDTimeSeriesFunctions.minReduction,
      polygons,
      options
    )
      .collect()
      .toMap
  }

  /**
    * $experimental Per-timestamp maximum cell value under the polygon.
    *
    * @param polygon the query region
    * @param options masking options
    */
  @experimental def maxSeries(
    polygon: MultiPolygon,
    options: Options
  ): Map[ZonedDateTime, Double] =
    maxSeries(List(polygon), options)

  /**
    * $experimental Per-timestamp maximum cell value under the polygon,
    * using default masking options.
    */
  @experimental def maxSeries(
    polygon: MultiPolygon
  ): Map[ZonedDateTime, Double] =
    maxSeries(List(polygon), Options.DEFAULT)

  /**
    * $experimental Per-timestamp maximum cell value under the polygons,
    * using default masking options.
    */
  @experimental def maxSeries(
    polygons: Traversable[MultiPolygon]
  ): Map[ZonedDateTime, Double] =
    maxSeries(polygons, Options.DEFAULT)

  /**
    * $experimental Per-timestamp maximum cell value under the polygons.
    *
    * @param polygons the query regions
    * @param options  masking options
    */
  @experimental def maxSeries(
    polygons: Traversable[MultiPolygon],
    options: Options
  ): Map[ZonedDateTime, Double] = {
    TimeSeries(
      self,
      MaxDoubleSummary.handleFullTile,
      RDDTimeSeriesFunctions.maxReduction,
      polygons,
      options
    )
      .collect()
      .toMap
  }

  /**
    * $experimental Per-timestamp mean of cell values under the polygon.
    *
    * NOTE: the `options` default was removed — it was unreachable (the
    * single-argument overload always won resolution for `meanSeries(p)`)
    * and mixing overloading with default arguments invites resolution
    * surprises.  This also matches sumSeries/minSeries/maxSeries.
    *
    * @param polygon the query region
    * @param options masking options
    */
  @experimental def meanSeries(
    polygon: MultiPolygon,
    options: Options
  ): Map[ZonedDateTime, Double] =
    meanSeries(List(polygon), options)

  /**
    * $experimental Per-timestamp mean of cell values under the polygon,
    * using default masking options.
    */
  @experimental def meanSeries(
    polygon: MultiPolygon
  ): Map[ZonedDateTime, Double] =
    meanSeries(List(polygon), Options.DEFAULT)

  /**
    * $experimental Per-timestamp mean of cell values under the polygons,
    * using default masking options.
    */
  @experimental def meanSeries(
    polygons: Traversable[MultiPolygon]
  ): Map[ZonedDateTime, Double] =
    meanSeries(polygons, Options.DEFAULT)

  /**
    * $experimental Per-timestamp mean of cell values under the polygons.
    *
    * @param polygons the query regions
    * @param options  masking options
    */
  @experimental def meanSeries(
    polygons: Traversable[MultiPolygon],
    options: Options
  ): Map[ZonedDateTime, Double] = {
    TimeSeries(
      self,
      MeanResult.fromFullTileDouble,
      RDDTimeSeriesFunctions.meanReduction,
      polygons,
      options
    )
      .mapValues({ mr => mr.mean }) // finalize partial (sum, count) into a mean
      .collect()
      .toMap
  }

  /**
    * $experimental Per-timestamp histogram of cell values under the polygon.
    *
    * NOTE: the `options` default was removed for the same reason as in
    * `meanSeries` — it was unreachable and inconsistent with the other
    * series methods.
    *
    * @param polygon the query region
    * @param options masking options
    */
  @experimental def histogramSeries(
    polygon: MultiPolygon,
    options: Options
  ): Map[ZonedDateTime, Histogram[Double]] =
    histogramSeries(List(polygon), options)

  /**
    * $experimental Per-timestamp histogram of cell values under the polygon,
    * using default masking options.
    */
  @experimental def histogramSeries(
    polygon: MultiPolygon
  ): Map[ZonedDateTime, Histogram[Double]] =
    histogramSeries(List(polygon), Options.DEFAULT)

  /**
    * $experimental Per-timestamp histogram of cell values under the polygons,
    * using default masking options.
    */
  @experimental def histogramSeries(
    polygons: Traversable[MultiPolygon]
  ): Map[ZonedDateTime, Histogram[Double]] =
    histogramSeries(polygons, Options.DEFAULT)

  /**
    * $experimental Per-timestamp histogram of cell values under the polygons.
    *
    * @param polygons the query regions
    * @param options  masking options
    */
  @experimental def histogramSeries(
    polygons: Traversable[MultiPolygon],
    options: Options
  ): Map[ZonedDateTime, Histogram[Double]] = {
    TimeSeries(
      self,
      RDDTimeSeriesFunctions.histogramProjection,
      RDDTimeSeriesFunctions.histogramReduction,
      polygons,
      options
    )
      .collect()
      .toMap
  }

}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package geotrellis.spark.timeseries
2+
3+
import geotrellis.raster._
4+
import geotrellis.spark._
5+
import geotrellis.spark.mask._
6+
import geotrellis.util.annotations.experimental
7+
import geotrellis.vector._
8+
9+
import org.apache.log4j.Logger
10+
import org.apache.spark.rdd._
11+
12+
import scala.reflect.ClassTag
13+
14+
import java.time.ZonedDateTime
15+
16+
17+
/**
  * Turns a `TileLayerRDD[SpaceTimeKey]` into a time series: the layer is
  * masked by the supplied geometry, every remaining tile is projected to a
  * summary value, and the summaries are reduced per timestamp.
  *
  * @author James McClain
  * @define experimental <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
  */
@experimental object TimeSeries {

  /**
    * $experimental Build the series.
    *
    * @param layer      the space-time layer to summarize
    * @param projection maps a masked tile to a summary value
    * @param reduction  combines two summary values for the same timestamp
    * @param geoms      the masking polygons
    * @param options    masking options (defaults to Mask.Options.DEFAULT)
    * @return one (timestamp, reduced summary) pair per distinct time key
    */
  @experimental def apply[R: ClassTag](
    layer: TileLayerRDD[SpaceTimeKey],
    projection: Tile => R,
    reduction: (R, R) => R,
    geoms: Traversable[MultiPolygon],
    options: Mask.Options = Mask.Options.DEFAULT
  ): RDD[(ZonedDateTime, R)] = {
    // Restrict to the geometry first, then re-key each tile by its
    // instant and fold tiles sharing a timestamp down to one value.
    val masked = layer.mask(geoms, options)
    val projected = masked.map { case (key, tile) => (key.time, projection(tile)) }
    projected.reduceByKey(reduction)
  }

}

spark/src/test/scala/geotrellis/spark/costdistance/RDDCostDistanceMethodsSpec.scala

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,23 @@ class RDDCostDistanceMethodsSpec extends FunSpec
4545

4646
val points = List(Point(2.5+5.0, 2.5))
4747

48-
it("The costdistance Method Should Work (1/2)") {
49-
val expected = IterativeCostDistance(rdd, points).collect.toList
50-
val actual = rdd.costdistance(points).collect.toList
48+
describe("Cost-Distance Extension Methods") {
5149

52-
actual should be (expected)
53-
}
50+
it("The costdistance Method Should Work (1/2)") {
51+
val expected = IterativeCostDistance(rdd, points).collect.toList
52+
val actual = rdd.costdistance(points).collect.toList
53+
54+
actual should be (expected)
55+
}
56+
57+
it("The costdistance Method Should Work (2/2)") {
58+
val resolution = IterativeCostDistance.computeResolution(rdd)
59+
val expected = IterativeCostDistance(rdd, points, resolution).collect.toList
60+
val actual = rdd.costdistance(points, resolution).collect.toList
5461

55-
it("The costdistance Method Should Work (2/2)") {
56-
val resolution = IterativeCostDistance.computeResolution(rdd)
57-
val expected = IterativeCostDistance(rdd, points, resolution).collect.toList
58-
val actual = rdd.costdistance(points, resolution).collect.toList
62+
actual should be (expected)
63+
}
5964

60-
actual should be (expected)
6165
}
6266

6367
}

0 commit comments

Comments
 (0)