File tree 1 file changed +8
-5
lines changed
spark/src/main/scala/ai/chronon/spark
1 file changed +8
-5
lines changed Original file line number Diff line number Diff line change @@ -171,7 +171,7 @@ object JoinUtils {
171
171
val leftEnd = Option (leftSource.query.endPartition).getOrElse(endPartition)
172
172
173
173
logger.info(s " Attempting to fill join partition range: $leftStart to $leftEnd" )
174
- PartitionRange (leftStart, leftEnd)(tableUtils .partitionSpec)
174
+ PartitionRange (leftStart, leftEnd)(leftSource .partitionSpec)
175
175
}
176
176
177
177
/** *
@@ -325,7 +325,7 @@ object JoinUtils {
325
325
val leftSideKeyName = joinPart.rightToLeft(keyName)
326
326
logger.info(
327
327
s " KeyName: $keyName, leftSide KeyName: $leftSideKeyName , Join right to left: ${joinPart.rightToLeft
328
- .mkString(" , " )}" )
328
+ .mkString(" , " )}" )
329
329
val values = collectedLeft.map(row => row.getAs[Any ](leftSideKeyName))
330
330
// Check for null keys, warn if found, err if all null
331
331
val (notNullValues, nullValues) = values.partition(_ != null )
@@ -492,9 +492,12 @@ object JoinUtils {
492
492
}
493
493
494
494
def parseSkewKeys (jmap : java.util.Map [String , java.util.List [String ]]): Option [Map [String , Seq [String ]]] = {
495
- Option (jmap).map(_.toScala.map { case (key, list) =>
496
- key -> list.asScala
497
- }.toMap)
495
+ Option (jmap).map(
496
+ _.toScala
497
+ .map { case (key, list) =>
498
+ key -> list.asScala
499
+ }
500
+ .toMap)
498
501
}
499
502
500
503
def shiftDays (leftDataModel : DataModel , joinPart : JoinPart , leftRange : PartitionRange ): PartitionRange = {
You can’t perform that action at this time.
0 commit comments