@@ -43,6 +43,8 @@ import org.apache.spark.rdd._
43
43
import org .apache .spark .SparkContext ._
44
44
45
45
import java .util .ArrayList
46
+ import java .time .ZonedDateTime
47
+
46
48
import scala .reflect ._
47
49
import scala .collection .JavaConverters ._
48
50
@@ -90,6 +92,78 @@ class TemporalTiledRasterLayer(
90
92
TemporalTiledRasterLayer (zoomLevel, multiBand)
91
93
}
92
94
95
+ private def wkbsToMultiPolygons (wkbs : java.util.ArrayList [Array [Byte ]]) = {
96
+ wkbs
97
+ .asScala.map({ wkb => WKB .read(wkb) })
98
+ .flatMap({
99
+ case p : Polygon => Some (MultiPolygon (p))
100
+ case m : MultiPolygon => Some (m)
101
+ case _ => None
102
+ })
103
+ }
104
+
105
+ private def wkbsToMultiPolygon (wkbs : java.util.ArrayList [Array [Byte ]]) =
106
+ MultiPolygon (
107
+ wkbsToMultiPolygons(wkbs)
108
+ .map({ mp => mp.polygons })
109
+ .foldLeft(List .empty[Polygon ])(_ ++ _)
110
+ )
111
+
112
+ def sumSeries (
113
+ wkbs : java.util.ArrayList [Array [Byte ]]
114
+ ): Array [(ZonedDateTime , Double )] = {
115
+ val polygon : MultiPolygon = wkbsToMultiPolygon(wkbs)
116
+ val metadata = rdd.metadata
117
+ ContextRDD (rdd.mapValues({ m => m.bands(0 ) }), metadata)
118
+ .sumSeries(polygon)
119
+ .toArray
120
+ .sortWith({ (t1, t2) => (t1._1.compareTo(t2._1) <= 0 ) })
121
+ }
122
+
123
+ def minSeries (
124
+ wkbs : java.util.ArrayList [Array [Byte ]]
125
+ ): Array [(ZonedDateTime , Double )] = {
126
+ val polygon : MultiPolygon = wkbsToMultiPolygon(wkbs)
127
+ val metadata = rdd.metadata
128
+ ContextRDD (rdd.mapValues({ m => m.bands(0 ) }), metadata)
129
+ .minSeries(polygon)
130
+ .toArray
131
+ .sortWith({ (t1, t2) => (t1._1.compareTo(t2._1) <= 0 ) })
132
+ }
133
+
134
+ def maxSeries (
135
+ wkbs : java.util.ArrayList [Array [Byte ]]
136
+ ): Array [(ZonedDateTime , Double )] = {
137
+ val polygon : MultiPolygon = wkbsToMultiPolygon(wkbs)
138
+ val metadata = rdd.metadata
139
+ ContextRDD (rdd.mapValues({ m => m.bands(0 ) }), metadata)
140
+ .maxSeries(polygon)
141
+ .toArray
142
+ .sortWith({ (t1, t2) => (t1._1.compareTo(t2._1) <= 0 ) })
143
+ }
144
+
145
+ def meanSeries (
146
+ wkbs : java.util.ArrayList [Array [Byte ]]
147
+ ): Array [(ZonedDateTime , Double )] = {
148
+ val polygon : MultiPolygon = wkbsToMultiPolygon(wkbs)
149
+ val metadata = rdd.metadata
150
+ ContextRDD (rdd.mapValues({ m => m.bands(0 ) }), metadata)
151
+ .meanSeries(polygon)
152
+ .toArray
153
+ .sortWith({ (t1, t2) => (t1._1.compareTo(t2._1) <= 0 ) })
154
+ }
155
+
156
+ def histogramSeries (
157
+ wkbs : java.util.ArrayList [Array [Byte ]]
158
+ ): Array [(ZonedDateTime , Histogram [Double ])] = {
159
+ val polygon : MultiPolygon = wkbsToMultiPolygon(wkbs)
160
+ val metadata = rdd.metadata
161
+ ContextRDD (rdd.mapValues({ m => m.bands(0 ) }), metadata)
162
+ .histogramSeries(polygon)
163
+ .toArray
164
+ .sortWith({ (t1, t2) => (t1._1.compareTo(t2._1) <= 0 ) })
165
+ }
166
+
93
167
def reproject (targetCRS : String , resampleMethod : ResampleMethod ): TemporalTiledRasterLayer = {
94
168
val crs = TileLayer .getCRS(targetCRS).get
95
169
val (zoom, reprojected) = rdd.reproject(crs, rdd.metadata.layout, resampleMethod)
0 commit comments