Commit 2d9ac20

use json4s pinned to the spark version
Co-authored-by: Thomas Chow <[email protected]>
1 parent a850609 commit 2d9ac20

7 files changed: +90 -62 lines

build.sbt

Lines changed: 11 additions & 8 deletions

@@ -190,8 +190,10 @@ lazy val spark = project
     libraryDependencies += "com.google.guava" % "guava" % "33.3.1-jre",
     libraryDependencies ++= log4j2,
     libraryDependencies ++= delta.map(_ % "provided"),
-    libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
-    libraryDependencies ++= circe
+    libraryDependencies += "org.json4s" % "json4s-jackson_2.12" % "3.7.0-M11",
+    libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M11", // Use json4s-native or json4s-jackson
+    libraryDependencies += "org.json4s" %% "json4s-core" % "3.7.0-M11",
+    libraryDependencies += "org.yaml" % "snakeyaml" % "2.3"
   )

 lazy val flink = project
@@ -213,16 +215,17 @@ lazy val cloud_gcp = project
     libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.42.0",
     libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.41.0",
     libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.131.0",
-    libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.51.0",
-    libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "3.0.3", // it's what's on the cluster
+    libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.52.0",
     libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.26",
     libraryDependencies += "com.google.cloud.bigdataoss" % "gcsio" % "3.0.3", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java
     libraryDependencies += "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.0", // need it for https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/util-hadoop/src/main/java/com/google/cloud/hadoop/util/HadoopConfigurationProperty.java
-    libraryDependencies += "io.circe" %% "circe-yaml" % "1.15.0",
-    libraryDependencies += "com.google.cloud.spark" %% s"spark-bigquery-with-dependencies" % "0.41.0",
-    libraryDependencies += "com.google.cloud.spark.bigtable" %% "spark-bigtable" % "0.2.1",
+    libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.41.0",
+    libraryDependencies += "org.json4s" % "json4s-jackson_2.12" % "3.7.0-M11",
+    libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M11", // Use json4s-native or json4s-jackson
+    libraryDependencies += "org.json4s" %% "json4s-core" % "3.7.0-M11",
+    libraryDependencies += "org.yaml" % "snakeyaml" % "2.3",
+    // libraryDependencies += "com.google.cloud.spark.bigtable" %% "spark-bigtable" % "0.2.1",
     libraryDependencies += "com.google.cloud.bigtable" % "bigtable-hbase-2.x" % "2.14.2",
-    libraryDependencies ++= circe,
     libraryDependencies ++= avro,
     libraryDependencies ++= spark_all_provided,
     dependencyOverrides ++= jackson,

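The same json4s and snakeyaml coordinates are repeated in both modules above. A minimal sketch of how the pins could be centralized in build.sbt, assuming a shared version val set to whatever json4s the target Spark release bundles (the names json4sVersion and yamlDeps are illustrative, not part of this commit):

  // Sketch only: keep every module on the json4s version the Spark runtime already ships.
  lazy val json4sVersion = "3.7.0-M11" // assumed to match the json4s bundled with the target Spark

  lazy val yamlDeps = Seq(
    "org.json4s" %% "json4s-core" % json4sVersion,
    "org.json4s" %% "json4s-jackson" % json4sVersion,
    "org.json4s" %% "json4s-native" % json4sVersion,
    "org.yaml" % "snakeyaml" % "2.3"
  )

  // usage inside a module definition:
  // lazy val spark = project.settings(libraryDependencies ++= yamlDeps, ...)
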
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 18 additions & 12 deletions

@@ -3,8 +3,9 @@ import ai.chronon.spark.JobAuth
 import ai.chronon.spark.JobSubmitter
 import com.google.api.gax.rpc.ApiException
 import com.google.cloud.dataproc.v1._
-import io.circe.generic.auto._
-import io.circe.yaml.parser
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.yaml.snakeyaml.Yaml

 import scala.io.Source

@@ -88,16 +89,21 @@ object DataprocSubmitter {
   }

   private[cloud_gcp] def loadConfig: SubmitterConf = {
-    val is = getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml")
-    val confStr = Source.fromInputStream(is).mkString
-    val res: Either[io.circe.Error, SubmitterConf] = parser
-      .parse(confStr)
-      .flatMap(_.as[SubmitterConf])
-    res match {
-
-      case Right(v) => v
-      case Left(e) => throw e
-    }
+    val isO = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
+    val yamlLoader = new Yaml()
+    implicit val formats: Formats = DefaultFormats
+    isO
+      .map(Source.fromInputStream)
+      .map((is) =>
+        try { is.mkString }
+        finally { is.close })
+      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
+      .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
+      .map((jVal) => render(jVal))
+      .map(compact)
+      .map(parse(_).extract[SubmitterConf])
+      .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))
+
   }
 }

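For reference, a self-contained sketch of the pattern the new loadConfig follows: SnakeYAML parses the classpath resource into a java.util.Map, json4s decomposes the Scala view of that map into a JValue, a render/compact/parse round trip normalizes it, and extract binds it to the target case class. The names here (YamlConf, YamlConfLoader, loadYamlResource) are illustrative; SubmitterConf plays the target role in the real code.

  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  import org.yaml.snakeyaml.Yaml

  import scala.collection.JavaConverters._
  import scala.io.Source

  // Illustrative target type; in DataprocSubmitter the equivalent is SubmitterConf.
  case class YamlConf(projectId: String, region: String, clusterName: String)

  object YamlConfLoader {
    implicit val formats: Formats = DefaultFormats

    // Load a YAML classpath resource and bind it to T via json4s.
    def loadYamlResource[T: Manifest](resource: String): T =
      Option(getClass.getClassLoader.getResourceAsStream(resource))
        .map(Source.fromInputStream)
        .map(src => try src.mkString finally src.close())
        .map(new Yaml().load(_).asInstanceOf[java.util.Map[String, Any]])
        .map(jMap => Extraction.decompose(jMap.asScala.toMap)) // java Map -> Scala Map -> JValue
        .map(jVal => parse(compact(render(jVal))))             // normalize via a JSON round trip
        .map(_.extract[T])
        .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml: $resource"))
  }

Usage would look like YamlConfLoader.loadYamlResource[YamlConf]("dataproc-submitter-conf.yaml"), mirroring what loadConfig does with SubmitterConf.
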
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 3 additions & 1 deletion

@@ -5,12 +5,13 @@ import ai.chronon.spark.TableUtils
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
 import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration
-import com.google.cloud.hadoop.util.HadoopConfigurationProperty
 import org.apache.spark.sql.SparkSession
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertTrue
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatestplus.mockito.MockitoSugar
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem
+import com.google.cloud.hadoop.fs.gcs.HadoopConfigurationProperty

 class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {

@@ -37,6 +38,7 @@ class BigQueryCatalogTest extends AnyFunSuite with MockitoSugar {
     assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[Long]])
     assertCompiles("classOf[GoogleHadoopFileSystem]")
     assertCompiles("classOf[GoogleHadoopFS]")
+    assertCompiles("classOf[GoogleCloudStorageFileSystem]")

   }

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala

Lines changed: 23 additions & 15 deletions

@@ -52,13 +52,15 @@ class BigTableKVStoreTest {
   @Before
   def setup(): Unit = {
     // Configure settings to use emulator
-    val dataSettings = BigtableDataSettings.newBuilderForEmulator(bigtableEmulator.getPort)
+    val dataSettings = BigtableDataSettings
+      .newBuilderForEmulator(bigtableEmulator.getPort)
       .setProjectId(projectId)
       .setInstanceId(instanceId)
       .setCredentialsProvider(NoCredentialsProvider.create())
       .build()

-    val adminSettings = BigtableTableAdminSettings.newBuilderForEmulator(bigtableEmulator.getPort)
+    val adminSettings = BigtableTableAdminSettings
+      .newBuilderForEmulator(bigtableEmulator.getPort)
       .setProjectId(projectId)
       .setInstanceId(instanceId)
       .setCredentialsProvider(NoCredentialsProvider.create())
@@ -153,11 +155,10 @@ class BigTableKVStoreTest {
     val kvStore = new BigTableKVStoreImpl(dataClient, adminClient)
     kvStore.create(dataset)

-    val putReqs = (0 until 100).map {
-      i =>
-        val key = s"key-$i"
-        val value = s"""{"name": "name-$i", "age": $i}"""
-        PutRequest(key.getBytes, value.getBytes, dataset, None)
+    val putReqs = (0 until 100).map { i =>
+      val key = s"key-$i"
+      val value = s"""{"name": "name-$i", "age": $i}"""
+      PutRequest(key.getBytes, value.getBytes, dataset, None)
     }

     val putResults = Await.result(kvStore.multiPut(putReqs), 1.second)
@@ -185,7 +186,9 @@ class BigTableKVStoreTest {

     // lets collect all the keys and confirm we got everything
     val allKeys = (listValues1 ++ listValues2).map(v => new String(v.keyBytes, StandardCharsets.UTF_8))
-    allKeys.toSet shouldBe putReqs.map(r => new String(buildRowKey(r.keyBytes, r.dataset), StandardCharsets.UTF_8)).toSet
+    allKeys.toSet shouldBe putReqs
+      .map(r => new String(buildRowKey(r.keyBytes, r.dataset), StandardCharsets.UTF_8))
+      .toSet
   }

   @Test
@@ -227,7 +230,8 @@ class BigTableKVStoreTest {

     when(mockDataClient.readRowsCallable()).thenReturn(serverStreamingCallable)
     when(serverStreamingCallable.all()).thenReturn(unaryCallable)
-    val failedFuture = ApiFutures.immediateFailedFuture[util.List[Row]](new RuntimeException("some BT exception on read"))
+    val failedFuture =
+      ApiFutures.immediateFailedFuture[util.List[Row]](new RuntimeException("some BT exception on read"))
     when(unaryCallable.futureCall(any[Query])).thenReturn(failedFuture)

     val getResult = Await.result(kvStoreWithMocks.multiGet(Seq(getReq1, getReq2)), 1.second)
@@ -323,11 +327,15 @@ class BigTableKVStoreTest {
     val getResult1 = Await.result(kvStore.multiGet(Seq(getRequest1)), 1.second)
     getResult1.size shouldBe 1
     // we expect results to only cover the time range where we have data
-    val expectedTimeSeriesPoints = (queryStartsTs until dataEndTs by 1.hour.toMillis).toSeq
+    val expectedTimeSeriesPoints = (queryStartsTs until dataEndTs by 1.hour.toMillis).toSeq
     validateTimeSeriesValueExpectedPayload(getResult1.head, expectedTimeSeriesPoints, fakePayload)
   }

-  private def writeGeneratedTimeSeriesData(kvStore: BigTableKVStoreImpl, dataset: String, key: String, tsRange: Seq[Long], payload: String): Unit = {
+  private def writeGeneratedTimeSeriesData(kvStore: BigTableKVStoreImpl,
+                                           dataset: String,
+                                           key: String,
+                                           tsRange: Seq[Long],
+                                           payload: String): Unit = {
     val points = Seq.fill(tsRange.size)(payload)
     val putRequests = tsRange.zip(points).map {
       case (ts, point) =>
@@ -350,10 +358,10 @@ class BigTableKVStoreTest {
     }
   }

-  private def validateTimeSeriesValueExpectedPayload(response: GetResponse, expectedTimestamps: Seq[Long], expectedPayload: String): Unit = {
-    for (
-      tSeq <- response.values
-    ) {
+  private def validateTimeSeriesValueExpectedPayload(response: GetResponse,
+                                                     expectedTimestamps: Seq[Long],
+                                                     expectedPayload: String): Unit = {
+    for (tSeq <- response.values) {
       tSeq.map(_.millis).toSet shouldBe expectedTimestamps.toSet
       tSeq.map(v => new String(v.bytes, StandardCharsets.UTF_8)).foreach(v => v shouldBe expectedPayload)
       tSeq.length shouldBe expectedTimestamps.length

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala

Lines changed: 1 addition & 1 deletion

@@ -47,7 +47,7 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
     BigQueryUtilScala.validateScalaVersionCompatibility()
   }

-  ignore("Used to iterate locally. Do not enable this in CI/CD!") {
+  test("Used to iterate locally. Do not enable this in CI/CD!") {

     val submitter = DataprocSubmitter()
     val submittedJobId =

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 13 additions & 12 deletions

@@ -69,8 +69,9 @@ import scala.reflect.internal.util.ScalaClassLoader
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-import io.circe.generic.auto._
-import io.circe.yaml.parser
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.yaml.snakeyaml.Yaml

 // useful to override spark.sql.extensions args - there is no good way to unset that conf apparently
 // so we give it dummy extensions
@@ -152,18 +153,18 @@ object Driver {
     protected def isLocal: Boolean = localTableMapping.nonEmpty || localDataPath.isDefined

     protected def buildSparkSession(): SparkSession = {
+      implicit val formats: Formats = DefaultFormats
+      val yamlLoader = new Yaml()
       val additionalConfs = additionalConfPath.toOption
         .map(Source.fromFile)
-        .map(_.mkString)
-        .map((cp) => {
-          parser
-            .parse(cp)
-            .flatMap((r) => r.as[Map[String, String]])
-        })
-        .map {
-          case Right(v) => v
-          case Left(e) => throw e
-        }
+        .map((src) =>
+          try { src.mkString }
+          finally { src.close })
+        .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
+        .map((map) => Extraction.decompose(map.asScala.toMap))
+        .map((v) => render(v))
+        .map(compact)
+        .map((str) => parse(str).extract[Map[String, String]])

       // We use the KryoSerializer for group bys and joins since we serialize the IRs.
       // But since staging query is fairly freeform, it's better to stick to the java serializer.

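The diff only shows the parsing side of additionalConfs; how the resulting Map[String, String] would typically feed the session builder is sketched below. The wiring, app name, and master are assumptions for illustration, not taken from Driver.scala.

  import org.apache.spark.sql.SparkSession

  // Assumed wiring: fold the parsed additional confs into the SparkSession builder.
  def sessionWith(additionalConfs: Map[String, String]): SparkSession =
    additionalConfs
      .foldLeft(SparkSession.builder().appName("chronon-driver").master("local[*]")) {
        case (builder, (key, value)) => builder.config(key, value)
      }
      .getOrCreate()
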
spark/src/test/scala/ai/chronon/spark/test/OfflineSubcommandTest.scala

Lines changed: 21 additions & 13 deletions

@@ -17,12 +17,15 @@
 package ai.chronon.spark.test

 import ai.chronon.spark.Driver.OfflineSubcommand
-import io.circe.yaml.parser
 import org.apache.spark.sql.SparkSession
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertTrue
 import org.junit.Test
 import org.rogach.scallop.ScallopConf
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.yaml.snakeyaml.Yaml
+import collection.JavaConverters._

 import scala.io.Source

@@ -60,22 +63,27 @@ class OfflineSubcommandTest {
   }

   @Test
-  def additionalConfsParsedCorrectly: Unit = {
+  def additionalConfsParsedCorrectly(): Unit = {
+    implicit val formats: Formats = DefaultFormats
+
     val url = getClass.getClassLoader.getResource("test-driver-additional-confs.yaml")

     val args = new TestArgs(Seq("--conf-path", "does_not_exist", "--additional-conf-path", url.toURI.getPath).toArray)
     val sparkSession = args.buildSparkSession()
-
-    val is = getClass.getClassLoader.getResourceAsStream("test-driver-additional-confs.yaml")
-
-    val additionalConfs = parser
-      .parse(Source.fromInputStream(is).mkString)
-      .flatMap((r) => r.as[Map[String, String]])
-
-    val confs = additionalConfs match {
-      case Right(v) => v
-      case Left(e) => throw e
-    }
+    val yamlLoader = new Yaml()
+
+    val confs = Option(getClass.getClassLoader
+      .getResourceAsStream("test-driver-additional-confs.yaml"))
+      .map(Source.fromInputStream)
+      .map((is) =>
+        try { is.mkString }
+        finally { is.close })
+      .map(yamlLoader.load(_).asInstanceOf[java.util.Map[String, Any]])
+      .map((jMap) => Extraction.decompose(jMap.asScala.toMap))
+      .map((jVal) => render(jVal))
+      .map(compact)
+      .map(parse(_).extract[Map[String, String]])
+      .getOrElse(throw new IllegalArgumentException(s"Yaml conf not found or invalid yaml"))

     val confKey = "spark.chronon.table.format_provider.class"
     assertEquals(confs.get(confKey), sparkSession.conf.getOption(confKey))
