Commit 193373b

Simplify AtomicDistributedCopy: it should back off and retry until a timeout is reached.
Add util scripts for Docker: one for connecting to the container and one for installing Hadoop inside the image. Standardise prefix naming in FileUtils. Add copy code for StringType rasters; rasters must be copied to local on read. Add HadoopConf to IO operations in GDAL. Simplify HadoopUtils logic. Move testing back to parallel and remove spark.task.cpus, since installing Hadoop in the Docker image solves the file system issues.
1 parent 74776cf commit 193373b
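AtomicDistributedCopy itself is not part of this excerpt, so the back-off-until-timeout behaviour the message describes can only be illustrated. A minimal Scala sketch, assuming a simple poll-with-exponential-back-off design; all names and constants here are illustrative, not the actual implementation:

    import scala.annotation.tailrec

    // Sketch of "back off and wait until a timeout": poll a readiness check,
    // doubling the sleep between attempts, and give up once the deadline passes.
    def waitWithBackoff(ready: () => Boolean, timeoutMs: Long = 30000L, initialDelayMs: Long = 100L): Boolean = {
        val deadline = System.currentTimeMillis() + timeoutMs
        @tailrec
        def loop(delayMs: Long): Boolean = {
            if (ready()) true
            else if (System.currentTimeMillis() >= deadline) false
            else {
                Thread.sleep(delayMs)
                loop(math.min(delayMs * 2, 5000L)) // cap individual waits at 5s
            }
        }
        loop(initialDelayMs)
    }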


41 files changed: +334, -258 lines

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -187,6 +187,7 @@
                     <forkMode>once</forkMode>
                     <logForkedProcessCommand>true</logForkedProcessCommand>
                     <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
+                    <argLine>-Xms512m -Xmx2g -XX:+UseG1GC -XX:+UseStringDeduplication -XX:InitiatingHeapOccupancyPercent=70 -XX:MaxGCPauseMillis=200 -XX:+HeapDumpOnOutOfMemoryError -agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n</argLine>
                 </configuration>
                 <executions>
                     <execution>

scripts/0.4.3/connect_to_docker.sh

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+docker exec -it mosaic-dev bash

scripts/0.4.3/install_hadoop.sh

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+wget https://downloads.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz
+tar -xzf hadoop-3.4.0.tar.gz
+mv hadoop-3.4.0 /usr/local/hadoop
+cp /usr/local/hadoop/lib/native/*.so /usr/lib/

scripts/0.4.3/run_test.sh

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+mvn test -DargLine=-agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n -DskipTests=False -DwildcardSuites=com.databricks.labs.mosaic.expressions.geometry.ST_UnaryUnionTest

scripts/0.4.3/start_docker.sh

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+docker run -q --privileged --name mosaic-dev -p 5005:5005 -p 8888:8888 \
+    -v $PWD:/root/mosaic -e JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n" \
+    -itd mosaic-dev /bin/bash

src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala

Lines changed: 35 additions & 30 deletions
@@ -6,9 +6,11 @@ import com.databricks.labs.mosaic.core.raster.operator.transform.RasterTransform
 import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
 import com.databricks.labs.mosaic.gdal.MosaicGDAL
 import com.databricks.labs.mosaic.gdal.MosaicGDAL.configureGDAL
+import com.databricks.labs.mosaic.utils.{HadoopUtils, PathUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{BinaryType, DataType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
 import org.gdal.gdal.gdal
 import org.gdal.gdalconst.gdalconstConstants._
 
@@ -106,34 +108,38 @@ object GDAL {
     }
 
     /**
-     * Reads a raster from the given input data.
-     * - If it is a byte array, it will read the raster from the byte array.
-     * - If it is a string, it will read the raster from the path.
-     * - Path may be a zip file.
-     * - Path may be a subdataset.
-     *
-     * @param inputRaster
-     *   The raster, based on inputDT. Path based rasters with subdatasets
-     *   are supported.
-     * @param createInfo
-     *   Mosaic creation info of the raster. Note: This is not the same as the
-     *   metadata of the raster. This is not the same as GDAL creation options.
-     * @param inputDT
-     *   [[DataType]] for the raster, either [[StringType]] or [[BinaryType]].
-     * @return
-     *   Returns a [[MosaicRasterGDAL]] object.
-     */
+      * Reads a raster from the given input data.
+      *   - If it is a byte array, it will read the raster from the byte array.
+      *   - If it is a string, it will read the raster from the path.
+      *   - Path may be a zip file.
+      *   - Path may be a subdataset.
+      *
+      * @param inputRaster
+      *   The raster, based on inputDT. Path based rasters with subdatasets are
+      *   supported.
+      * @param createInfo
+      *   Mosaic creation info of the raster. Note: This is not the same as the
+      *   metadata of the raster. This is not the same as GDAL creation options.
+      * @param inputDT
+      *   [[DataType]] for the raster, either [[StringType]] or [[BinaryType]].
+      * @return
+      *   Returns a [[MosaicRasterGDAL]] object.
+      */
     def readRaster(
-        inputRaster: Any,
-        createInfo: Map[String, String],
-        inputDT: DataType
-    ): MosaicRasterGDAL = {
+          inputRaster: Any,
+          createInfo: Map[String, String],
+          inputDT: DataType,
+          hConf: SerializableConfiguration
+      ): MosaicRasterGDAL = {
         if (inputRaster == null) {
             MosaicRasterGDAL(null, createInfo)
         } else {
             inputDT match {
                 case _: StringType =>
-                    MosaicRasterGDAL.readRaster(createInfo)
+                    val inPath = createInfo("path")
+                    val tmpPath = HadoopUtils.copyToLocalTmp(inPath, hConf)
+                    val path = PathUtils.getCleanPath(tmpPath)
+                    MosaicRasterGDAL.readRaster(createInfo + ("path" -> path))
                 case _: BinaryType =>
                     val bytes = inputRaster.asInstanceOf[Array[Byte]]
                     try {
@@ -151,7 +157,7 @@ object GDAL {
                     } catch {
                         case _: Throwable => readParentZipBinary(bytes, createInfo)
                     }
-            case _ => throw new IllegalArgumentException(s"Unsupported data type: $inputDT")
+                case _ => throw new IllegalArgumentException(s"Unsupported data type: $inputDT")
             }
         }
     }
@@ -178,14 +184,13 @@ object GDAL {
      * @return
      *   Returns the paths of the written rasters.
      */
-    def writeRasters(generatedRasters: Seq[MosaicRasterGDAL], rasterDT: DataType): Seq[Any] = {
+    def writeRasters(generatedRasters: Seq[MosaicRasterGDAL], rasterDT: DataType, hConf: SerializableConfiguration): Seq[Any] = {
         generatedRasters.map(raster =>
             if (raster != null) {
                 rasterDT match {
-                    case StringType =>
-                        writeRasterString(raster)
+                    case StringType => writeRasterString(raster, hConf = hConf)
                     case BinaryType =>
-                        val bytes = raster.writeToBytes()
+                        val bytes = raster.writeToBytes(hConf = hConf)
                         RasterCleaner.dispose(raster)
                         bytes
                 }
@@ -195,14 +200,14 @@ object GDAL {
         )
     }
 
-    def writeRasterString(raster: MosaicRasterGDAL, path: Option[String] = None): UTF8String = {
+    def writeRasterString(raster: MosaicRasterGDAL, path: Option[String] = None, hConf: SerializableConfiguration): UTF8String = {
         val uuid = UUID.randomUUID().toString
         val extension = GDAL.getExtension(raster.getDriversShortName)
         val writePath = path match {
             case Some(p) => p
-            case None => s"${getCheckpointPath}/$uuid.$extension"
+            case None    => s"${getCheckpointPath}/$uuid.$extension"
         }
-        val outPath = raster.writeToPath(writePath)
+        val outPath = raster.writeToPath(writePath, dispose = true, hConf)
         RasterCleaner.dispose(raster)
         UTF8String.fromString(outPath)
     }
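The upshot of the GDAL.scala changes is that every read and write now receives the Hadoop configuration, wrapped in Spark's SerializableConfiguration so it can travel to executors. A hedged usage sketch; the session setup and the input path are illustrative, not taken from the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.util.SerializableConfiguration

    // Wrap the driver-side Hadoop configuration so tasks can carry it.
    val spark = SparkSession.builder().getOrCreate()
    val hConf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

    // With StringType input, the raster is now copied to a local tmp file
    // (HadoopUtils.copyToLocalTmp) before GDAL opens it.
    val createInfo = Map("path" -> "/Volumes/catalog/schema/vol/example.tif") // hypothetical path
    val raster = GDAL.readRaster(createInfo("path"), createInfo, StringType, hConf)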

src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala

Lines changed: 39 additions & 38 deletions
@@ -10,7 +10,8 @@ import com.databricks.labs.mosaic.core.raster.io.{RasterCleaner, RasterReader, R
 import com.databricks.labs.mosaic.core.raster.operator.clip.RasterClipByVector
 import com.databricks.labs.mosaic.core.types.model.GeometryTypeEnum.POLYGON
 import com.databricks.labs.mosaic.gdal.MosaicGDAL
-import com.databricks.labs.mosaic.utils.{FileUtils, PathUtils, SysUtils}
+import com.databricks.labs.mosaic.utils.{FileUtils, HadoopUtils, PathUtils, SysUtils}
+import org.apache.spark.util.SerializableConfiguration
 import org.gdal.gdal.{Dataset, gdal}
 import org.gdal.gdalconst.gdalconstConstants._
 import org.gdal.osr
@@ -598,22 +599,22 @@ case class MosaicRasterGDAL(
      * @return
      *   A byte array containing the raster data.
      */
-    def writeToBytes(dispose: Boolean = true): Array[Byte] = {
+    def writeToBytes(dispose: Boolean = true, hConf: SerializableConfiguration): Array[Byte] = {
         val readPath = {
             val tmpPath =
                 if (isSubDataset) {
                     val tmpPath = PathUtils.createTmpFilePath(getRasterFileExtension)
-                    writeToPath(tmpPath, dispose = false)
-                    tmpPath
+                    writeToPath(tmpPath, dispose = false, hConf)
                 } else {
                     this.path
                 }
             if (Files.isDirectory(Paths.get(tmpPath))) {
                 val parentDir = Paths.get(tmpPath).getParent.toString
                 val fileName = Paths.get(tmpPath).getFileName.toString
                 val prompt = SysUtils.runScript(Array("/bin/sh", "-c", s"cd $parentDir && zip -r0 $fileName.zip $fileName"))
-                if (prompt._3.nonEmpty)
+                if (prompt._3.nonEmpty) {
                     throw new Exception(s"Error zipping file: ${prompt._3}. Please verify that zip is installed. Run 'apt install zip'.")
+                }
                 s"$tmpPath.zip"
             } else {
                 tmpPath
@@ -644,41 +645,41 @@ case class MosaicRasterGDAL(
      * @return
      *   The path where written.
      */
-    def writeToPath(newPath: String, dispose: Boolean = true): String = {
-        if (isSubDataset) {
-            val driver = raster.GetDriver()
-
-            // test to see if path is in a fuse location
-            val outPath =
-                if (PathUtils.isFuseLocation(newPath)) {
-                    PathUtils.createTmpFilePath(getRasterFileExtension)
-                } else {
-                    newPath
-                }
+    def writeToPath(newPath: String, dispose: Boolean = true, hconf: SerializableConfiguration): String = {
+        val driver = raster.GetDriver()
 
-            val ds = driver.CreateCopy(outPath, this.flushCache().getRaster, 1)
-            if (ds == null) {
-                val error = gdal.GetLastErrorMsg()
-                throw new Exception(s"Error writing raster to path: $error")
-            }
-            ds.FlushCache()
-            ds.delete()
-            if (dispose) RasterCleaner.dispose(this)
-            if (outPath != newPath) {
-                Files.move(Paths.get(outPath), Paths.get(newPath), StandardCopyOption.REPLACE_EXISTING)
-            }
-            newPath
+        // we need to figure out how to handle write to dir and write to path
+        // for now it is this combined logic, but a separate logic would likely be simpler and leaner
+        val tmpFileName = Paths.get(PathUtils.createTmpFilePath(getRasterFileExtension))
+        val (tmpPath, outPath) = Paths.get(newPath) match {
+            case p: Path if Files.isDirectory(p) => (
+                  tmpFileName.toString,
+                  s"$p/${tmpFileName.getFileName}"
+                )
+            case p: Path => (
+                  s"${tmpFileName.getParent}/${p.getFileName}",
+                  p.toAbsolutePath.toString
+                )
+        }
+
+        val inRaster = this.getRaster
+        inRaster.FlushCache()
+        val ds = driver.CreateCopy(tmpPath, inRaster, 1)
+        if (ds == null) {
+            val error = gdal.GetLastErrorMsg()
+            throw new Exception(s"Error writing raster to path: $error")
+        }
+        ds.FlushCache()
+        ds.delete()
+        if (dispose) RasterCleaner.dispose(this)
+
+        if (tmpPath != outPath) {
+            HadoopUtils.copyToPath(tmpPath, outPath, hconf)
+            Files.deleteIfExists(Paths.get(tmpPath))
+            outPath
         } else {
-            val thisPath = Paths.get(this.path)
-            val fromDir = thisPath.getParent
-            val toDir = Paths.get(newPath) match {
-                case p: Path if Files.isDirectory(p) => p
-                case p: Path => p.getParent()
-            }
-            val stemRegex = PathUtils.getStemRegex(this.path)
-            PathUtils.wildcardCopy(fromDir.toString, toDir.toString, stemRegex)
-            if (dispose) RasterCleaner.dispose(this)
-            s"$toDir/${thisPath.getFileName}"
+            tmpPath
         }
     }
 
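HadoopUtils.copyToPath is referenced above, but its implementation is not part of this excerpt. A minimal sketch of what such a helper could look like using the standard Hadoop FileSystem API; in the real call site the Configuration would presumably come from the SerializableConfiguration's value:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileUtil, Path => HadoopPath}

    // Assumed behaviour: resolve source and destination file systems from the
    // shared configuration and copy the file without deleting the source.
    def copyToPath(src: String, dst: String, conf: Configuration): Unit = {
        val srcPath = new HadoopPath(src)
        val dstPath = new HadoopPath(dst)
        val srcFs = srcPath.getFileSystem(conf)
        val dstFs = dstPath.getFileSystem(conf)
        FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, conf)
    }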

src/main/scala/com/databricks/labs/mosaic/core/raster/io/RasterWriter.scala

Lines changed: 15 additions & 12 deletions

@@ -1,22 +1,25 @@
 package com.databricks.labs.mosaic.core.raster.io
 
+import org.apache.spark.util.SerializableConfiguration
+
 /**
  * RasterWriter is a trait that defines the interface for writing raster data
- * to a file system path or as bytes. It is used by the [[com.databricks.labs.mosaic.core.raster.api.GDAL]]
- * Raster API to write rasters from the internal [[com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL]]
- * object.
+ * to a file system path or as bytes. It is used by the
+ * [[com.databricks.labs.mosaic.core.raster.api.GDAL]] Raster API to write
+ * rasters from the internal
+ * [[com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL]] object.
  */
 trait RasterWriter {
 
     /**
-     * Writes a raster to a byte array.
-     *
-     * @param destroy
-     *   A boolean indicating if the raster should be destroyed after writing.
-     * @return
-     *   A byte array containing the raster data.
-     */
-    def writeToBytes(destroy: Boolean = true): Array[Byte]
+      * Writes a raster to a byte array.
+      *
+      * @param destroy
+      *   A boolean indicating if the raster should be destroyed after writing.
+      * @return
+      *   A byte array containing the raster data.
+      */
+    def writeToBytes(destroy: Boolean = true, hConf: SerializableConfiguration): Array[Byte]
 
     /**
      * Writes a raster to a specified file system path.
@@ -28,6 +31,6 @@ trait RasterWriter {
      * @return
      *   The path where written (may differ, e.g. due to subdatasets).
      */
-    def writeToPath(newPath: String, destroy: Boolean = true): String
+    def writeToPath(newPath: String, destroy: Boolean = true, hConf: SerializableConfiguration): String
 
 }

src/main/scala/com/databricks/labs/mosaic/core/types/model/MosaicRasterTile.scala

Lines changed: 8 additions & 6 deletions
@@ -7,6 +7,7 @@ import com.databricks.labs.mosaic.expressions.raster.{buildMapString, extractMap
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{BinaryType, DataType, LongType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
 
 import scala.util.{Failure, Success, Try}
 
@@ -104,8 +105,8 @@ case class MosaicRasterTile(
      * @return
      *   An instance of [[InternalRow]].
      */
-    def serialize(rasterDataType: DataType): InternalRow = {
-        val encodedRaster = encodeRaster(rasterDataType)
+    def serialize(rasterDataType: DataType, hConf: SerializableConfiguration): InternalRow = {
+        val encodedRaster = encodeRaster(rasterDataType, hConf)
         val path = encodedRaster match {
             case uStr: UTF8String => uStr.toString
             case _ => raster.createInfo("path")
@@ -140,9 +141,10 @@ case class MosaicRasterTile(
      *   According to the [[DataType]].
      */
     private def encodeRaster(
-        rasterDataType: DataType
+        rasterDataType: DataType,
+        hConf: SerializableConfiguration
     ): Any = {
-        GDAL.writeRasters(Seq(raster), rasterDataType).head
+        GDAL.writeRasters(Seq(raster), rasterDataType, hConf).head
     }
 
     def getSequenceNumber: Int =
@@ -169,12 +171,12 @@ object MosaicRasterTile {
      * @return
      *   An instance of [[MosaicRasterTile]].
      */
-    def deserialize(row: InternalRow, idDataType: DataType, rasterDataType: DataType): MosaicRasterTile = {
+    def deserialize(row: InternalRow, idDataType: DataType, rasterDataType: DataType, hConf: SerializableConfiguration): MosaicRasterTile = {
         val index = row.get(0, idDataType)
        // handle checkpoint related de-serialization
         val rawRaster = row.get(1, rasterDataType)
         val createInfo = extractMap(row.getMap(2))
-        val raster = GDAL.readRaster(rawRaster, createInfo, rasterDataType)
+        val raster = GDAL.readRaster(rawRaster, createInfo, rasterDataType, hConf)
 
         // noinspection TypeCheckCanBeMatch
         if (Option(index).isDefined) {
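Taken together, serialize and deserialize now form a round trip that threads the same configuration through both halves. A hedged sketch, assuming a SparkSession named spark and a MosaicRasterTile named tile are already in scope:

    import org.apache.spark.sql.types.{LongType, StringType}
    import org.apache.spark.util.SerializableConfiguration

    // Hypothetical round trip: write the tile out (StringType => path-backed),
    // then rebuild it from the resulting InternalRow with the same hConf.
    val hConf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
    val row = tile.serialize(StringType, hConf)
    val restored = MosaicRasterTile.deserialize(row, LongType, StringType, hConf)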

src/main/scala/com/databricks/labs/mosaic/datasource/gdal/ReTileOnRead.scala

Lines changed: 2 additions & 2 deletions
@@ -79,11 +79,11 @@ object ReTileOnRead extends ReadStrategy {
         // After copying to local we can proceed as if the file was never on the volume
         // This was done to avoid redoing the logic for subdatasets via Hadoop file wrangling
         // for some reason both returned the same path ????
-        val tmpPath2 = ReaderUtils.asTmpRaster(tmpPath1, options)
+        val tmpPath2 = ReaderUtils.asTmpRaster(tmpPath1, options, hconf)
 
         val tiles = localSubdivide(tmpPath2, inPath, sizeInMB)
 
-        val rows = tiles.map(tile => ReadAsPath.createRow(status, tile, uuid, requiredSchema, indexSystem))
+        val rows = tiles.map(tile => ReadAsPath.createRow(status, tile, uuid, requiredSchema, indexSystem, hconf))
 
         // Both tmp files are local and can be deleted
         // here using PathUtils is safe, and it accounts for subdatasets complications
