@@ -2,13 +2,12 @@ package com.databricks.labs.mosaic.utils
2
2
3
3
import com .databricks .labs .mosaic .functions .MosaicContext
4
4
import com .google .common .io .{ByteStreams , Closeables }
5
- import org .apache .hadoop .fs .{FileStatus , FileSystem , FileUtil , Path }
5
+ import org .apache .hadoop .fs .{FileStatus , FileSystem , Path }
6
6
import org .apache .orc .util .Murmur3
7
+ import org .apache .spark .sql .execution .streaming .FileSystemBasedCheckpointFileManager
7
8
import org .apache .spark .util .SerializableConfiguration
8
9
9
10
import java .net .URI
10
- import java .nio .file .{Files , Paths }
11
- import java .util .UUID
12
11
13
12
// noinspection ScalaWeakerAccess
14
13
object HadoopUtils {
@@ -39,13 +38,15 @@ object HadoopUtils {
39
38
}
40
39
}
41
40
42
- def getStemRegex (str : String ): String = {
43
- val cleanPath = HadoopUtils .cleanPath(str)
44
- val fileName = new Path (cleanPath).getName
45
- val stemName = fileName.substring(0 , fileName.lastIndexOf('.' ))
46
- val stemEscaped = stemName.replace(" ." , " \\ ." )
47
- val stemRegex = s " $stemEscaped\\ ..* " .r
48
- stemRegex.toString
41
/**
  * Derives a relative path from `inPath` by removing leading location prefixes.
  *
  * `basePath` is stripped first. After that, each known scheme / mount prefix
  * is removed, in the order listed below, if the remaining string starts with
  * it at that point. A prefix that does not match leaves the string unchanged,
  * so the order matters: e.g. "dbfs:/Volumes/x" loses "dbfs:/" and then
  * "Volumes/".
  *
  * Note that when `basePath` has no trailing slash the result keeps the
  * separator that followed it (e.g. "/sub/file.tif").
  *
  * @param inPath
  *   The path to make relative.
  * @param basePath
  *   Prefix removed before the scheme / mount prefixes; an empty string
  *   removes nothing.
  * @return
  *   `inPath` without the matching leading prefixes.
  */
def getRelativePath(inPath: String, basePath: String): String = {
    // Order is significant: each prefix is tested against the result of the previous strip.
    val locationPrefixes = Seq("file:/", "dbfs:/", "/dbfs/", "dbfs/", "Volumes/", "/Volumes/")
    locationPrefixes.foldLeft(inPath.stripPrefix(basePath)) { (remaining, prefix) =>
        remaining.stripPrefix(prefix)
    }
}
50
51
51
52
def listHadoopFiles (inPath : String ): Seq [String ] = {
@@ -60,99 +61,42 @@ object HadoopUtils {
60
61
.map(_.getPath.toString)
61
62
}
62
63
63
- def copyToLocalTmp (inPath : String ): String = {
64
- copyToLocalTmp(inPath, hadoopConf)
65
- }
66
-
67
64
/**
  * Copies `inPath` into the Mosaic temporary directory.
  *
  * The input is normalised with `cleanPath`, the destination is the cleaned
  * value of `MosaicContext.tmpDir(null)`, and the copy itself is delegated to
  * `copyToLocalDir`.
  *
  * @param inPath
  *   Path of the input to copy.
  * @param hconf
  *   Serializable Hadoop configuration forwarded to `copyToLocalDir`.
  * @return
  *   The value returned by `copyToLocalDir` for this input.
  */
def copyToLocalTmp(inPath: String, hconf: SerializableConfiguration): String = {
    val copyFromPath = new Path(cleanPath(inPath))
    val outputDir = cleanPath(MosaicContext.tmpDir(null))
    copyToLocalDir(copyFromPath.toString, outputDir, hconf)
}
100
69
101
- def wildcardCopy (inDirPath : String , outDirPath : String , pattern : String , hconf : SerializableConfiguration ): Unit = {
102
- val copyFromPath = cleanPath(inDirPath)
103
- val copyToPath = cleanPath(outDirPath)
104
-
105
- val tc = listHadoopFiles(copyFromPath, hconf)
106
- .filter(f => s " $copyFromPath/ $pattern" .r.findFirstIn(f).isDefined)
107
-
108
- for (path <- tc) {
109
- val src = new Path (path)
110
- val dest = new Path (copyToPath, src.getName)
111
- if (src != dest) {
112
- val fs = src.getFileSystem(hconf.value)
113
- if (fs.getFileStatus(src).isDirectory) {
114
- // writeNioDir(src, dest, hconf)
115
- Files .createDirectories(Paths .get(dest.toString))
116
- FileUtil .copy(fs, src, fs, dest, false , hconf.value)
117
- } else {
118
- // writeNioFile(src, dest, hconf)
119
- Files .createDirectories(Paths .get(dest.getParent.toString))
120
- Files .createFile(Paths .get(dest.toString))
121
- fs.copyToLocalFile(src, dest)
122
- }
123
- }
124
- }
125
- }
126
-
127
- def writeNioFile (src : Path , dest : Path , hconf : SerializableConfiguration ): Unit = {
128
- val fs = src.getFileSystem(hconf.value)
129
- val srcStatus = fs.getFileStatus(src)
130
- val bytes = readContent(fs, srcStatus)
131
- FileUtils .writeBytes(dest.toString, bytes)
132
- }
70
/**
  * Copies a file (together with its sidecar files) or the entries of a
  * directory into `outDir`.
  *
  * A checkpoint directory is created for `outDir` through a
  * `FileSystemBasedCheckpointFileManager` before anything is copied.
  *
  *   - Directory input: every path returned by `listHadoopFiles` for the
  *     directory is copied by a recursive call whose `basePath` is this
  *     directory; `outDir` is returned.
  *   - File input: the destination is `outDir` joined with the path obtained
  *     from `getRelativePath(inPath, basePath)`. Files listed in the source
  *     file's parent directory that share its base name are copied next to
  *     the destination via `AtomicDistributedCopy.copyIfNeeded`; the
  *     destination path is returned.
  *
  * @param inPath
  *   Source file or directory.
  * @param outDir
  *   Destination directory.
  * @param hConf
  *   Serializable Hadoop configuration used to resolve the file system and
  *   list files.
  * @param basePath
  *   Prefix removed from `inPath` when computing the destination-relative
  *   path; defaults to the empty string.
  * @return
  *   `outDir` for a directory input, otherwise the destination path of the
  *   copied file.
  */
def copyToLocalDir(inPath: String, outDir: String, hConf: SerializableConfiguration, basePath: String = ""): String = {
    val copyFromPath = new Path(cleanPath(inPath))
    val fs = copyFromPath.getFileSystem(hConf.value)
    val checkpointManager = new FileSystemBasedCheckpointFileManager(new Path(outDir), hConf.value)
    checkpointManager.createCheckpointDirectory()

    if (fs.getFileStatus(copyFromPath).isDirectory) {
        // Each listed entry is handled by a recursive call, relative to this directory.
        listHadoopFiles(copyFromPath.toString, hConf)
            .foreach(filePath => copyToLocalDir(filePath, outDir, hConf, basePath = copyFromPath.toString))
        outDir
    } else {
        val relativePath = new Path(getRelativePath(copyFromPath.toString, basePath))
        val fileName = relativePath.getName
        val baseName = if (fileName.contains(".")) fileName.substring(0, fileName.lastIndexOf('.')) else fileName
        val localDestPath = new Path(s"$outDir/$relativePath")
        // this is horribly inefficient but ok for now
        // we need a set of files to check for that is fixed per format
        val parent = relativePath.getParent
        val pattern = if (parent.toString.endsWith("/")) s"$parent$baseName" else s"$parent/$baseName"
        val sideFiles = listHadoopFiles(copyFromPath.getParent.toString, hConf)
            .filter(_.contains(pattern))
            .filter { candidate =>
                // Require a name boundary after the base name so that e.g. "tile_1.tif"
                // does not drag along "tile_10.tif"; only "<baseName>.<anything>" or the
                // file itself qualifies as a sidecar.
                val candidateName = new Path(candidate).getName
                candidateName == fileName || candidateName.startsWith(s"$baseName.")
            }
        // copy together with sidecar files
        // NOTE(review): presumably copyIfNeeded skips files already present at the target - confirm.
        sideFiles.foreach { filePath =>
            val input = new Path(filePath)
            val output = new Path(localDestPath.getParent.toString + "/" + input.getName)
            AtomicDistributedCopy.copyIfNeeded(checkpointManager, fs, input, output)
        }
        localDestPath.toString
    }
}
157
101
158
102
/**
0 commit comments