Commit f19dfe9

Connect GroupByUploadToKVBulkLoad from Driver.scala to run.py (#221)
## Summary
^^^

### Testing
This test assumes the GroupBy upload (GBU) job — the job that generates the data to BigQuery — has already run.

Tested with this command:

```
(dev_chronon) davidhan@Davids-MacBook-Pro: ~/zipline/chronon (main) $ python api/py/ai/chronon/repo/run.py --mode upload-to-kv --conf production/group_bys/quickstart/purchases.v1 --partition-string=2023-11-30 --dataproc --repo=/Users/davidhan/zipline/chronon/api/py/test/sample
```

which led to a successful run of this job: https://console.cloud.google.com/dataproc/jobs/2f6b0b81-7b34-4a92-840d-cb90059f3d42/monitoring?region=us-central1&project=canary-443022

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Added support for a new upload mode to Google Cloud Platform.
  - Enhanced configuration handling for GCP Dataproc clusters.
  - Introduced new GCP-related options in the command-line interface.
  - Updated JSON configuration with new GCP parameters.
- **Bug Fixes**
  - Improved error handling and argument processing in various components.
- **Refactor**
  - Updated environment variable naming conventions.
  - Restructured configuration management across multiple files.
  - Enhanced clarity and organization in code structure.
- **Chores**
  - Added support for multiple Java versions.
  - Updated build and deployment scripts for improved reliability.
1 parent 23892a5 · commit f19dfe9

File tree

9 files changed (+239 / -144 lines)


.tool-versions

Lines changed: 3 additions & 1 deletion
```diff
@@ -1,4 +1,6 @@
-java corretto-17.0.9.8.1
+java
+corretto-11.0.25.9.1
+corretto-17.0.9.8.1
 scala 2.12.18
 asdf-plugin-manager 1.4.0
 sbt 1.8.2
```

api/py/ai/chronon/repo/run.py

Lines changed: 86 additions & 48 deletions
Large diffs are not rendered by default.

api/py/test/sample/teams.json

Lines changed: 64 additions & 60 deletions
```diff
@@ -1,66 +1,70 @@
 {
-  "default": {
-    "table_properties": {
-      "source": "chronon"
-    },
-    "common_env": {
-      "VERSION": "latest",
-      "SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
-      "JOB_MODE": "local[*]",
-      "HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
-      "CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
-      "CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host=<YOUR_HOST> -Zkv-port=<YOUR_PORT>",
-      "PARTITION_COLUMN": "ds",
-      "PARTITION_FORMAT": "yyyy-MM-dd"
-    },
-    "production": {
-      "backfill" : {
-        "EXECUTOR_CORES": "1",
-        "DRIVER_MEMORY": "15G",
-        "EXECUTOR_MEMORY": "8G",
-        "PARALLELISM": "4000",
-        "MAX_EXECUTORS": "1000"
-      },
-      "upload" : {
-        "EXECUTOR_CORES": "1",
-        "EXECUTOR_MEMORY": "8G",
-        "PARALLELISM": "1000",
-        "MAX_EXECUTORS": "1000"
-      },
-      "streaming" : {
-        "EXECUTOR_CORES": "2",
-        "EXECUTOR_MEMORY": "4G",
-        "PARALLELISM": "16"
-      }
-    }
+  "default": {
+    "table_properties": {
+      "source": "chronon"
     },
-  "sample_team": {
-    "description": "Team description",
-    "namespace": "chronon_db",
-    "user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
-    "production": {
-      "backfill" : {
-        "EXECUTOR_CORES": "4"
-      }
-    },
-    "dev": {
-      "backfill" : {
-        "EXECUTOR_CORES": "2",
-        "DRIVER_MEMORY": "30G"
-      }
-    }
+    "common_env": {
+      "VERSION": "latest",
+      "SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
+      "JOB_MODE": "local[*]",
+      "HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
+      "CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
+      "CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host=<YOUR_HOST> -Zkv-port=<YOUR_PORT>",
+      "PARTITION_COLUMN": "ds",
+      "PARTITION_FORMAT": "yyyy-MM-dd",
+      "CUSTOMER_ID": "canary",
+      "GCP_PROJECT_ID": "canary-443022",
+      "GCP_REGION": "us-central1",
+      "GCP_DATAPROC_CLUSTER_NAME": "canary-2",
+      "GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance"
     },
-  "kaggle": {
-    "description": "Workspace for kaggle compeitions",
-    "namespace": "default"
-  },
-  "quickstart": {
-    "description": "Used for the quickstart example",
-    "namespace": "default"
+    "production": {
+      "backfill": {
+        "EXECUTOR_CORES": "1",
+        "DRIVER_MEMORY": "15G",
+        "EXECUTOR_MEMORY": "8G",
+        "PARALLELISM": "4000",
+        "MAX_EXECUTORS": "1000"
+      },
+      "upload": {
+        "EXECUTOR_CORES": "1",
+        "EXECUTOR_MEMORY": "8G",
+        "PARALLELISM": "1000",
+        "MAX_EXECUTORS": "1000"
+      },
+      "streaming": {
+        "EXECUTOR_CORES": "2",
+        "EXECUTOR_MEMORY": "4G",
+        "PARALLELISM": "16"
+      }
+    }
+  },
+  "sample_team": {
+    "description": "Team description",
+    "namespace": "chronon_db",
+    "user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
+    "production": {
+      "backfill": {
+        "EXECUTOR_CORES": "4"
+      }
     },
-  "risk": {
-    "description": "Used for proof of concept",
-    "namespace": "default"
+    "dev": {
+      "backfill": {
+        "EXECUTOR_CORES": "2",
+        "DRIVER_MEMORY": "30G"
+      }
     }
-
+  },
+  "kaggle": {
+    "description": "Workspace for kaggle compeitions",
+    "namespace": "default"
+  },
+  "quickstart": {
+    "description": "Used for the quickstart example",
+    "namespace": "default"
+  },
+  "risk": {
+    "description": "Used for proof of concept",
+    "namespace": "default"
+  }
 }
```

chronon_dataproc_submitter.env

Lines changed: 0 additions & 4 deletions
This file was deleted.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 27 additions & 11 deletions
```diff
@@ -181,41 +181,57 @@ object DataprocSubmitter {
     val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)

     // search args array for prefix `--gcs_files`
-    val gcsFiles = args
-      .filter(_.startsWith("--gcs_files"))(0)
-      .split("=")(1)
-      .split(",")
+    val gcsFilesArgs = args.filter(_.startsWith("--gcs_files"))
+    assert(gcsFilesArgs.length == 0 || gcsFilesArgs.length == 1)
+
+    val gcsFiles = if (gcsFilesArgs.isEmpty) {
+      Array.empty[String]
+    } else {
+      gcsFilesArgs(0).split("=")(1).split(",")
+    }

     val userArgs = args.filter(f => !f.startsWith("--gcs_files") && !f.startsWith("--chronon_jar_uri"))

     val required_vars = List.apply(
-      "ZIPLINE_GCP_PROJECT_ID",
-      "ZIPLINE_GCP_REGION",
-      "ZIPLINE_GCP_DATAPROC_CLUSTER_NAME"
+      "GCP_PROJECT_ID",
+      "GCP_REGION",
+      "GCP_DATAPROC_CLUSTER_NAME"
     )
     val missing_vars = required_vars.filter(!sys.env.contains(_))
     if (missing_vars.nonEmpty) {
       throw new Exception(s"Missing required environment variables: ${missing_vars.mkString(", ")}")
     }

-    val projectId = sys.env.getOrElse("ZIPLINE_GCP_PROJECT_ID", throw new Exception("ZIPLINE_GCP_PROJECT_ID not set"))
-    val region = sys.env.getOrElse("ZIPLINE_GCP_REGION", throw new Exception("ZIPLINE_GCP_REGION not set"))
+    val projectId = sys.env.getOrElse("GCP_PROJECT_ID", throw new Exception("GCP_PROJECT_ID not set"))
+    val region = sys.env.getOrElse("GCP_REGION", throw new Exception("GCP_REGION not set"))
     val clusterName = sys.env
-      .getOrElse("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME", throw new Exception("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME not set"))
+      .getOrElse("GCP_DATAPROC_CLUSTER_NAME", throw new Exception("GCP_DATAPROC_CLUSTER_NAME not set"))

     val submitterConf = SubmitterConf(
       projectId,
       region,
       clusterName
     )

+    val bigtableInstanceId = sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", "")
+
+    val gcpArgsToPass = Array.apply(
+      "--is-gcp",
+      s"--gcp-project-id=${projectId}",
+      s"--gcp-bigtable-instance-id=$bigtableInstanceId"
+    )
+
+    val finalArgs = Array.concat(userArgs, gcpArgsToPass)
+
+    println(finalArgs.mkString("Array(", ", ", ")"))
+
     val a = DataprocSubmitter(submitterConf)

     val jobId = a.submit(
       TypeSparkJob,
       Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri),
       gcsFiles.toList,
-      userArgs: _*
+      finalArgs: _*
     )
     println("Dataproc submitter job id: " + jobId)
   }
```
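The behavioral change here: `--gcs_files` becomes optional, and the submitter forwards the GCP coordinates it reads from the (renamed, no longer `ZIPLINE_`-prefixed) environment variables to the Driver as explicit flags. Below is a minimal, self-contained sketch of that argument handling; `SubmitterArgsSketch` is an illustrative name, not the actual `DataprocSubmitter` code.

```scala
// Standalone sketch, assuming the submitter receives raw CLI args and reads GCP
// coordinates from the environment (mirrors the diff above, not a drop-in replacement).
object SubmitterArgsSketch {
  def main(args: Array[String]): Unit = {
    // --gcs_files is now optional; at most one occurrence is accepted.
    val gcsFilesArgs = args.filter(_.startsWith("--gcs_files"))
    assert(gcsFilesArgs.length <= 1, "expected at most one --gcs_files argument")
    val gcsFiles: Array[String] =
      if (gcsFilesArgs.isEmpty) Array.empty[String]
      else gcsFilesArgs(0).split("=")(1).split(",")

    // Everything except submitter-only flags is passed through to the Driver.
    val userArgs = args.filterNot(a => a.startsWith("--gcs_files") || a.startsWith("--chronon_jar_uri"))

    // Renamed environment variables (formerly ZIPLINE_GCP_*); fail fast if any required one is missing.
    val required = List("GCP_PROJECT_ID", "GCP_REGION", "GCP_DATAPROC_CLUSTER_NAME")
    val missing = required.filterNot(sys.env.contains)
    require(missing.isEmpty, s"Missing required environment variables: ${missing.mkString(", ")}")

    val projectId = sys.env("GCP_PROJECT_ID")
    val bigtableInstanceId = sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", "")

    // GCP coordinates are appended as Driver flags so the online subcommands can build a GCP-backed Api.
    val gcpArgsToPass = Array("--is-gcp", s"--gcp-project-id=$projectId", s"--gcp-bigtable-instance-id=$bigtableInstanceId")
    val finalArgs = Array.concat(userArgs, gcpArgsToPass)

    println(s"gcs files: ${gcsFiles.mkString(",")}")
    println(finalArgs.mkString("Array(", ", ", ")"))
  }
}
```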

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -28,9 +28,9 @@ class GcpApiImpl(conf: Map[String, String]) extends Api(conf) {
     .getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set"))

   val instanceId = sys.env
-    .get("GCP_INSTANCE_ID")
-    .orElse(conf.get("GCP_INSTANCE_ID"))
-    .getOrElse(throw new IllegalArgumentException("GCP_INSTANCE_ID environment variable not set"))
+    .get("GCP_BIGTABLE_INSTANCE_ID")
+    .orElse(conf.get("GCP_BIGTABLE_INSTANCE_ID"))
+    .getOrElse(throw new IllegalArgumentException("GCP_BIGTABLE_INSTANCE_ID environment variable not set"))

   // Create settings builder based on whether we're in emulator mode (e.g. docker) or not
   val (dataSettingsBuilder, adminSettingsBuilder, maybeBQClient) = sys.env.get("BIGTABLE_EMULATOR_HOST") match {
```
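For clarity, the renamed key is resolved environment-first, then from the conf map passed to the `Api` constructor, which is where the Driver's `--gcp-bigtable-instance-id` flag ends up. A small standalone sketch of that lookup order (`BigtableInstanceIdSketch` is an illustrative name, not the actual `GcpApiImpl` code):

```scala
// Illustrative resolver mirroring the renamed lookup: env var wins, then the conf
// map handed to the Api constructor, otherwise fail loudly.
object BigtableInstanceIdSketch {
  def resolve(conf: Map[String, String]): String =
    sys.env
      .get("GCP_BIGTABLE_INSTANCE_ID")
      .orElse(conf.get("GCP_BIGTABLE_INSTANCE_ID"))
      .getOrElse(throw new IllegalArgumentException("GCP_BIGTABLE_INSTANCE_ID environment variable not set"))

  def main(args: Array[String]): Unit = {
    // Simulates the conf map built by OnlineSubcommand when --is-gcp is passed.
    val conf = Map("GCP_BIGTABLE_INSTANCE_ID" -> "zipline-canary-instance")
    println(resolve(conf))
  }
}
```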

distribution/build_and_upload_gcp_artifacts.sh

Lines changed: 2 additions & 0 deletions
```diff
@@ -1,5 +1,7 @@
 #!/bin/bash

+set -e
+
 SCRIPT_DIRECTORY=$(dirname -- "$(realpath -- "$0")")
 CHRONON_ROOT_DIR=$(dirname "$SCRIPT_DIRECTORY")

```

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 52 additions & 15 deletions
```diff
@@ -86,7 +86,17 @@ object Driver {
   def parseConf[T <: TBase[_, _]: Manifest: ClassTag](confPath: String): T =
     ThriftJsonCodec.fromJsonFile[T](confPath, check = true)

-  trait OfflineSubcommand {
+  trait AddGcpSubCommandArgs {
+    this: ScallopConf =>
+    val isGcp: ScallopOption[Boolean] =
+      opt[Boolean](required = false, default = Some(false), descr = "Whether to use GCP")
+    val gcpProjectId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP project id")
+    val gcpBigtableInstanceId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP BigTable instance id")
+  }
+
+  trait OfflineSubcommand extends AddGcpSubCommandArgs {
     this: ScallopConf =>
     val confPath: ScallopOption[String] = opt[String](required = true, descr = "Path to conf")

@@ -513,10 +523,20 @@ object Driver {
   object GroupByUploader {
     class Args extends Subcommand("group-by-upload") with OfflineSubcommand {
       override def subcommandName() = "group-by-upload"
+
+      // jsonPercent
+      val jsonPercent: ScallopOption[Int] =
+        opt[Int](name = "json-percent",
+                 required = false,
+                 descr = "Percentage of json encoding to retain for debuggability",
+                 default = Some(1))
     }

     def run(args: Args): Unit = {
-      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()), args.endDate(), Some(args.buildTableUtils()))
+      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()),
+                        args.endDate(),
+                        Some(args.buildTableUtils()),
+                        jsonPercent = args.jsonPercent.apply())
     }
   }

@@ -564,7 +584,7 @@ object Driver {
   }

   // common arguments to all online commands
-  trait OnlineSubcommand { s: ScallopConf =>
+  trait OnlineSubcommand extends AddGcpSubCommandArgs { s: ScallopConf =>
     // this is `-Z` and not `-D` because sbt-pack plugin uses that for JAVA_OPTS
     val propsInner: Map[String, String] = props[String]('Z')
     val onlineJar: ScallopOption[String] =
@@ -573,14 +593,26 @@ object Driver {
       opt[String](required = true,
                   descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path")

+    // TODO: davidhan - remove this when we've migrated away from additional-conf-path
+    val additionalConfPath: ScallopOption[String] =
+      opt[String](required = false, descr = "Path to additional driver job configurations")
+
     // hashmap implements serializable
     def serializableProps: Map[String, String] = {
       val map = new mutable.HashMap[String, String]()
       propsInner.foreach { case (key, value) => map.update(key, value) }
       map.toMap
     }

-    lazy val api: Api = impl(serializableProps)
+    lazy private val gcpMap = Map(
+      "GCP_PROJECT_ID" -> gcpProjectId.toOption.getOrElse(""),
+      "GCP_BIGTABLE_INSTANCE_ID" -> gcpBigtableInstanceId.toOption.getOrElse("")
+    )
+
+    lazy val api: Api = isGcp.toOption match {
+      case Some(true) => impl(serializableProps ++ gcpMap)
+      case _ => impl(serializableProps)
+    }

     def metaDataStore =
       new MetadataStore(impl(serializableProps).genKvStore, MetadataDataset, timeoutMillis = 10000)
@@ -734,31 +766,36 @@ object Driver {
   object GroupByUploadToKVBulkLoad {
     @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
     class Args extends Subcommand("groupby-upload-bulk-load") with OnlineSubcommand {
-      val srcOfflineTable: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the source GroupBy Upload table")
-
-      val groupbyName: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for")
+      // Expectation that run.py only sets confPath
+      val confPath: ScallopOption[String] = opt[String](required = false, descr = "path to groupBy conf")

       val partitionString: ScallopOption[String] =
         opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading")
     }

     def run(args: Args): Unit = {
-      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args
-        .partitionString()} from table: ${args.srcOfflineTable()}")
+      val groupByConf = parseConf[api.GroupBy](args.confPath())
+
+      val offlineTable = groupByConf.metaData.uploadTable
+
+      val groupByName = groupByConf.metaData.name
+
+      logger.info(s"Triggering bulk load for GroupBy: ${groupByName} for partition: ${args
+        .partitionString()} from table: ${offlineTable}")
       val kvStore = args.api.genKvStore
       val startTime = System.currentTimeMillis()
+
       try {
-        kvStore.bulkPut(args.srcOfflineTable(), args.groupbyName(), args.partitionString())
+        // TODO: we may need to wrap this around TableUtils
+        kvStore.bulkPut(offlineTable, groupByName, args.partitionString())
       } catch {
         case e: Exception =>
-          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args
-            .partitionString()} from table: ${args.srcOfflineTable()}",
+          logger.error(s"Failed to upload GroupBy: ${groupByName} for partition: ${args
+            .partitionString()} from table: $offlineTable",
                        e)
           throw e
       }
-      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args
+      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${groupByName}; partition: ${args
        .partitionString()} in ${(System.currentTimeMillis() - startTime) / 1000} seconds")
     }
   }
```

spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -257,8 +257,8 @@ object GroupByUpload {
       .withColumn("ds", lit(endDs))
       .saveUnPartitioned(groupByConf.metaData.uploadTable, groupByConf.metaData.tableProps)

-    val kvDfReloaded = tableUtils.sparkSession
-      .table(groupByConf.metaData.uploadTable)
+    val kvDfReloaded = tableUtils
+      .loadTable(groupByConf.metaData.uploadTable)
       .where(not(col("key_json").eqNullSafe(Constants.GroupByServingInfoKey)))

     val metricRow =
```
