
Connect GroupByUploadToKVBulkLoad from Driver.scala to run.py #221


Merged: 10 commits, Jan 17, 2025
4 changes: 3 additions & 1 deletion .tool-versions
@@ -1,4 +1,6 @@
-java corretto-17.0.9.8.1
+java
+corretto-11.0.25.9.1
+corretto-17.0.9.8.1
 scala 2.12.18
 asdf-plugin-manager 1.4.0
 sbt 1.8.2
134 changes: 86 additions & 48 deletions api/py/ai/chronon/repo/run.py

Large diffs are not rendered by default.

124 changes: 64 additions & 60 deletions api/py/test/sample/teams.json
@@ -1,66 +1,70 @@
{
"default": {
"table_properties": {
"source": "chronon"
},
"common_env": {
"VERSION": "latest",
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
"JOB_MODE": "local[*]",
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
"CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host=<YOUR_HOST> -Zkv-port=<YOUR_PORT>",
"PARTITION_COLUMN": "ds",
"PARTITION_FORMAT": "yyyy-MM-dd"
},
"production": {
"backfill" : {
"EXECUTOR_CORES": "1",
"DRIVER_MEMORY": "15G",
"EXECUTOR_MEMORY": "8G",
"PARALLELISM": "4000",
"MAX_EXECUTORS": "1000"
},
"upload" : {
"EXECUTOR_CORES": "1",
"EXECUTOR_MEMORY": "8G",
"PARALLELISM": "1000",
"MAX_EXECUTORS": "1000"
},
"streaming" : {
"EXECUTOR_CORES": "2",
"EXECUTOR_MEMORY": "4G",
"PARALLELISM": "16"
}
}
"default": {
"table_properties": {
"source": "chronon"
},
"sample_team": {
"description": "Team description",
"namespace": "chronon_db",
"user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
"production": {
"backfill" : {
"EXECUTOR_CORES": "4"
}
},
"dev": {
"backfill" : {
"EXECUTOR_CORES": "2",
"DRIVER_MEMORY": "30G"
}
}
"common_env": {
"VERSION": "latest",
"SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
"JOB_MODE": "local[*]",
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
Collaborator: change to ZIPLINE_?

Contributor (author): I can change, but CHRONON_ONLINE_CLASS is set throughout run.py, e.g. https://github.com/zipline-ai/chronon/blob/main/api/py/ai/chronon/repo/run.py#L684. Still want to change?

Collaborator: Ah, got it. Okay, maybe a followup.

"CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host=<YOUR_HOST> -Zkv-port=<YOUR_PORT>",
Comment on lines +8 to +12 (⚠️ Potential issue): Remove TODO placeholders before production deployment. These placeholder values could cause runtime issues if not properly configured.

"PARTITION_COLUMN": "ds",
"PARTITION_FORMAT": "yyyy-MM-dd",
"CUSTOMER_ID": "canary",
"GCP_PROJECT_ID": "canary-443022",
"GCP_REGION": "us-central1",
"GCP_DATAPROC_CLUSTER_NAME": "canary-2",
Collaborator: No changes needed for this PR, but I think at some point we'll want to hide some of these away from the customer (just making a note).

Contributor (author): Yeah, I'm going to clean up the teams.json in the etsy directory: https://github.com/etsy/zipline

"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance"
},
"kaggle": {
"description": "Workspace for kaggle compeitions",
"namespace": "default"
},
"quickstart": {
"description": "Used for the quickstart example",
"namespace": "default"
"production": {
"backfill": {
"EXECUTOR_CORES": "1",
"DRIVER_MEMORY": "15G",
"EXECUTOR_MEMORY": "8G",
"PARALLELISM": "4000",
"MAX_EXECUTORS": "1000"
},
"upload": {
"EXECUTOR_CORES": "1",
"EXECUTOR_MEMORY": "8G",
"PARALLELISM": "1000",
"MAX_EXECUTORS": "1000"
},
"streaming": {
"EXECUTOR_CORES": "2",
"EXECUTOR_MEMORY": "4G",
"PARALLELISM": "16"
}
}
},
"sample_team": {
"description": "Team description",
"namespace": "chronon_db",
"user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
Comment (⚠️ Potential issue): Replace the TODO with an actual LDAP username; the placeholder needs to be configured before deployment.

"production": {
"backfill": {
"EXECUTOR_CORES": "4"
}
},
"risk": {
"description": "Used for proof of concept",
"namespace": "default"
"dev": {
"backfill": {
"EXECUTOR_CORES": "2",
"DRIVER_MEMORY": "30G"
}
}

},
"kaggle": {
"description": "Workspace for kaggle compeitions",
"namespace": "default"
},
"quickstart": {
"description": "Used for the quickstart example",
"namespace": "default"
},
"risk": {
"description": "Used for proof of concept",
"namespace": "default"
}
}
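Note on CHRONON_ONLINE_ARGS: as the description above says, arguments prefixed with -Z become the constructor map for the ai.chronon.online.Api implementation (the OnlineSubcommand trait in Driver.scala further down collects them via Scallop's props[String]('Z')). A minimal, self-contained sketch of that behavior; the class name and the kv-host/kv-port values are illustrative, not from the PR:

Sketch (Scala):

import org.rogach.scallop.ScallopConf

// Mirrors `val propsInner: Map[String, String] = props[String]('Z')` from
// OnlineSubcommand: every arg of the form -Zkey=value is collected into a Map
// that is later handed to the Api implementation's constructor.
class OnlinePropsSketch(arguments: Seq[String]) extends ScallopConf(arguments) {
  val propsInner: Map[String, String] = props[String]('Z')
  verify()
}

object OnlinePropsSketch {
  def main(args: Array[String]): Unit = {
    // e.g. CHRONON_ONLINE_ARGS="-Zkv-host=myhost -Zkv-port=9000" (illustrative values)
    val parsed = new OnlinePropsSketch(Seq("-Zkv-host=myhost", "-Zkv-port=9000"))
    println(parsed.propsInner) // expected: kv-host -> myhost, kv-port -> 9000
  }
}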
4 changes: 0 additions & 4 deletions chronon_dataproc_submitter.env

This file was deleted.

@@ -181,41 +181,57 @@ object DataprocSubmitter {
     val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)

     // search args array for prefix `--gcs_files`
-    val gcsFiles = args
-      .filter(_.startsWith("--gcs_files"))(0)
-      .split("=")(1)
-      .split(",")
+    val gcsFilesArgs = args.filter(_.startsWith("--gcs_files"))
+    assert(gcsFilesArgs.length == 0 || gcsFilesArgs.length == 1)
+
+    val gcsFiles = if (gcsFilesArgs.isEmpty) {
+      Array.empty[String]
+    } else {
+      gcsFilesArgs(0).split("=")(1).split(",")
+    }

     val userArgs = args.filter(f => !f.startsWith("--gcs_files") && !f.startsWith("--chronon_jar_uri"))

     val required_vars = List.apply(
-      "ZIPLINE_GCP_PROJECT_ID",
-      "ZIPLINE_GCP_REGION",
-      "ZIPLINE_GCP_DATAPROC_CLUSTER_NAME"
+      "GCP_PROJECT_ID",
+      "GCP_REGION",
+      "GCP_DATAPROC_CLUSTER_NAME"
     )
     val missing_vars = required_vars.filter(!sys.env.contains(_))
     if (missing_vars.nonEmpty) {
       throw new Exception(s"Missing required environment variables: ${missing_vars.mkString(", ")}")
     }

-    val projectId = sys.env.getOrElse("ZIPLINE_GCP_PROJECT_ID", throw new Exception("ZIPLINE_GCP_PROJECT_ID not set"))
-    val region = sys.env.getOrElse("ZIPLINE_GCP_REGION", throw new Exception("ZIPLINE_GCP_REGION not set"))
+    val projectId = sys.env.getOrElse("GCP_PROJECT_ID", throw new Exception("GCP_PROJECT_ID not set"))
+    val region = sys.env.getOrElse("GCP_REGION", throw new Exception("GCP_REGION not set"))
     val clusterName = sys.env
-      .getOrElse("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME", throw new Exception("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME not set"))
+      .getOrElse("GCP_DATAPROC_CLUSTER_NAME", throw new Exception("GCP_DATAPROC_CLUSTER_NAME not set"))

     val submitterConf = SubmitterConf(
       projectId,
       region,
       clusterName
     )

+    val bigtableInstanceId = sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", "")
+
+    val gcpArgsToPass = Array.apply(
+      "--is-gcp",
+      s"--gcp-project-id=${projectId}",
+      s"--gcp-bigtable-instance-id=$bigtableInstanceId"
+    )
+
+    val finalArgs = Array.concat(userArgs, gcpArgsToPass)
+
+    println(finalArgs.mkString("Array(", ", ", ")"))
+
     val a = DataprocSubmitter(submitterConf)

     val jobId = a.submit(
       TypeSparkJob,
       Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri),
       gcsFiles.toList,
-      userArgs: _*
+      finalArgs: _*
     )
     println("Dataproc submitter job id: " + jobId)
   }
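To make the new argument handling concrete, here is a small, self-contained sketch of the same splitting logic: --chronon_jar_uri and --gcs_files are consumed by the submitter, and the remaining user args are forwarded to ai.chronon.spark.Driver together with the new GCP flags. The object name and the sample values (conf path, bucket, project id) are illustrative only, not taken from the PR:

Sketch (Scala):

object SubmitterArgsSketch {
  // Mirrors the parsing in DataprocSubmitter above; returns
  // (chronon jar URI, gcs files, args forwarded to the Driver).
  def splitArgs(args: Array[String],
                projectId: String,
                bigtableInstanceId: String): (String, Array[String], Array[String]) = {
    val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)

    val gcsFilesArgs = args.filter(_.startsWith("--gcs_files"))
    val gcsFiles =
      if (gcsFilesArgs.isEmpty) Array.empty[String]
      else gcsFilesArgs(0).split("=")(1).split(",")

    val userArgs = args.filter(a => !a.startsWith("--gcs_files") && !a.startsWith("--chronon_jar_uri"))
    val gcpArgsToPass = Array(
      "--is-gcp",
      s"--gcp-project-id=$projectId",
      s"--gcp-bigtable-instance-id=$bigtableInstanceId"
    )
    (chrononJarUri, gcsFiles, Array.concat(userArgs, gcpArgsToPass))
  }

  def main(args: Array[String]): Unit = {
    val (jarUri, gcsFiles, finalArgs) = splitArgs(
      Array(
        "groupby-upload-bulk-load",
        "--conf-path=production/group_bys/sample_team/sample_gb.v1", // illustrative conf path
        "--partition-string=2025-01-15",
        "--chronon_jar_uri=gs://some-bucket/cloud_gcp.jar",          // illustrative bucket
        "--gcs_files=gs://some-bucket/conf1,gs://some-bucket/metadata1"
      ),
      projectId = "canary-443022",
      bigtableInstanceId = "zipline-canary-instance"
    )
    println(jarUri)                  // gs://some-bucket/cloud_gcp.jar
    println(gcsFiles.mkString(","))  // the two gs:// files
    println(finalArgs.mkString(" ")) // user args followed by --is-gcp --gcp-project-id=... --gcp-bigtable-instance-id=...
  }
}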
@@ -28,9 +28,9 @@ class GcpApiImpl(conf: Map[String, String]) extends Api(conf) {
.getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set"))

val instanceId = sys.env
.get("GCP_INSTANCE_ID")
.orElse(conf.get("GCP_INSTANCE_ID"))
.getOrElse(throw new IllegalArgumentException("GCP_INSTANCE_ID environment variable not set"))
.get("GCP_BIGTABLE_INSTANCE_ID")
.orElse(conf.get("GCP_BIGTABLE_INSTANCE_ID"))
.getOrElse(throw new IllegalArgumentException("GCP_BIGTABLE_INSTANCE_ID environment variable not set"))
Comment on lines +31 to +33 (💡 Codebase verification): Environment variable rename is incomplete. The variable name change from GCP_INSTANCE_ID to GCP_BIGTABLE_INSTANCE_ID still needs to be applied in:

  • quickstart/cloud_gcp/scripts/load_data.sh
  • quickstart/cloud_gcp/gcp-docker-compose.yml
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreBulkLoadIntegrationTest.scala

Analysis chain: LGTM! The rename from GCP_INSTANCE_ID to GCP_BIGTABLE_INSTANCE_ID is more descriptive and improves clarity. Script executed to verify the variable name is updated everywhere:

#!/bin/bash
# Search for any remaining instances of the old variable name
rg "GCP_INSTANCE_ID"

# Search for the new variable name to ensure consistent usage
rg "GCP_BIGTABLE_INSTANCE_ID"


// Create settings builder based on whether we're in emulator mode (e.g. docker) or not
val (dataSettingsBuilder, adminSettingsBuilder, maybeBQClient) = sys.env.get("BIGTABLE_EMULATOR_HOST") match {
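The lookup order used by GcpApiImpl above (process environment first, then the conf map passed to the Api constructor, otherwise fail fast) can be captured as a small helper. This is only a sketch of the pattern; the helper and object names are made up for illustration:

Sketch (Scala):

object GcpSettingsSketch {
  // Environment wins; otherwise fall back to the conf map that the Driver now
  // populates from the --gcp-* flags; a missing value fails fast.
  def requiredSetting(key: String, conf: Map[String, String]): String =
    sys.env
      .get(key)
      .orElse(conf.get(key))
      .getOrElse(throw new IllegalArgumentException(s"$key environment variable not set"))

  def main(args: Array[String]): Unit = {
    // Illustrative value matching the teams.json sample above.
    val conf = Map("GCP_BIGTABLE_INSTANCE_ID" -> "zipline-canary-instance")
    println(requiredSetting("GCP_BIGTABLE_INSTANCE_ID", conf))
  }
}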
2 changes: 2 additions & 0 deletions distribution/build_and_upload_gcp_artifacts.sh
@@ -1,5 +1,7 @@
 #!/bin/bash

+set -e
+
 SCRIPT_DIRECTORY=$(dirname -- "$(realpath -- "$0")")
 CHRONON_ROOT_DIR=$(dirname "$SCRIPT_DIRECTORY")
67 changes: 52 additions & 15 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -86,7 +86,17 @@ object Driver {
   def parseConf[T <: TBase[_, _]: Manifest: ClassTag](confPath: String): T =
     ThriftJsonCodec.fromJsonFile[T](confPath, check = true)

-  trait OfflineSubcommand {
+  trait AddGcpSubCommandArgs {
+    this: ScallopConf =>
+    val isGcp: ScallopOption[Boolean] =
+      opt[Boolean](required = false, default = Some(false), descr = "Whether to use GCP")
+    val gcpProjectId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP project id")
+    val gcpBigtableInstanceId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP BigTable instance id")
+  }
+
+  trait OfflineSubcommand extends AddGcpSubCommandArgs {
     this: ScallopConf =>
     val confPath: ScallopOption[String] = opt[String](required = true, descr = "Path to conf")
@@ -513,10 +523,20 @@
   object GroupByUploader {
     class Args extends Subcommand("group-by-upload") with OfflineSubcommand {
       override def subcommandName() = "group-by-upload"
+
+      // jsonPercent
+      val jsonPercent: ScallopOption[Int] =
+        opt[Int](name = "json-percent",
+                 required = false,
+                 descr = "Percentage of json encoding to retain for debuggability",
+                 default = Some(1))
     }

     def run(args: Args): Unit = {
-      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()), args.endDate(), Some(args.buildTableUtils()))
+      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()),
+                        args.endDate(),
+                        Some(args.buildTableUtils()),
+                        jsonPercent = args.jsonPercent.apply())
     }
   }

@@ -564,7 +584,7 @@
   }

   // common arguments to all online commands
-  trait OnlineSubcommand { s: ScallopConf =>
+  trait OnlineSubcommand extends AddGcpSubCommandArgs { s: ScallopConf =>
     // this is `-Z` and not `-D` because sbt-pack plugin uses that for JAVA_OPTS
     val propsInner: Map[String, String] = props[String]('Z')
     val onlineJar: ScallopOption[String] =
@@ -573,14 +593,26 @@
       opt[String](required = true,
                   descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path")

+    // TODO: davidhan - remove this when we've migrated away from additional-conf-path
+    val additionalConfPath: ScallopOption[String] =
+      opt[String](required = false, descr = "Path to additional driver job configurations")
+
     // hashmap implements serializable
     def serializableProps: Map[String, String] = {
       val map = new mutable.HashMap[String, String]()
       propsInner.foreach { case (key, value) => map.update(key, value) }
       map.toMap
     }

-    lazy val api: Api = impl(serializableProps)
+    lazy private val gcpMap = Map(
+      "GCP_PROJECT_ID" -> gcpProjectId.toOption.getOrElse(""),
+      "GCP_BIGTABLE_INSTANCE_ID" -> gcpBigtableInstanceId.toOption.getOrElse("")
+    )
+
+    lazy val api: Api = isGcp.toOption match {
+      case Some(true) => impl(serializableProps ++ gcpMap)
+      case _          => impl(serializableProps)
+    }

     def metaDataStore =
       new MetadataStore(impl(serializableProps).genKvStore, MetadataDataset, timeoutMillis = 10000)
@@ -734,31 +766,36 @@
   object GroupByUploadToKVBulkLoad {
     @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
     class Args extends Subcommand("groupby-upload-bulk-load") with OnlineSubcommand {
-      val srcOfflineTable: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the source GroupBy Upload table")
-
-      val groupbyName: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for")
+      // Expectation that run.py only sets confPath
+      val confPath: ScallopOption[String] = opt[String](required = false, descr = "path to groupBy conf")

       val partitionString: ScallopOption[String] =
         opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading")
     }

     def run(args: Args): Unit = {
-      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args
-        .partitionString()} from table: ${args.srcOfflineTable()}")
+      val groupByConf = parseConf[api.GroupBy](args.confPath())
+
+      val offlineTable = groupByConf.metaData.uploadTable
+
+      val groupByName = groupByConf.metaData.name
+
+      logger.info(s"Triggering bulk load for GroupBy: ${groupByName} for partition: ${args
+        .partitionString()} from table: ${offlineTable}")
       val kvStore = args.api.genKvStore
       val startTime = System.currentTimeMillis()

       try {
-        kvStore.bulkPut(args.srcOfflineTable(), args.groupbyName(), args.partitionString())
+        // TODO: we may need to wrap this around TableUtils
+        kvStore.bulkPut(offlineTable, groupByName, args.partitionString())
       } catch {
         case e: Exception =>
-          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args
-            .partitionString()} from table: ${args.srcOfflineTable()}",
+          logger.error(s"Failed to upload GroupBy: ${groupByName} for partition: ${args
+            .partitionString()} from table: $offlineTable",
                        e)
           throw e
       }
-      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args
+      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${groupByName}; partition: ${args
         .partitionString()} in ${(System.currentTimeMillis() - startTime) / 1000} seconds")
     }
   }
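A compact sketch of how the new GCP flags reach the Api implementation, mirroring OnlineSubcommand above: the -Z properties form the base map, and when --is-gcp is set the flag-derived entries are merged on top (so they win on key collisions). The class name and all values are illustrative; the real trait also carries --online-jar/--online-class and the other subcommand options:

Sketch (Scala):

import org.rogach.scallop.{ScallopConf, ScallopOption}

class OnlineArgsSketch(arguments: Seq[String]) extends ScallopConf(arguments) {
  val propsInner: Map[String, String] = props[String]('Z')
  val isGcp: ScallopOption[Boolean] = opt[Boolean](required = false, default = Some(false))
  val gcpProjectId: ScallopOption[String] = opt[String](required = false)
  val gcpBigtableInstanceId: ScallopOption[String] = opt[String](required = false)
  verify()

  private val gcpMap = Map(
    "GCP_PROJECT_ID" -> gcpProjectId.toOption.getOrElse(""),
    "GCP_BIGTABLE_INSTANCE_ID" -> gcpBigtableInstanceId.toOption.getOrElse("")
  )

  // The conf map an Api(conf) implementation such as GcpApiImpl would receive.
  val apiConf: Map[String, String] = isGcp.toOption match {
    case Some(true) => propsInner ++ gcpMap
    case _          => propsInner
  }
}

object OnlineArgsSketch {
  def main(args: Array[String]): Unit = {
    val parsed = new OnlineArgsSketch(Seq(
      "-Zkv-host=myhost",
      "--is-gcp",
      "--gcp-project-id=canary-443022",
      "--gcp-bigtable-instance-id=zipline-canary-instance"
    ))
    println(parsed.apiConf)
    // expected: kv-host -> myhost, GCP_PROJECT_ID -> canary-443022,
    //           GCP_BIGTABLE_INSTANCE_ID -> zipline-canary-instance
  }
}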
4 changes: 2 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
@@ -257,8 +257,8 @@ object GroupByUpload {
.withColumn("ds", lit(endDs))
.saveUnPartitioned(groupByConf.metaData.uploadTable, groupByConf.metaData.tableProps)

val kvDfReloaded = tableUtils.sparkSession
.table(groupByConf.metaData.uploadTable)
val kvDfReloaded = tableUtils
.loadTable(groupByConf.metaData.uploadTable)
.where(not(col("key_json").eqNullSafe(Constants.GroupByServingInfoKey)))

val metricRow =