@@ -63,7 +63,7 @@ import scala.util.Try
  * retrieve metadata / configure it appropriately at creation time
  */
 
-case class TableUtils(sparkSession: SparkSession) {
+class TableUtils(@transient val sparkSession: SparkSession) extends Serializable {
 
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
   private val ARCHIVE_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"
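Why this hunk matters: SparkSession is not serializable, and a case class is Serializable by default and tries to serialize every constructor field, so the old shape would fail the moment a TableUtils instance was captured in an executor-side closure. Marking the field @transient while extending Serializable lets the instance cross the wire with the session left behind. A minimal sketch of the mechanism, using a hypothetical Handle stand-in rather than a real SparkSession:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for SparkSession; assume it is NOT serializable.
class Handle

// Same shape as the new TableUtils: Java serialization skips the
// @transient field instead of failing on it. After deserialization the
// field is null, so only driver-side code should touch it.
class Utils(@transient val handle: Handle) extends Serializable

object SerializationDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // Succeeds. As `case class Utils(handle: Handle)` this line would throw
    // java.io.NotSerializableException: Handle.
    out.writeObject(new Utils(new Handle))
    out.close()
  }
}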
@@ -141,16 +141,24 @@ case class TableUtils(sparkSession: SparkSession) {
     rdd
   }
 
-  def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName)
+  // Needs provider
+  def tableExists(tableName: String): Boolean = {
+    sparkSession.catalog.tableExists(tableName)
+  }
 
-  def loadTable(tableName: String): DataFrame = sparkSession.table(tableName)
+  // Needs provider
+  def loadTable(tableName: String): DataFrame = {
+    sparkSession.table(tableName)
+  }
 
+  // Needs provider
   def isPartitioned(tableName: String): Boolean = {
     // TODO: use proper way to detect if a table is partitioned or not
     val schema = getSchemaFromTable(tableName)
     schema.fieldNames.contains(partitionColumn)
   }
 
+  // Needs provider
   def createDatabase(database: String): Boolean = {
     try {
       val command = s"CREATE DATABASE IF NOT EXISTS $database"
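The "// Needs provider" markers throughout this diff flag methods that still hit the SparkSession catalog directly and presumably should migrate behind the same provider abstraction that tableReadFormat already goes through. A hedged sketch of what that seam could look like; CatalogOps and SparkCatalogOps are hypothetical names for illustration, not this PR's API:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical seam: route catalog access through a trait so a
// non-Spark catalog (e.g. an external metastore client) can be swapped in.
trait CatalogOps {
  def tableExists(tableName: String): Boolean
  def loadTable(tableName: String): DataFrame
}

// Default implementation, matching what tableExists/loadTable do today.
class SparkCatalogOps(spark: SparkSession) extends CatalogOps {
  override def tableExists(tableName: String): Boolean =
    spark.catalog.tableExists(tableName)

  override def loadTable(tableName: String): DataFrame =
    spark.table(tableName)
}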
@@ -168,6 +176,7 @@ case class TableUtils(sparkSession: SparkSession) {
 
   def tableReadFormat(tableName: String): Format = tableFormatProvider.readFormat(tableName)
 
+  // Needs provider
   // return all specified partition columns in a table in format of Map[partitionName, PartitionValue]
   def allPartitions(tableName: String, partitionColumnsFilter: Seq[String] = Seq.empty): Seq[Map[String, String]] = {
     if (!tableExists(tableName)) return Seq.empty[Map[String, String]]
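For concreteness, the return shape allPartitions documents — one map per partition, keyed by partition column — shown here for a hypothetical table partitioned by (ds, hr):

// Illustrative value of allPartitions("db.events"):
val parts: Seq[Map[String, String]] = Seq(
  Map("ds" -> "2024-01-01", "hr" -> "00"),
  Map("ds" -> "2024-01-01", "hr" -> "01"),
  Map("ds" -> "2024-01-02", "hr" -> "00")
)
// Passing partitionColumnsFilter = Seq("ds") would restrict each map to its "ds" entry.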
@@ -182,6 +191,7 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   def partitions(tableName: String, subPartitionsFilter: Map[String, String] = Map.empty): Seq[String] = {
     if (!tableExists(tableName)) return Seq.empty[String]
     val format = tableReadFormat(tableName)
@@ -222,11 +232,13 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   def getSchemaFromTable(tableName: String): StructType = {
     sparkSession.sql(s"SELECT * FROM $tableName LIMIT 1").schema
   }
 
   // method to check if a user has access to a table
+  // Needs provider
   def checkTablePermission(tableName: String,
                            fallbackPartition: String =
                              partitionSpec.before(partitionSpec.at(System.currentTimeMillis()))): Boolean = {
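A short usage sketch for the permission check (the table name and surrounding code are illustrative): the default fallback partition is "yesterday", derived from partitionSpec and the current clock, so callers can probe access before launching a job.

val tableUtils = TableUtils(sparkSession)
// Fail fast instead of dying mid-job on an unreadable table.
if (!tableUtils.checkTablePermission("db.events")) {
  throw new RuntimeException("No read access to db.events; aborting run")
}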
@@ -252,12 +264,15 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   def lastAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
     partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].max(x, y))
 
+  // Needs provider
   def firstAvailablePartition(tableName: String, subPartitionFilters: Map[String, String] = Map.empty): Option[String] =
     partitions(tableName, subPartitionFilters).reduceOption((x, y) => Ordering[String].min(x, y))
 
+  // Needs provider
   def insertPartitions(df: DataFrame,
                        tableName: String,
                        tableProperties: Map[String, String] = null,
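A note on the two reduceOption helpers above: partition values are date strings in a fixed-width format (e.g. yyyy-MM-dd), so lexicographic max/min coincides with chronological latest/earliest, and reduceOption returns None for a table with no partitions rather than throwing. A self-contained illustration:

val parts = Seq("2024-01-02", "2024-01-01", "2024-01-03")
parts.reduceOption((x, y) => Ordering[String].max(x, y))            // Some("2024-01-03") - last
parts.reduceOption((x, y) => Ordering[String].min(x, y))            // Some("2024-01-01") - first
Seq.empty[String].reduceOption((x, y) => Ordering[String].max(x, y)) // None - empty table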
@@ -351,6 +366,7 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   def insertUnPartitioned(df: DataFrame,
                           tableName: String,
                           tableProperties: Map[String, String] = null,
@@ -412,6 +428,7 @@ case class TableUtils(sparkSession: SparkSession) {
     }.get
   }
 
+  // Needs provider
   private def repartitionAndWriteInternal(df: DataFrame,
                                           tableName: String,
                                           saveMode: SaveMode,
@@ -488,6 +505,7 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   private def createTableSql(tableName: String,
                              schema: StructType,
                              partitionColumns: Seq[String],
@@ -526,6 +544,7 @@ case class TableUtils(sparkSession: SparkSession) {
     Seq(createFragment, partitionFragment, fileFormatString, propertiesFragment).mkString("\n")
   }
 
+  // Needs provider
   private def alterTablePropertiesSql(tableName: String, properties: Map[String, String]): String = {
     // Only SQL api exists for setting TBLPROPERTIES
     val propertiesString = properties
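As the inline comment says, TBLPROPERTIES has no DataFrame-level API, so this helper must emit SQL. Illustrative output for a hypothetical call (exact whitespace is the helper's concern):

// alterTablePropertiesSql("db.events", Map("owner" -> "team-data", "retention" -> "90"))
// would produce DDL along these lines:
val ddl = "ALTER TABLE db.events SET TBLPROPERTIES ('owner' = 'team-data', 'retention' = '90')"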
@@ -612,6 +631,7 @@ case class TableUtils(sparkSession: SparkSession) {
     Some(missingChunks)
   }
 
+  // Needs provider
   def getTableProperties(tableName: String): Option[Map[String, String]] = {
     try {
       val tableId = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
@@ -621,6 +641,7 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
+  // Needs provider
   def dropTableIfExists(tableName: String): Unit = {
     val command = s"DROP TABLE IF EXISTS $tableName"
     logger.info(s"Dropping table with command: $command")
@@ -648,68 +669,6 @@ case class TableUtils(sparkSession: SparkSession) {
     }
   }
 
-  @deprecated
-  def dropPartitionsAfterHole(inputTable: String,
-                              outputTable: String,
-                              partitionRange: PartitionRange,
-                              subPartitionFilters: Map[String, String] = Map.empty): Option[String] = {
-
-    def partitionsInRange(table: String, partitionFilter: Map[String, String] = Map.empty): Set[String] = {
-      val allParts = partitions(table, partitionFilter)
-      val startPrunedParts = Option(partitionRange.start).map(start => allParts.filter(_ >= start)).getOrElse(allParts)
-      Option(partitionRange.end).map(end => startPrunedParts.filter(_ <= end)).getOrElse(startPrunedParts).toSet
-    }
-
-    val inputPartitions = partitionsInRange(inputTable)
-    val outputPartitions = partitionsInRange(outputTable, subPartitionFilters)
-    val earliestHoleOpt = (inputPartitions -- outputPartitions).reduceLeftOption(Ordering[String].min)
-    earliestHoleOpt.foreach { hole =>
-      val toDrop = outputPartitions.filter(_ > hole)
-      logger.info(s"""
-                     |Earliest hole at $hole in output table $outputTable, relative to $inputTable
-                     |Input Parts   : ${inputPartitions.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Output Parts  : ${outputPartitions.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Dropping Parts: ${toDrop.toArray.sorted.mkString("Array(", ", ", ")")}
-                     |Sub Partitions: ${subPartitionFilters.map(kv => s"${kv._1}=${kv._2}").mkString("Array(", ", ", ")")}
-                     """.stripMargin)
-      dropPartitions(outputTable, toDrop.toArray.sorted, partitionColumn, subPartitionFilters)
-    }
-    earliestHoleOpt
-  }
-
-  def dropPartitions(tableName: String,
-                     partitions: Seq[String],
-                     partitionColumn: String = partitionColumn,
-                     subPartitionFilters: Map[String, String] = Map.empty): Unit = {
-    if (partitions.nonEmpty && tableExists(tableName)) {
-      val partitionSpecs = partitions
-        .map { partition =>
-          val mainSpec = s"$partitionColumn='$partition'"
-          val specs = mainSpec +: subPartitionFilters.map {
-            case (key, value) => s"$key='$value'"
-          }.toSeq
-          specs.mkString("PARTITION (", ",", ")")
-        }
-        .mkString(",")
-      val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs"
-      sql(dropSql)
-    } else {
-      logger.info(s"$tableName doesn't exist, please double check before dropping partitions")
-    }
-  }
-
-  def dropPartitionRange(tableName: String,
-                         startDate: String,
-                         endDate: String,
-                         subPartitionFilters: Map[String, String] = Map.empty): Unit = {
-    if (tableExists(tableName)) {
-      val toDrop = Stream.iterate(startDate)(partitionSpec.after).takeWhile(_ <= endDate)
-      dropPartitions(tableName, toDrop, partitionColumn, subPartitionFilters)
-    } else {
-      logger.info(s"$tableName doesn't exist, please double check before dropping partitions")
-    }
-  }
-
   /*
    * This method detects new columns that appear in newSchema but not in current table,
    * and append those new columns at the end of the existing table. This allows continuous evolution
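The comment opening here describes append-only schema evolution. A hedged sketch of the core diff-and-append step, assuming Hive-style ADD COLUMNS DDL; appendColumnsSql is an illustrative helper, not this file's implementation:

import org.apache.spark.sql.types.StructType

// Columns present in the incoming schema but not the current table are
// appended at the end; existing columns are never reordered or dropped.
def appendColumnsSql(tableName: String, current: StructType, incoming: StructType): Option[String] = {
  val existing = current.fieldNames.toSet
  val added = incoming.fields.filterNot(f => existing.contains(f.name))
  if (added.isEmpty) None
  else
    Some(
      added
        .map(f => s"`${f.name}` ${f.dataType.catalogString}")
        .mkString(s"ALTER TABLE $tableName ADD COLUMNS (", ", ", ")"))
}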
@@ -837,6 +796,12 @@ case class TableUtils(sparkSession: SparkSession) {
   }
 }
 
+object TableUtils {
+  def apply(sparkSession: SparkSession): TableUtils = {
+    new TableUtils(sparkSession)
+  }
+}
+
 sealed case class IncompatibleSchemaException(inconsistencies: Seq[(String, DataType, DataType)]) extends Exception {
   override def getMessage: String = {
     val inconsistenciesStr =
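Design note on the companion object added in this hunk: a case class gets a synthesized apply, so existing TableUtils(spark) call sites would stop compiling once it becomes a plain class; the hand-written apply preserves them. What is deliberately given up is the case-class copy/equals/unapply machinery, which a utility wrapping a live session never needed. Usage, with spark standing in for an existing SparkSession:

val tu  = TableUtils(spark)     // pre-existing call sites keep compiling
val tu2 = new TableUtils(spark) // explicit construction also works now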