Commit d0064b2

Switch to bazel scalaFmt for the CI workflow (#360)
## Summary

Modified our GitHub workflow to run scalafmt checks using Bazel instead of sbt, and deleted the build.sbt file since it is no longer needed.

## Checklist

- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **Chores**
  - Streamlined build and continuous integration setups, transitioning away from legacy tooling.
  - Modernized internal infrastructure for improved consistency and stability.
- **Refactor / Style**
  - Enhanced code readability with comprehensive cosmetic and documentation updates.
  - Unified formatting practices across the codebase to support future maintainability.
  - Adjusted formatting of comments and code blocks for improved clarity without altering functionality.
- **Tests**
  - Reformatted test suites for clarity and consistency while preserving all functional behaviors.
  - Improved formatting in various test cases and methods for better readability without altering functionality.
Parent: 353b590

117 files changed (+1407, −2086 lines)


.scalafmt.conf

Lines changed: 2 additions & 1 deletion
```diff
@@ -3,4 +3,5 @@ align.openParenCallSite = true
 align.openParenDefnSite = true
 danglingParentheses.defnSite = false
 danglingParentheses.callSite = false
-maxColumn = 120
+docstrings.wrap = false
+maxColumn = 120
```
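The added `docstrings.wrap = false` line disables scalafmt's re-wrapping of Scaladoc bodies; together with the formatter's docstring style it accounts for the comment-only churn in the Scala diffs below, where the summary sentence moves onto the `/**` line. A minimal before/after sketch, drawn from the StatsGenerator diff further down (illustrative only; exact output depends on the scalafmt version in use):

```scala
// Before formatting: summary on its own line, stray "* */" closer.
/**
  * InputTransform acts as a signal of how to process the metric.
  * */

// After formatting: summary joined onto the /** line, closer normalized.
/** InputTransform acts as a signal of how to process the metric.
  */
object InputTransform extends Enumeration {
  val IsNull, Raw, One = Value
}
```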

WORKSPACE

Lines changed: 5 additions & 4 deletions
```diff
@@ -91,19 +91,20 @@ scala_maven_import_external(
 )
 
 load("@io_bazel_rules_scala//scala:scala.bzl", "scala_repositories")
-
 scala_repositories()
 
 load("@io_bazel_rules_scala//scala:toolchains.bzl", "scala_register_toolchains")
-
 scala_register_toolchains()
 
 load("@io_bazel_rules_scala//testing:scalatest.bzl", "scalatest_repositories", "scalatest_toolchain")
-
 scalatest_repositories()
-
 scalatest_toolchain()
 
+# For scalafmt
+load("@io_bazel_rules_scala//scala/scalafmt:scalafmt_repositories.bzl", "scalafmt_default_config", "scalafmt_repositories")
+scalafmt_default_config()
+scalafmt_repositories()
+
 # For Protobuf support
 http_archive(
     name = "rules_proto",
```

aggregator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
```diff
@@ -1,6 +1,7 @@
 scala_library(
     name = "lib",
     srcs = glob(["src/main/**/*.scala"]),
+    format = True,
     visibility = ["//visibility:public"],
     deps = [
         "//api:lib",
@@ -49,6 +50,7 @@ test_deps = [
 scala_library(
     name = "test_lib",
     srcs = glob(["src/test/**/*.scala"]),
+    format = True,
     visibility = ["//visibility:public"],
     deps = test_deps,
 )
```

aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala

Lines changed: 7 additions & 8 deletions
```diff
@@ -435,14 +435,13 @@ class FrequentItems[T: FrequentItemsFriendly](val mapSize: Int, val errorType: E
     val items = ir.sketch.getFrequentItems(errorType).map(sk => sk.getItem -> sk.getEstimate)
     val heap = mutable.PriorityQueue[(T, Long)]()(Ordering.by(_._2))
 
-    items.foreach({
-      case (key, value) =>
-        if (heap.size < mapSize) {
-          heap.enqueue((key, value))
-        } else if (heap.head._2 < value) {
-          heap.dequeue()
-          heap.enqueue((key, value))
-        }
+    items.foreach({ case (key, value) =>
+      if (heap.size < mapSize) {
+        heap.enqueue((key, value))
+      } else if (heap.head._2 < value) {
+        heap.dequeue()
+        heap.enqueue((key, value))
+      }
     })
 
     val result = new util.HashMap[String, Long]()
```
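The reformatted block keeps only the `mapSize` most frequent sketch items with a size-bounded priority queue. A self-contained sketch of that top-K pattern (hypothetical `topK` helper, not part of the file; the ordering is reversed so the heap head is the smallest retained count and is the natural eviction candidate):

```scala
import scala.collection.mutable

// Keep the k entries with the highest counts. Reversing the ordering turns the
// queue into a min-heap on counts, so `head` is always the cheapest element to
// evict when a larger count arrives.
def topK[T](items: Seq[(T, Long)], k: Int): Seq[(T, Long)] = {
  val heap = mutable.PriorityQueue.empty[(T, Long)](Ordering.by[(T, Long), Long](_._2).reverse)
  items.foreach { case (key, count) =>
    if (heap.size < k) {
      heap.enqueue((key, count))
    } else if (heap.head._2 < count) {
      heap.dequeue() // drop the current smallest count
      heap.enqueue((key, count))
    }
  }
  heap.toList.sortBy(-_._2) // highest counts first
}
```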

aggregator/src/main/scala/ai/chronon/aggregator/row/RowAggregator.scala

Lines changed: 25 additions & 26 deletions
```diff
@@ -36,32 +36,31 @@ class RowAggregator(val inputSchema: Seq[(String, DataType)], val aggregationPar
   val indices: Range = 0 until length
   // has to be array for fast random access
   val columnAggregators: Array[ColumnAggregator] = {
-    aggregationParts.zipWithIndex.map {
-      case (spec: AggregationPart, aggregatorIndex: Int) =>
-        val ((_, inputType), inputIndex) = {
-          inputSchema.zipWithIndex.find(_._1._1 == spec.inputColumn).get
-        }
-
-        val bucketIndex: Option[Int] = Option(spec.bucket).map { bucketCol =>
-          val bIndex = inputSchema.indexWhere(_._1 == bucketCol)
-          assert(bIndex != -1, s"bucketing column: $bucketCol is not found in input: ${inputSchema.map(_._1)}")
-          val bucketType = inputSchema(bIndex)._2
-          assert(bucketType == StringType, s"bucketing column: $bucketCol needs to be a string, but found $bucketType")
-          bIndex
-        }
-        try {
-          ColumnAggregator.construct(
-            inputType,
-            spec,
-            ColumnIndices(inputIndex, aggregatorIndex),
-            bucketIndex
-          )
-        } catch {
-          case e: Exception =>
-            throw new RuntimeException(
-              s"Failed to create ${spec.operation} aggregator for ${spec.inputColumn} column of type $inputType",
-              e)
-        }
+    aggregationParts.zipWithIndex.map { case (spec: AggregationPart, aggregatorIndex: Int) =>
+      val ((_, inputType), inputIndex) = {
+        inputSchema.zipWithIndex.find(_._1._1 == spec.inputColumn).get
+      }
+
+      val bucketIndex: Option[Int] = Option(spec.bucket).map { bucketCol =>
+        val bIndex = inputSchema.indexWhere(_._1 == bucketCol)
+        assert(bIndex != -1, s"bucketing column: $bucketCol is not found in input: ${inputSchema.map(_._1)}")
+        val bucketType = inputSchema(bIndex)._2
+        assert(bucketType == StringType, s"bucketing column: $bucketCol needs to be a string, but found $bucketType")
+        bIndex
+      }
+      try {
+        ColumnAggregator.construct(
+          inputType,
+          spec,
+          ColumnIndices(inputIndex, aggregatorIndex),
+          bucketIndex
+        )
+      } catch {
+        case e: Exception =>
+          throw new RuntimeException(
+            s"Failed to create ${spec.operation} aggregator for ${spec.inputColumn} column of type $inputType",
+            e)
+      }
     }
   }.toArray
 
```
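For orientation, the bucket handling in the block above resolves an optional bucketing column by name and fails fast when it is missing or not a string column. A standalone sketch of that lookup (hypothetical helper, with simplified `(name, type-name)` schema pairs in place of Chronon's `DataType`):

```scala
// Hypothetical, simplified restatement of the bucket-column lookup above.
def bucketIndex(inputSchema: Seq[(String, String)], bucket: Option[String]): Option[Int] =
  bucket.map { bucketCol =>
    val bIndex = inputSchema.indexWhere(_._1 == bucketCol)
    assert(bIndex != -1, s"bucketing column: $bucketCol is not found in input: ${inputSchema.map(_._1)}")
    val bucketType = inputSchema(bIndex)._2
    assert(bucketType == "string", s"bucketing column: $bucketCol needs to be a string, but found $bucketType")
    bIndex
  }
```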

aggregator/src/main/scala/ai/chronon/aggregator/row/StatsGenerator.scala

Lines changed: 16 additions & 22 deletions
```diff
@@ -25,8 +25,7 @@ import org.apache.datasketches.memory.Memory
 import java.util
 import scala.collection.Seq
 
-/**
-  * Module managing FeatureStats Schema, Aggregations to be used by type and aggregator construction.
+/** Module managing FeatureStats Schema, Aggregations to be used by type and aggregator construction.
   *
   * Stats Aggregation has an offline/ batch component and an online component.
   * The metrics defined for stats depend on the schema of the join. The dataTypes and column names.
@@ -45,32 +44,29 @@ object StatsGenerator {
   val finalizedPercentilesSeries: Array[Double] = Array(0.05, 0.25, 0.5, 0.75, 0.95)
   val ignoreColumns: Seq[String] = Seq(api.Constants.TimeColumn, "ds", "date_key", "date", "datestamp")
 
-  /**
-    * InputTransform acts as a signal of how to process the metric.
+  /** InputTransform acts as a signal of how to process the metric.
     *
     * IsNull: Check if the input is null.
     *
     * Raw: Operate in the input column.
     *
     * One: lit(true) in spark. Used for row counts leveraged to obtain null rate values.
-    * */
+    */
   object InputTransform extends Enumeration {
     type InputTransform = Value
     val IsNull, Raw, One = Value
   }
   import InputTransform._
 
-  /**
-    * MetricTransform represents a single statistic built on top of an input column.
+  /** MetricTransform represents a single statistic built on top of an input column.
     */
   case class MetricTransform(name: String,
                              expression: InputTransform,
                              operation: api.Operation,
                              suffix: String = "",
                              argMap: util.Map[String, String] = null)
 
-  /**
-    * Post processing for finalized values or IRs when generating a time series of stats.
+  /** Post processing for finalized values or IRs when generating a time series of stats.
     * In the case of percentiles for examples we reduce to 5 values in order to generate candlesticks.
     */
   def SeriesFinalizer(key: String, value: AnyRef): AnyRef = {
@@ -115,17 +111,16 @@
   /** For the schema of the data define metrics to be aggregated */
   def buildMetrics(fields: Seq[(String, api.DataType)]): Seq[MetricTransform] = {
     val metrics = fields
-      .flatMap {
-        case (name, dataType) =>
-          if (ignoreColumns.contains(name)) {
-            Seq.empty
-          } else if (api.DataType.isNumeric(dataType) && dataType != api.ByteType) {
-            // ByteTypes are not supported due to Avro Encodings and limited support on aggregators.
-            // Needs to be casted on source if required.
-            numericTransforms(name)
-          } else {
-            anyTransforms(name)
-          }
+      .flatMap { case (name, dataType) =>
+        if (ignoreColumns.contains(name)) {
+          Seq.empty
+        } else if (api.DataType.isNumeric(dataType) && dataType != api.ByteType) {
+          // ByteTypes are not supported due to Avro Encodings and limited support on aggregators.
+          // Needs to be casted on source if required.
+          numericTransforms(name)
+        } else {
+          anyTransforms(name)
+        }
       }
       .sortBy(_.name)
     metrics :+ MetricTransform(totalColumn, InputTransform.One, api.Operation.COUNT)
@@ -147,8 +142,7 @@
     linfSimple.asInstanceOf[AnyRef]
   }
 
-  /**
-    * PSI is a measure of the difference between two probability distributions.
+  /** PSI is a measure of the difference between two probability distributions.
     * However, it's not defined for cases where a bin can have zero elements in either distribution
     * (meant for continuous measures). In order to support PSI for discrete measures we add a small eps value to
     * perturb the distribution in bins.
```

aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala

Lines changed: 2 additions & 3 deletions
```diff
@@ -64,10 +64,9 @@ object DailyResolution extends Resolution {
 
 object ResolutionUtils {
 
-  /**
-    * Find the smallest tail window resolution in a GroupBy. Returns 1D if the GroupBy does not define any windows (all-time aggregates).
+  /** Find the smallest tail window resolution in a GroupBy. Returns 1D if the GroupBy does not define any windows (all-time aggregates).
     * The window resolutions are: 5 min for a GroupBy a window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days.
-    * */
+    */
   def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Long =
     Option(
       groupBy.aggregations.toScala.toArray
```

aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala

Lines changed: 4 additions & 10 deletions
```diff
@@ -26,17 +26,14 @@ import scala.collection.mutable
 case class BatchIr(collapsed: Array[Any], tailHops: HopsAggregator.IrMapType)
 case class FinalBatchIr(collapsed: Array[Any], tailHops: HopsAggregator.OutputArrayType)
 
-/**
-  * Mutations processing starts with an end of the day snapshot FinalBatchIR.
+/** Mutations processing starts with an end of the day snapshot FinalBatchIR.
   * On top of this FinalBatchIR mutations are processed.
   *
-  *
   * update/merge/finalize are related to snapshot data. As such they follow the snapshot Schema
   * and aggregators.
   * However mutations come into play later in the group by and a finalized version of the snapshot
   * data is created to be processed with the mutations rows.
   * Since the dataframe inputs are aligned between mutations and snapshot (input) no additional schema is needed.
-  *
   */
 class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
                                  inputSchema: Seq[(String, DataType)],
@@ -106,8 +103,7 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
   def finalizeSnapshot(batchIr: BatchIr): FinalBatchIr =
     FinalBatchIr(batchIr.collapsed, Option(batchIr.tailHops).map(hopsAggregator.toTimeSortedArray).orNull)
 
-  /**
-    * Go through the aggregators and update or delete the intermediate with the information of the row if relevant.
+  /** Go through the aggregators and update or delete the intermediate with the information of the row if relevant.
     * Useful for both online and mutations
     */
   def updateIr(ir: Array[Any], row: Row, queryTs: Long, hasReversal: Boolean = false): Unit = {
@@ -142,8 +138,7 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
     }
   }
 
-  /**
-    * Update the intermediate results with tail hops data from a FinalBatchIr.
+  /** Update the intermediate results with tail hops data from a FinalBatchIr.
     */
   def mergeTailHops(ir: Array[Any], queryTs: Long, batchEndTs: Long, batchIr: FinalBatchIr): Array[Any] = {
     var i: Int = 0
@@ -171,8 +166,7 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
     ir
   }
 
-  /**
-    * Given aggregations FinalBatchIRs at the end of the Snapshot (batchEndTs) and mutation and query times,
+  /** Given aggregations FinalBatchIRs at the end of the Snapshot (batchEndTs) and mutation and query times,
     * determine the values at the query times for the aggregations.
     * This is pretty much a mix of online with extra work for multiple queries ts support.
     */
```
