
Commit 28509fb

style: Apply scalafix and scalafmt changes

1 parent: b70bfc7

File tree: 9 files changed, +90 -81 lines changed

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
Lines changed: 8 additions & 2 deletions

@@ -13,7 +13,12 @@ import ai.chronon.flink.types.AvroCodecOutput
 import ai.chronon.flink.types.TimestampedTile
 import ai.chronon.flink.types.WriteResponse
 import ai.chronon.flink.validation.ValidationFlinkJob
-import ai.chronon.flink.window.{AlwaysFireOnElementTrigger, FlinkRowAggProcessFunction, FlinkRowAggregationFunction, KeySelectorBuilder}
+import ai.chronon.flink.window.{
+  AlwaysFireOnElementTrigger,
+  FlinkRowAggProcessFunction,
+  FlinkRowAggregationFunction,
+  KeySelectorBuilder
+}
 import ai.chronon.online.Api
 import ai.chronon.online.GroupByServingInfoParsed
 import ai.chronon.online.TopicInfo
@@ -309,7 +314,8 @@ object FlinkJob {
 
     val schemaProvider = FlinkSerDeProvider.build(topicInfo)
 
-    val deserializationSchema = DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(schemaProvider, servingInfo.groupBy)
+    val deserializationSchema =
+      DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(schemaProvider, servingInfo.groupBy)
     require(
       deserializationSchema.isInstanceOf[SourceProjection],
       s"Expect created deserialization schema for groupBy: $groupByName with $topicInfo to mixin SourceProjection. " +
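For context, the projection deser schema built here advertises its output shape through the SourceProjection mixin, which is what the require above checks. A minimal sketch of consuming it downstream (names are taken from the diffs in this commit; the surrounding wiring and the fully qualified DataType path are assumptions):

    // Sketch: recover the projected field names/types after the mixin check.
    val deserializationSchema =
      DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(schemaProvider, servingInfo.groupBy)
    require(deserializationSchema.isInstanceOf[SourceProjection],
            s"Expected deser schema for $groupByName to mix in SourceProjection")
    val projectedSchema: Array[(String, ai.chronon.api.DataType)] =
      deserializationSchema.asInstanceOf[SourceProjection].projectedSchema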

flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala
Lines changed: 10 additions & 10 deletions

@@ -6,23 +6,22 @@ import ai.chronon.online.serde.SerDe
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
 import org.apache.spark.sql.{Encoder, Row}
 
-
 /** DeserializationSchema for use within Chronon. Includes details such as the source event encoder and if projection is
- * enabled, the projected schema. This is used to both build the Flink sources as well as in the downstream processing
- * operators (e.g. SparkExprEval).
- *
- * @tparam T - Type of the object returned after deserialization. Can be event type (no projection)
- * or Map[String, Any] (with projection)
- */
+  * enabled, the projected schema. This is used to both build the Flink sources as well as in the downstream processing
+  * operators (e.g. SparkExprEval).
+  *
+  * @tparam T - Type of the object returned after deserialization. Can be event type (no projection)
+  *             or Map[String, Any] (with projection)
+  */
 abstract class ChrononDeserializationSchema[T] extends AbstractDeserializationSchema[T] {
   def sourceProjectionEnabled: Boolean
 
   def sourceEventEncoder: Encoder[Row]
 }
 
 /** Trait that is mixed in with DeserializationSchemas that support projection pushdown. This trait provides the projected
- * schema that the source event will be projected to.
- */
+  * schema that the source event will be projected to.
+  */
 trait SourceProjection {
   def projectedSchema: Array[(String, api.DataType)]
 }
@@ -32,7 +31,8 @@ object DeserializationSchemaBuilder {
     new SourceIdentityDeserializationSchema(provider, groupBy)
   }
 
-  def buildSourceProjectionDeserSchema(provider: SerDe, groupBy: GroupBy): ChrononDeserializationSchema[Map[String, Any]] = {
+  def buildSourceProjectionDeserSchema(provider: SerDe,
+                                       groupBy: GroupBy): ChrononDeserializationSchema[Map[String, Any]] = {
    new SourceProjectionDeserializationSchema(provider, groupBy)
  }
}
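Read alongside the FlinkJob and ValidationFlinkJob hunks in this commit, the builder exposes two entry points. A hedged sketch of the choice between them (types are from this file; serDe and groupBy are assumed to be in scope):

    // Identity path (ValidationFlinkJob): events come out as Spark Rows
    // in the source event schema, no projection applied.
    val identity: ChrononDeserializationSchema[Row] =
      DeserializationSchemaBuilder.buildSourceIdentityDeserSchema(serDe, groupBy)

    // Projection path (FlinkJob): events come out already projected to
    // Map[String, Any], and the schema mixes in SourceProjection.
    val projected: ChrononDeserializationSchema[Map[String, Any]] =
      DeserializationSchemaBuilder.buildSourceProjectionDeserSchema(serDe, groupBy)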

flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala
Lines changed: 2 additions & 1 deletion

@@ -11,7 +11,8 @@ object CustomSchemaSerDe {
 
   def buildCustomSchemaSerDe(topicInfo: TopicInfo): SerDe = {
     val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
-    val providerClass = topicInfo.params.getOrElse(ProviderClass, throw new IllegalArgumentException(s"$ProviderClass not set"))
+    val providerClass =
+      topicInfo.params.getOrElse(ProviderClass, throw new IllegalArgumentException(s"$ProviderClass not set"))
     val cls = cl.loadClass(providerClass)
     val constructor = cls.getConstructors.apply(0)
     val provider = constructor.newInstance(topicInfo)
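The reflection in buildCustomSchemaSerDe implies a contract for provider classes: loadable from Flink's classloader, with the first public constructor accepting the TopicInfo. A hypothetical provider satisfying that contract (this class is illustrative and not part of the commit; only the constructor shape is dictated by the loader above):

    import ai.chronon.api.StructType
    import ai.chronon.online.TopicInfo
    import ai.chronon.online.serde.{Mutation, SerDe}

    // Hypothetical provider, registered via the ProviderClass topic param.
    class ExampleCustomSerDe(topicInfo: TopicInfo) extends SerDe {
      // Schema of the incoming events; a real impl might derive it from topicInfo.params.
      override def schema: StructType = ???
      // Decode one event payload into a Mutation (insert-only events populate `after`).
      override def fromBytes(bytes: Array[Byte]): Mutation = ???
    }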

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala
Lines changed: 18 additions & 16 deletions

@@ -11,8 +11,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.slf4j.{Logger, LoggerFactory}
 
-
-abstract class BaseDeserializationSchema[T](deserSchemaProvider: SerDe, groupBy: GroupBy) extends ChrononDeserializationSchema[T] {
+abstract class BaseDeserializationSchema[T](deserSchemaProvider: SerDe, groupBy: GroupBy)
+    extends ChrononDeserializationSchema[T] {
 
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
@@ -21,7 +21,8 @@ abstract class BaseDeserializationSchema[T](deserSchemaProvider: SerDe, groupBy:
 
   override def sourceProjectionEnabled: Boolean = false
 
-  override def sourceEventEncoder: Encoder[Row] = Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))
+  override def sourceEventEncoder: Encoder[Row] =
+    Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))
 
   override def open(context: DeserializationSchema.InitializationContext): Unit = {
     super.open(context)
@@ -43,17 +44,17 @@ abstract class BaseDeserializationSchema[T](deserSchemaProvider: SerDe, groupBy:
   }
 }
 
-class SourceIdentityDeserializationSchema(deserSchemaProvider: SerDe, groupBy: GroupBy) extends BaseDeserializationSchema[Row](deserSchemaProvider, groupBy) {
+class SourceIdentityDeserializationSchema(deserSchemaProvider: SerDe, groupBy: GroupBy)
+    extends BaseDeserializationSchema[Row](deserSchemaProvider, groupBy) {
 
   override def deserialize(messageBytes: Array[Byte], out: Collector[Row]): Unit = {
     val maybeMutation = doDeserializeMutation(messageBytes)
 
-    maybeMutation.foreach {
-      mutation =>
-        Seq(mutation.before, mutation.after)
-          .filter(_ != null)
-          .map(r => SparkConversions.toSparkRow(r, deserSchemaProvider.schema, GenericRowHandler.func).asInstanceOf[Row])
-          .foreach(row => out.collect(row))
+    maybeMutation.foreach { mutation =>
+      Seq(mutation.before, mutation.after)
+        .filter(_ != null)
+        .map(r => SparkConversions.toSparkRow(r, deserSchemaProvider.schema, GenericRowHandler.func).asInstanceOf[Row])
+        .foreach(row => out.collect(row))
     }
   }
 
@@ -64,8 +65,8 @@ class SourceIdentityDeserializationSchema(deserSchemaProvider: SerDe, groupBy: G
 }
 
 class SourceProjectionDeserializationSchema(deserSchemaProvider: SerDe, groupBy: GroupBy)
-  extends BaseDeserializationSchema[Map[String, Any]](deserSchemaProvider, groupBy)
-  with SourceProjection {
+    extends BaseDeserializationSchema[Map[String, Any]](deserSchemaProvider, groupBy)
+    with SourceProjection {
 
   @transient private var evaluator: SparkExpressionEval[Row] = _
   @transient private var rowSerializer: ExpressionEncoder.Serializer[Row] = _
@@ -100,9 +101,10 @@ class SourceProjectionDeserializationSchema(deserSchemaProvider: SerDe, groupBy:
     val maybeMutation = doDeserializeMutation(messageBytes)
 
     val mutations = maybeMutation
-      .map {
-        mutation => Seq(mutation.before, mutation.after).filter(_ != null)
-      }.getOrElse(Seq.empty)
+      .map { mutation =>
+        Seq(mutation.before, mutation.after).filter(_ != null)
+      }
+      .getOrElse(Seq.empty)
 
     mutations.foreach(row => doSparkExprEval(row, out))
   }
@@ -139,4 +141,4 @@ object GenericRowHandler {
      result
    }
  }
-}
\ No newline at end of file
+}

flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala
Lines changed: 1 addition & 2 deletions

@@ -8,8 +8,7 @@ import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, Sch
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
 import org.apache.avro.Schema
 
-/**
- * Schema Provider / SerDe implementation that uses the Confluent Schema Registry to fetch schemas for topics.
+/** Schema Provider / SerDe implementation that uses the Confluent Schema Registry to fetch schemas for topics.
   * Can be configured as: topic = "kafka://topic-name/registry_host=host/[registry_port=port]/[registry_scheme=http]/[subject=subject]"
   * Port, scheme and subject are optional. If port is missing, we assume the host is pointing to a LB address / such that
   * forwards to the right host + port. Scheme defaults to http. Subject defaults to the topic name + "-value" (based on schema
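The doc comment spells out the topic string format in full; one concrete instance under those rules (host, port, and topic name are placeholders, and the optional parts are written out rather than defaulted):

    val topic = "kafka://user-views/registry_host=schema-registry.internal/registry_port=8081/registry_scheme=http/subject=user-views-value"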

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala
Lines changed: 2 additions & 1 deletion

@@ -141,7 +141,8 @@ object ValidationFlinkJob {
 
     val schemaProvider = FlinkSerDeProvider.build(topicInfo)
 
-    val deserializationSchema = DeserializationSchemaBuilder.buildSourceIdentityDeserSchema(schemaProvider, servingInfo.groupBy)
+    val deserializationSchema =
+      DeserializationSchemaBuilder.buildSourceIdentityDeserSchema(schemaProvider, servingInfo.groupBy)
 
     val source =
       topicInfo.messageBus match {

online/src/main/scala/ai/chronon/online/metrics/Metrics.scala
Lines changed: 4 additions & 3 deletions

@@ -136,10 +136,11 @@ object Metrics {
       case "otel" | "opentelemetry" =>
         if (metricsEnabled) {
           val maybeMetricReader = OtelMetricsReporter.buildOtelMetricReader()
-          val openTelemetry = maybeMetricReader.map {
-            metricReader =>
+          val openTelemetry = maybeMetricReader
+            .map { metricReader =>
               OtelMetricsReporter.buildOpenTelemetryClient(metricReader)
-          }.getOrElse(OpenTelemetry.noop())
+            }
+            .getOrElse(OpenTelemetry.noop())
           new OtelMetricsReporter(openTelemetry)
         } else {
           new OtelMetricsReporter(OpenTelemetry.noop())

online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala
Lines changed: 9 additions & 9 deletions

@@ -98,20 +98,20 @@ object OtelMetricsReporter {
     val metricReader = System.getProperty(MetricsReader, MetricsReaderDefault)
     metricReader.toLowerCase match {
       case MetricsReaderDefault =>
-        getExporterUrl.map {
-          url =>
-            val exporterUrl = url + "/v1/metrics"
-            val metricExporter = OtlpHttpMetricExporter.builder.setEndpoint(exporterUrl).build
-            // Configure periodic metric reader
-            PeriodicMetricReader.builder(metricExporter).setInterval(Duration.parse(MetricsExporterInterval)).build
+        getExporterUrl.map { url =>
+          val exporterUrl = url + "/v1/metrics"
+          val metricExporter = OtlpHttpMetricExporter.builder.setEndpoint(exporterUrl).build
+          // Configure periodic metric reader
+          PeriodicMetricReader.builder(metricExporter).setInterval(Duration.parse(MetricsExporterInterval)).build
         }
 
       case MetricsReaderPrometheus =>
         val prometheusPort =
           System.getProperty(MetricsExporterPrometheusPortKey, MetricsExporterPrometheusPortDefault).toInt
-        Some(PrometheusHttpServer.builder
-          .setPort(prometheusPort)
-          .build)
+        Some(
+          PrometheusHttpServer.builder
+            .setPort(prometheusPort)
+            .build)
       case _ =>
         throw new IllegalArgumentException(s"Unknown metrics reader (only http / prometheus supported): $metricReader")
     }
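The match above is driven entirely by JVM system properties. A hedged sketch of forcing the Prometheus path (only the constant names appear in this diff; their visibility on the companion object and their string values are assumptions):

    // Assumes the property-key constants are reachable on OtelMetricsReporter.
    System.setProperty(OtelMetricsReporter.MetricsReader, OtelMetricsReporter.MetricsReaderPrometheus)
    System.setProperty(OtelMetricsReporter.MetricsExporterPrometheusPortKey, "9464")
    // Subsequent calls to buildOtelMetricReader() should then return the
    // PrometheusHttpServer-backed reader on the configured port.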

online/src/main/scala/ai/chronon/online/serde/SerDe.scala
Lines changed: 36 additions & 37 deletions

@@ -2,13 +2,12 @@ package ai.chronon.online.serde
 
 import ai.chronon.api.StructType
 
-/**
- * A concrete Serde implementation is responsible for providing details on the incoming event schema
- * and rails to deserialize individual events. Events are deserialized to Chronon's [Mutation] type.
- *
- * As these Serde implementations are used in distributed streaming engines such as Flink, care must be taken
- * to ensure that implementations are serializable (thus fields members of the class must be serializable).
- */
+/** A concrete Serde implementation is responsible for providing details on the incoming event schema
+  * and rails to deserialize individual events. Events are deserialized to Chronon's [Mutation] type.
+  *
+  * As these Serde implementations are used in distributed streaming engines such as Flink, care must be taken
+  * to ensure that implementations are serializable (thus fields members of the class must be serializable).
+  */
 abstract class SerDe extends Serializable {
   def fromBytes(bytes: Array[Byte]): Mutation
   def schema: StructType
@@ -19,34 +18,34 @@ abstract class SerDe extends Serializable {
 }
 
 /** ==== MUTATION vs. EVENT ====
- * Mutation is the general case of an Event
- * Imagine a user impression/view stream - impressions/views are immutable events
- * Imagine a stream of changes to a credit card transaction stream.
- *   - transactions can be "corrected"/updated & deleted, besides being "inserted"
- *   - This is one of the core difference between entity and event sources. Events are insert-only.
- *   - (The other difference is Entites are stored in the warehouse typically as snapshots of the table as of midnight)
- * In case of an update - one must produce both before and after values
- * In case of a delete - only before is populated & after is left as null
- * In case of a insert - only after is populated & before is left as null
- *
- * ==== TIME ASSUMPTIONS ====
- * The schema needs to contain a `ts`(milliseconds as a java Long)
- * For the entities case, `mutation_ts` when absent will use `ts` as a replacement
- *
- * ==== TYPE CONVERSIONS ====
- * Java types corresponding to the schema types. [[SerDe]] should produce mutations that comply.
- * NOTE: everything is nullable (hence boxed)
- * IntType        java.lang.Integer
- * LongType       java.lang.Long
- * DoubleType     java.lang.Double
- * FloatType      java.lang.Float
- * ShortType      java.lang.Short
- * BooleanType    java.lang.Boolean
- * ByteType       java.lang.Byte
- * StringType     java.lang.String
- * BinaryType     Array[Byte]
- * ListType       java.util.List[Byte]
- * MapType        java.util.Map[Byte]
- * StructType     Array[Any]
- */
+  * Mutation is the general case of an Event
+  * Imagine a user impression/view stream - impressions/views are immutable events
+  * Imagine a stream of changes to a credit card transaction stream.
+  *   - transactions can be "corrected"/updated & deleted, besides being "inserted"
+  *   - This is one of the core difference between entity and event sources. Events are insert-only.
+  *   - (The other difference is Entites are stored in the warehouse typically as snapshots of the table as of midnight)
+  * In case of an update - one must produce both before and after values
+  * In case of a delete - only before is populated & after is left as null
+  * In case of a insert - only after is populated & before is left as null
+  *
+  * ==== TIME ASSUMPTIONS ====
+  * The schema needs to contain a `ts`(milliseconds as a java Long)
+  * For the entities case, `mutation_ts` when absent will use `ts` as a replacement
+  *
+  * ==== TYPE CONVERSIONS ====
+  * Java types corresponding to the schema types. [[SerDe]] should produce mutations that comply.
+  * NOTE: everything is nullable (hence boxed)
+  * IntType        java.lang.Integer
+  * LongType       java.lang.Long
+  * DoubleType     java.lang.Double
+  * FloatType      java.lang.Float
+  * ShortType      java.lang.Short
+  * BooleanType    java.lang.Boolean
+  * ByteType       java.lang.Byte
+  * StringType     java.lang.String
+  * BinaryType     Array[Byte]
+  * ListType       java.util.List[Byte]
+  * MapType        java.util.Map[Byte]
+  * StructType     Array[Any]
+  */
 case class Mutation(schema: StructType = null, before: Array[Any] = null, after: Array[Any] = null)
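The insert/update/delete rules documented above map one-to-one onto the Mutation fields; a small illustration (row contents are placeholders and the schema argument is left at its null default):

    // A boxed row for some hypothetical two-field schema (id: Long, action: String).
    val row: Array[Any] = Array(java.lang.Long.valueOf(42L), "viewed")

    val insert = Mutation(after = row)               // insert: `before` stays null
    val update = Mutation(before = row, after = row) // update: both sides populated
    val delete = Mutation(before = row)              // delete: `after` stays null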
