diff --git a/.tool-versions b/.tool-versions
index fd2306a46e..46212e39e8 100644
--- a/.tool-versions
+++ b/.tool-versions
@@ -1,4 +1,6 @@
-java corretto-17.0.9.8.1
+java
+  corretto-11.0.25.9.1
+  corretto-17.0.9.8.1
 scala 2.12.18
 asdf-plugin-manager 1.4.0
 sbt 1.8.2
diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py
index b9de7efa23..c066701c3e 100755
--- a/api/py/ai/chronon/repo/run.py
+++ b/api/py/ai/chronon/repo/run.py
@@ -34,6 +34,7 @@
 ONLINE_ARGS = "--online-jar={online_jar} --online-class={online_class} "
 OFFLINE_ARGS = "--conf-path={conf_path} --end-date={ds} "
 ONLINE_WRITE_ARGS = "--conf-path={conf_path} " + ONLINE_ARGS
+ONLINE_OFFLINE_WRITE_ARGS = OFFLINE_ARGS + ONLINE_ARGS
 
 ONLINE_MODES = [
     "streaming",
@@ -47,6 +48,7 @@
     "backfill-left",
     "backfill-final",
     "upload",
+    "upload-to-kv",
     "streaming",
     "streaming-client",
     "consistency-metrics-compute",
@@ -62,13 +64,15 @@
 # Constants for supporting multiple spark versions.
 SUPPORTED_SPARK = ["2.4.0", "3.1.1", "3.2.1", "3.5.1"]
-SCALA_VERSION_FOR_SPARK = {"2.4.0": "2.11", "3.1.1": "2.12", "3.2.1": "2.13", "3.5.1": "2.12"}
+SCALA_VERSION_FOR_SPARK = {"2.4.0": "2.11",
+                           "3.1.1": "2.12", "3.2.1": "2.13", "3.5.1": "2.12"}
 
 MODE_ARGS = {
     "backfill": OFFLINE_ARGS,
     "backfill-left": OFFLINE_ARGS,
     "backfill-final": OFFLINE_ARGS,
     "upload": OFFLINE_ARGS,
+    "upload-to-kv": ONLINE_WRITE_ARGS,
     "stats-summary": OFFLINE_ARGS,
     "log-summary": OFFLINE_ARGS,
     "analyze": OFFLINE_ARGS,
@@ -88,6 +92,7 @@
 ROUTES = {
     "group_bys": {
         "upload": "group-by-upload",
+        "upload-to-kv": "groupby-upload-bulk-load",
         "backfill": "group-by-backfill",
         "streaming": "group-by-streaming",
         "metadata-upload": "metadata-upload",
@@ -123,7 +128,10 @@
 APP_NAME_TEMPLATE = "chronon_{conf_type}_{mode}_{context}_{name}"
 RENDER_INFO_DEFAULT_SCRIPT = "scripts/render_info.py"
 
+# GCP DATAPROC SPECIFIC CONSTANTS
 DATAPROC_ENTRY = "ai.chronon.integrations.cloud_gcp.DataprocSubmitter"
+ZIPLINE_ONLINE_JAR_DEFAULT = "cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
+ZIPLINE_ONLINE_CLASS_DEFAULT = "ai.chronon.integrations.cloud_gcp.GcpApiImpl"
 
 
 def retry_decorator(retries=3, backoff=20):
@@ -175,7 +183,8 @@ def download_only_once(url, path, skip_download=False):
     path = path.strip()
     if os.path.exists(path):
         content_output = check_output("curl -sI " + url).decode("utf-8")
-        content_length = re.search("(content-length:\\s)(\\d+)", content_output.lower())
+        content_length = re.search(
+            "(content-length:\\s)(\\d+)", content_output.lower())
         remote_size = int(content_length.group().split()[-1])
         local_size = int(check_output("wc -c " + path).split()[0])
         print(
@@ -189,7 +198,8 @@ def download_only_once(url, path, skip_download=False):
             print("Sizes match. Assuming it's already downloaded.")
             should_download = False
         if should_download:
-            print("Different file from remote at local: " + path + ". Re-downloading..")
+            print("Different file from remote at local: " +
+                  path + ". Re-downloading..")
             check_call("curl {} -o {} --connect-timeout 10".format(url, path))
     else:
         print("No file at: " + path + ". Downloading..")
@@ -204,16 +214,16 @@ def download_jar(
     spark_version="2.4.0",
     skip_download=False,
 ):
-    assert (
-        spark_version in SUPPORTED_SPARK
-    ), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}"
+    assert (spark_version in SUPPORTED_SPARK), (f"Received unsupported spark version {spark_version}. "
" + f"Supported spark versions are {SUPPORTED_SPARK}") scala_version = SCALA_VERSION_FOR_SPARK[spark_version] maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None) default_url_prefix = ( "https://s01.oss.sonatype.org/service/local/repositories/public/content" ) url_prefix = maven_url_prefix if maven_url_prefix else default_url_prefix - base_url = "{}/ai/chronon/spark_{}_{}".format(url_prefix, jar_type, scala_version) + base_url = "{}/ai/chronon/spark_{}_{}".format( + url_prefix, jar_type, scala_version) print("Downloading jar from url: " + base_url) jar_path = os.environ.get("CHRONON_DRIVER_JAR", None) if jar_path is None: @@ -241,11 +251,15 @@ def download_jar( scala_version=scala_version, jar_type=jar_type, ) - jar_path = os.path.join("/tmp", jar_url.split("/")[-1]) + jar_path = os.path.join("/tmp", extract_filename_from_path(jar_url)) download_only_once(jar_url, jar_path, skip_download) return jar_path +def get_teams_json_file_path(repo_path): + return os.path.join(repo_path, "teams.json") + + def set_runtime_env(params): """ Setting the runtime environment variables. @@ -276,10 +290,11 @@ def set_runtime_env(params): if effective_mode and "streaming" in effective_mode: effective_mode = "streaming" if params["repo"]: - teams_file = os.path.join(params["repo"], "teams.json") + teams_file = get_teams_json_file_path(params["repo"]) if os.path.exists(teams_file): with open(teams_file, "r") as infile: teams_json = json.load(infile) + # we should have a fallback if user wants to set to something else `default` environment["common_env"] = teams_json.get("default", {}).get( "common_env", {} ) @@ -320,7 +335,8 @@ def set_runtime_env(params): "backfill-final", ]: environment["conf_env"]["CHRONON_CONFIG_ADDITIONAL_ARGS"] = ( - " ".join(custom_json(conf_json).get("additional_args", [])) + " ".join(custom_json(conf_json).get( + "additional_args", [])) ) environment["cli_args"]["APP_NAME"] = APP_NAME_TEMPLATE.format( mode=effective_mode, @@ -333,7 +349,8 @@ def set_runtime_env(params): ) # fall-back to prod env even in dev mode when dev env is undefined. environment["production_team_env"] = ( - teams_json[team].get("production", {}).get(effective_mode, {}) + teams_json[team].get("production", {}).get( + effective_mode, {}) ) # By default use production env. 
environment["default_env"] = ( @@ -402,7 +419,8 @@ def __init__(self, args, jar_path): if self.conf: try: - self.context, self.conf_type, self.team, _ = self.conf.split("/")[-4:] + self.context, self.conf_type, self.team, _ = self.conf.split( + "/")[-4:] except Exception as e: logging.error( "Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format( @@ -410,12 +428,12 @@ def __init__(self, args, jar_path): ) ) raise e - possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES + possible_modes = list( + ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES assert ( - args["mode"] in possible_modes - ), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format( - args["mode"], self.conf, self.conf_type, possible_modes - ) + args["mode"] in possible_modes), ("Invalid mode:{} for conf:{} of type:{}, please choose from {}" + .format(args["mode"], self.conf, self.conf_type, possible_modes + )) else: self.conf_type = args["conf_type"] self.ds = args["end_ds"] if "end_ds" in args and args["end_ds"] else args["ds"] @@ -497,9 +515,7 @@ def run(self): ) ) if self.mode == "streaming": - assert ( - len(filtered_apps) == 1 - ), "More than one found, please kill them all" + assert (len(filtered_apps) == 1), "More than one found, please kill them all" print("All good. No need to start a new app.") return elif self.mode == "streaming-client": @@ -520,9 +536,7 @@ def run(self): ) command_list.append(command) else: - # offline mode - - # Always download the jar for now in dataproc mode so that we can pull + # Always download the jar for now so that we can pull # in any fixes or latest changes if self.dataproc: dataproc_jar = download_dataproc_jar(temp_dir, get_customer_id()) @@ -538,13 +552,14 @@ def run(self): for start_ds, end_ds in date_ranges: if not self.dataproc: command = ( - "bash {script} --class ai.chronon.spark.Driver " + - "{jar} {subcommand} {args} {additional_args}" + "bash {script} --class ai.chronon.spark.Driver " + + "{jar} {subcommand} {args} {additional_args}" ).format( script=self.spark_submit, jar=self.jar_path, subcommand=ROUTES[self.conf_type][self.mode], - args=self._gen_final_args(start_ds=start_ds, end_ds=end_ds), + args=self._gen_final_args( + start_ds=start_ds, end_ds=end_ds), additional_args=os.environ.get( "CHRONON_CONFIG_ADDITIONAL_ARGS", "" ), @@ -563,11 +578,16 @@ def run(self): # when we include the gcs file path as part of dataproc, # the file is copied to root and not the complete path # is copied. - override_conf_path=self.conf.split("/")[-1]), + override_conf_path=extract_filename_from_path( + self.conf) if self.conf else None), additional_args=os.environ.get( "CHRONON_CONFIG_ADDITIONAL_ARGS", "" ), ) + local_files_to_upload_to_gcs = [] + if self.conf: + local_files_to_upload_to_gcs.append( + self.conf) dataproc_command = generate_dataproc_submitter_args( local_files_to_upload_to_gcs=[self.conf], @@ -580,8 +600,8 @@ def run(self): else: if not self.dataproc: command = ( - "bash {script} --class ai.chronon.spark.Driver " - + "{jar} {subcommand} {args} {additional_args}" + "bash {script} --class ai.chronon.spark.Driver " + + "{jar} {subcommand} {args} {additional_args}" ).format( script=self.spark_submit, jar=self.jar_path, @@ -603,19 +623,24 @@ def run(self): # does get reflected on GCS. But when we include the gcs file # path as part of dataproc, the file is copied to root and # not the complete path is copied. 
-                        override_conf_path=self.conf.split("/")[-1]),
+                        override_conf_path=extract_filename_from_path(
+                            self.conf) if self.conf else None),
                         additional_args=os.environ.get(
                             "CHRONON_CONFIG_ADDITIONAL_ARGS", ""
                         ),
                     )
+                    local_files_to_upload_to_gcs = []
+                    if self.conf:
+                        local_files_to_upload_to_gcs.append(self.conf)
                     dataproc_command = generate_dataproc_submitter_args(
                         # for now, self.conf is the only local file that requires uploading to gcs
-                        local_files_to_upload_to_gcs=[self.conf],
+                        local_files_to_upload_to_gcs=local_files_to_upload_to_gcs,
                         user_args=user_args
                     )
                     command = f"java -cp {dataproc_jar} {DATAPROC_ENTRY} {dataproc_command}"
                     command_list.append(command)
+
         if len(command_list) > 1:
             # parallel backfill mode
             with multiprocessing.Pool(processes=int(self.parallelism)) as pool:
@@ -638,22 +663,29 @@ def _gen_final_args(self, start_ds=None, end_ds=None, override_conf_path=None):
         override_start_partition_arg = (
             " --start-partition-override=" + start_ds if start_ds else ""
         )
+        final_args = base_args + " " + str(self.args) + override_start_partition_arg
+        return final_args
 
 
+def extract_filename_from_path(path):
+    return path.split("/")[-1]
+
+
 def split_date_range(start_date, end_date, parallelism):
     start_date = datetime.strptime(start_date, "%Y-%m-%d")
     end_date = datetime.strptime(end_date, "%Y-%m-%d")
     if start_date > end_date:
         raise ValueError("Start date should be earlier than end date")
     total_days = (
-        end_date - start_date
-    ).days + 1 # +1 to include the end_date in the range
+            end_date - start_date
+    ).days + 1  # +1 to include the end_date in the range
 
     # Check if parallelism is greater than total_days
     if parallelism > total_days:
-        raise ValueError("Parallelism should be less than or equal to total days")
+        raise ValueError(
+            "Parallelism should be less than or equal to total days")
 
     split_size = total_days // parallelism
     date_ranges = []
@@ -701,16 +733,17 @@ def set_defaults(ctx):
 def get_customer_id() -> str:
-    customer_id = os.environ.get('ZIPLINE_CUSTOMER_ID')
+    customer_id = os.environ.get('CUSTOMER_ID')
     if not customer_id:
-        raise ValueError('Please set ZIPLINE_CUSTOMER_ID environment variable')
+        raise ValueError('Please set CUSTOMER_ID environment variable')
     return customer_id
 
 
 def get_gcp_project_id() -> str:
-    gcp_project_id = os.environ.get('ZIPLINE_GCP_PROJECT_ID')
+    gcp_project_id = os.environ.get('GCP_PROJECT_ID')
     if not gcp_project_id:
-        raise ValueError('Please set ZIPLINE_GCP_PROJECT_ID environment variable')
+        raise ValueError(
+            'Please set GCP_PROJECT_ID environment variable')
     return gcp_project_id
 
 
@@ -718,22 +751,24 @@ def generate_dataproc_submitter_args(local_files_to_upload_to_gcs: List[str], us
     customer_warehouse_bucket_name = f"zipline-warehouse-{get_customer_id()}"
 
     gcs_files = []
-    for f in local_files_to_upload_to_gcs:
+    for source_file in local_files_to_upload_to_gcs:
         # upload to `metadata` folder
-        destination_file_path = f"metadata/{f}"
-        gcs_files.append(upload_gcs_blob(customer_warehouse_bucket_name, f, destination_file_path))
+        destination_file_path = f"metadata/{extract_filename_from_path(source_file)}"
+        gcs_files.append(upload_gcs_blob(
+            customer_warehouse_bucket_name, source_file, destination_file_path))
 
     # we also want the additional-confs included here. it should already be in the bucket
     zipline_artifacts_bucket_prefix = 'gs://zipline-artifacts'
-    gcs_files.append(f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}/confs/additional-confs.yaml")
+    gcs_files.append(
+        f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}/confs/additional-confs.yaml")
 
     gcs_file_args = ",".join(gcs_files)
 
     # include chronon jar uri. should also already be in the bucket
     chronon_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" + \
-        "/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
+                      "/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
 
     final_args = (f"{user_args} --additional-conf-path=additional-confs.yaml --gcs_files={gcs_file_args} "
                   f"--chronon_jar_uri={chronon_jar_uri}")
@@ -750,7 +785,8 @@ def download_dataproc_jar(destination_dir: str, customer_id: str):
     source_blob_name = f"jars/{file_name}"
     dataproc_jar_destination_path = f"{destination_dir}/{file_name}"
 
-    download_gcs_blob(bucket_name, source_blob_name, dataproc_jar_destination_path)
+    download_gcs_blob(bucket_name, source_blob_name,
+                      dataproc_jar_destination_path)
 
     return dataproc_jar_destination_path
 
@@ -802,10 +838,12 @@ def upload_gcs_blob(bucket_name, source_file_name, destination_blob_name):
 @click.option("--end-ds", help="the end ds for a range backfill")
 @click.option("--parallelism", help="break down the backfill range into this number of tasks in parallel. "
                                     "Please use it along with --start-ds and --end-ds and only in manual mode")
-@click.option("--repo", help="Path to chronon repo")
-@click.option("--online-jar", help="Jar containing Online KvStore & Deserializer Impl. "
-                                   "Used for streaming and metadata-upload mode.")
-@click.option("--online-class", help="Class name of Online Impl. Used for streaming and metadata-upload mode.")
+@click.option("--repo", help="Path to chronon repo", default=".")
+@click.option("--online-jar", default=ZIPLINE_ONLINE_JAR_DEFAULT,
+              help="Jar containing Online KvStore & Deserializer Impl. "
+                   "Used for streaming and metadata-upload mode.")
+@click.option("--online-class", default=ZIPLINE_ONLINE_CLASS_DEFAULT,
+              help="Class name of Online Impl. Used for streaming and metadata-upload mode.")
 @click.option("--version", help="Chronon version to use.")
 @click.option("--spark-version", default="2.4.0", help="Spark version to use for downloading jar.")
 @click.option("--spark-submit-path", help="Path to spark-submit")
diff --git a/api/py/test/sample/teams.json b/api/py/test/sample/teams.json
index 65120076ec..aa6aa3058d 100644
--- a/api/py/test/sample/teams.json
+++ b/api/py/test/sample/teams.json
@@ -1,66 +1,70 @@
 {
-    "default": {
-        "table_properties": {
-            "source": "chronon"
-        },
-        "common_env": {
-            "VERSION": "latest",
-            "SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
-            "JOB_MODE": "local[*]",
-            "HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
-            "CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
-            "CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host= -Zkv-port=",
-            "PARTITION_COLUMN": "ds",
-            "PARTITION_FORMAT": "yyyy-MM-dd"
-        },
-        "production": {
-            "backfill" : {
-                "EXECUTOR_CORES": "1",
-                "DRIVER_MEMORY": "15G",
-                "EXECUTOR_MEMORY": "8G",
-                "PARALLELISM": "4000",
-                "MAX_EXECUTORS": "1000"
-            },
-            "upload" : {
-                "EXECUTOR_CORES": "1",
-                "EXECUTOR_MEMORY": "8G",
-                "PARALLELISM": "1000",
-                "MAX_EXECUTORS": "1000"
-            },
-            "streaming" : {
-                "EXECUTOR_CORES": "2",
-                "EXECUTOR_MEMORY": "4G",
-                "PARALLELISM": "16"
-            }
-        }
+  "default": {
+    "table_properties": {
+      "source": "chronon"
     },
-    "sample_team": {
-        "description": "Team description",
-        "namespace": "chronon_db",
-        "user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
-        "production": {
-            "backfill" : {
-                "EXECUTOR_CORES": "4"
-            }
-        },
-        "dev": {
-            "backfill" : {
-                "EXECUTOR_CORES": "2",
-                "DRIVER_MEMORY": "30G"
-            }
-        }
+    "common_env": {
+      "VERSION": "latest",
+      "SPARK_SUBMIT_PATH": "[TODO]/path/to/spark-submit",
+      "JOB_MODE": "local[*]",
+      "HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing",
+      "CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class",
+      "CHRONON_ONLINE_ARGS": "[ONLINE-TODO]args prefixed with -Z become constructor map for your implementation of ai.chronon.online.Api, -Zkv-host= -Zkv-port=",
+      "PARTITION_COLUMN": "ds",
+      "PARTITION_FORMAT": "yyyy-MM-dd",
+      "CUSTOMER_ID": "canary",
+      "GCP_PROJECT_ID": "canary-443022",
+      "GCP_REGION": "us-central1",
+      "GCP_DATAPROC_CLUSTER_NAME": "canary-2",
+      "GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance"
     },
-    "kaggle": {
-        "description": "Workspace for kaggle compeitions",
-        "namespace": "default"
-    },
-    "quickstart": {
-        "description": "Used for the quickstart example",
-        "namespace": "default"
+    "production": {
+      "backfill": {
+        "EXECUTOR_CORES": "1",
+        "DRIVER_MEMORY": "15G",
+        "EXECUTOR_MEMORY": "8G",
+        "PARALLELISM": "4000",
+        "MAX_EXECUTORS": "1000"
+      },
+      "upload": {
+        "EXECUTOR_CORES": "1",
+        "EXECUTOR_MEMORY": "8G",
+        "PARALLELISM": "1000",
+        "MAX_EXECUTORS": "1000"
+      },
+      "streaming": {
+        "EXECUTOR_CORES": "2",
+        "EXECUTOR_MEMORY": "4G",
+        "PARALLELISM": "16"
+      }
+    }
+  },
+  "sample_team": {
+    "description": "Team description",
+    "namespace": "chronon_db",
+    "user": "# TODO: ldap user name to run the jobs as, from airflow or your own scheduler",
+    "production": {
+      "backfill": {
+        "EXECUTOR_CORES": "4"
+      }
     },
-    "risk": {
-        "description": "Used for proof of concept",
-        "namespace": "default"
+    "dev": {
+      "backfill": {
+        "EXECUTOR_CORES": "2",
+        "DRIVER_MEMORY": "30G"
+      }
     }
-
+  },
+  "kaggle": {
+    "description": "Workspace for kaggle compeitions",
+    "namespace": "default"
+  },
+ "quickstart": { + "description": "Used for the quickstart example", + "namespace": "default" + }, + "risk": { + "description": "Used for proof of concept", + "namespace": "default" + } } diff --git a/chronon_dataproc_submitter.env b/chronon_dataproc_submitter.env deleted file mode 100644 index 1bdf22821a..0000000000 --- a/chronon_dataproc_submitter.env +++ /dev/null @@ -1,4 +0,0 @@ -ZIPLINE_CUSTOMER_ID=canary -ZIPLINE_GCP_PROJECT_ID=canary-443022 -ZIPLINE_GCP_REGION=us-central1 -ZIPLINE_GCP_DATAPROC_CLUSTER_NAME=canary-2 \ No newline at end of file diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala index 1e846f503f..34424baa4a 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala @@ -181,27 +181,31 @@ object DataprocSubmitter { val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1) // search args array for prefix `--gcs_files` - val gcsFiles = args - .filter(_.startsWith("--gcs_files"))(0) - .split("=")(1) - .split(",") + val gcsFilesArgs = args.filter(_.startsWith("--gcs_files")) + assert(gcsFilesArgs.length == 0 || gcsFilesArgs.length == 1) + + val gcsFiles = if (gcsFilesArgs.isEmpty) { + Array.empty[String] + } else { + gcsFilesArgs(0).split("=")(1).split(",") + } val userArgs = args.filter(f => !f.startsWith("--gcs_files") && !f.startsWith("--chronon_jar_uri")) val required_vars = List.apply( - "ZIPLINE_GCP_PROJECT_ID", - "ZIPLINE_GCP_REGION", - "ZIPLINE_GCP_DATAPROC_CLUSTER_NAME" + "GCP_PROJECT_ID", + "GCP_REGION", + "GCP_DATAPROC_CLUSTER_NAME" ) val missing_vars = required_vars.filter(!sys.env.contains(_)) if (missing_vars.nonEmpty) { throw new Exception(s"Missing required environment variables: ${missing_vars.mkString(", ")}") } - val projectId = sys.env.getOrElse("ZIPLINE_GCP_PROJECT_ID", throw new Exception("ZIPLINE_GCP_PROJECT_ID not set")) - val region = sys.env.getOrElse("ZIPLINE_GCP_REGION", throw new Exception("ZIPLINE_GCP_REGION not set")) + val projectId = sys.env.getOrElse("GCP_PROJECT_ID", throw new Exception("GCP_PROJECT_ID not set")) + val region = sys.env.getOrElse("GCP_REGION", throw new Exception("GCP_REGION not set")) val clusterName = sys.env - .getOrElse("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME", throw new Exception("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME not set")) + .getOrElse("GCP_DATAPROC_CLUSTER_NAME", throw new Exception("GCP_DATAPROC_CLUSTER_NAME not set")) val submitterConf = SubmitterConf( projectId, @@ -209,13 +213,25 @@ object DataprocSubmitter { clusterName ) + val bigtableInstanceId = sys.env.getOrElse("GCP_BIGTABLE_INSTANCE_ID", "") + + val gcpArgsToPass = Array.apply( + "--is-gcp", + s"--gcp-project-id=${projectId}", + s"--gcp-bigtable-instance-id=$bigtableInstanceId" + ) + + val finalArgs = Array.concat(userArgs, gcpArgsToPass) + + println(finalArgs.mkString("Array(", ", ", ")")) + val a = DataprocSubmitter(submitterConf) val jobId = a.submit( TypeSparkJob, Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri), gcsFiles.toList, - userArgs: _* + finalArgs: _* ) println("Dataproc submitter job id: " + jobId) } diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala index f1afd79d01..d21f267927 100644 --- 
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
@@ -28,9 +28,9 @@ class GcpApiImpl(conf: Map[String, String]) extends Api(conf) {
       .getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set"))
 
     val instanceId = sys.env
-      .get("GCP_INSTANCE_ID")
-      .orElse(conf.get("GCP_INSTANCE_ID"))
-      .getOrElse(throw new IllegalArgumentException("GCP_INSTANCE_ID environment variable not set"))
+      .get("GCP_BIGTABLE_INSTANCE_ID")
+      .orElse(conf.get("GCP_BIGTABLE_INSTANCE_ID"))
+      .getOrElse(throw new IllegalArgumentException("GCP_BIGTABLE_INSTANCE_ID environment variable not set"))
 
     // Create settings builder based on whether we're in emulator mode (e.g. docker) or not
     val (dataSettingsBuilder, adminSettingsBuilder, maybeBQClient) = sys.env.get("BIGTABLE_EMULATOR_HOST") match {
diff --git a/distribution/build_and_upload_gcp_artifacts.sh b/distribution/build_and_upload_gcp_artifacts.sh
index c76e9b20c2..bf5870bb53 100644
--- a/distribution/build_and_upload_gcp_artifacts.sh
+++ b/distribution/build_and_upload_gcp_artifacts.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+set -e
+
 SCRIPT_DIRECTORY=$(dirname -- "$(realpath -- "$0")")
 CHRONON_ROOT_DIR=$(dirname "$SCRIPT_DIRECTORY")
 
diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala
index 9c35d9bc44..73d5680f08 100644
--- a/spark/src/main/scala/ai/chronon/spark/Driver.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -86,7 +86,17 @@ object Driver {
   def parseConf[T <: TBase[_, _]: Manifest: ClassTag](confPath: String): T =
     ThriftJsonCodec.fromJsonFile[T](confPath, check = true)
 
-  trait OfflineSubcommand {
+  trait AddGcpSubCommandArgs {
+    this: ScallopConf =>
+    val isGcp: ScallopOption[Boolean] =
+      opt[Boolean](required = false, default = Some(false), descr = "Whether to use GCP")
+    val gcpProjectId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP project id")
+    val gcpBigtableInstanceId: ScallopOption[String] =
+      opt[String](required = false, descr = "GCP BigTable instance id")
+  }
+
+  trait OfflineSubcommand extends AddGcpSubCommandArgs {
     this: ScallopConf =>
     val confPath: ScallopOption[String] = opt[String](required = true, descr = "Path to conf")
 
@@ -513,10 +523,20 @@ object Driver {
   object GroupByUploader {
     class Args extends Subcommand("group-by-upload") with OfflineSubcommand {
       override def subcommandName() = "group-by-upload"
+
+      // jsonPercent
+      val jsonPercent: ScallopOption[Int] =
+        opt[Int](name = "json-percent",
+                 required = false,
+                 descr = "Percentage of json encoding to retain for debuggability",
+                 default = Some(1))
     }
 
     def run(args: Args): Unit = {
-      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()), args.endDate(), Some(args.buildTableUtils()))
+      GroupByUpload.run(parseConf[api.GroupBy](args.confPath()),
+                        args.endDate(),
+                        Some(args.buildTableUtils()),
+                        jsonPercent = args.jsonPercent.apply())
     }
   }
 
@@ -564,7 +584,7 @@ object Driver {
   }
 
   // common arguments to all online commands
-  trait OnlineSubcommand { s: ScallopConf =>
+  trait OnlineSubcommand extends AddGcpSubCommandArgs { s: ScallopConf =>
     // this is `-Z` and not `-D` because sbt-pack plugin uses that for JAVA_OPTS
     val propsInner: Map[String, String] = props[String]('Z')
     val onlineJar: ScallopOption[String] =
       opt[String](required = true, descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path")
+    // TODO: davidhan - remove this when we've migrated away from additional-conf-path
+    val additionalConfPath: ScallopOption[String] =
+      opt[String](required = false, descr = "Path to additional driver job configurations")
+
     // hashmap implements serializable
     def serializableProps: Map[String, String] = {
       val map = new mutable.HashMap[String, String]()
@@ -580,7 +604,15 @@ object Driver {
       map.toMap
     }
 
-    lazy val api: Api = impl(serializableProps)
+    lazy private val gcpMap = Map(
+      "GCP_PROJECT_ID" -> gcpProjectId.toOption.getOrElse(""),
+      "GCP_BIGTABLE_INSTANCE_ID" -> gcpBigtableInstanceId.toOption.getOrElse("")
+    )
+
+    lazy val api: Api = isGcp.toOption match {
+      case Some(true) => impl(serializableProps ++ gcpMap)
+      case _          => impl(serializableProps)
+    }
 
     def metaDataStore =
      new MetadataStore(impl(serializableProps).genKvStore, MetadataDataset, timeoutMillis = 10000)
 
@@ -734,31 +766,36 @@ object Driver {
 
   object GroupByUploadToKVBulkLoad {
     @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
     class Args extends Subcommand("groupby-upload-bulk-load") with OnlineSubcommand {
-      val srcOfflineTable: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the source GroupBy Upload table")
-
-      val groupbyName: ScallopOption[String] =
-        opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for")
+      // Expectation that run.py only sets confPath
+      val confPath: ScallopOption[String] = opt[String](required = false, descr = "path to groupBy conf")
 
       val partitionString: ScallopOption[String] =
         opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading")
     }
 
     def run(args: Args): Unit = {
-      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args
-        .partitionString()} from table: ${args.srcOfflineTable()}")
+      val groupByConf = parseConf[api.GroupBy](args.confPath())
+
+      val offlineTable = groupByConf.metaData.uploadTable
+
+      val groupByName = groupByConf.metaData.name
+
+      logger.info(s"Triggering bulk load for GroupBy: ${groupByName} for partition: ${args
+        .partitionString()} from table: ${offlineTable}")
 
       val kvStore = args.api.genKvStore
       val startTime = System.currentTimeMillis()
+
       try {
-        kvStore.bulkPut(args.srcOfflineTable(), args.groupbyName(), args.partitionString())
+        // TODO: we may need to wrap this around TableUtils
+        kvStore.bulkPut(offlineTable, groupByName, args.partitionString())
       } catch {
         case e: Exception =>
-          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args
-            .partitionString()} from table: ${args.srcOfflineTable()}",
+          logger.error(s"Failed to upload GroupBy: ${groupByName} for partition: ${args
+            .partitionString()} from table: $offlineTable",
                        e)
           throw e
       }
-      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args
+      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${groupByName}; partition: ${args
        .partitionString()} in ${(System.currentTimeMillis() - startTime) / 1000} seconds")
     }
   }
diff --git a/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala b/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
index 946d04680f..e954d04190 100644
--- a/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
+++ b/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
@@ -257,8 +257,8 @@ object GroupByUpload {
       .withColumn("ds", lit(endDs))
       .saveUnPartitioned(groupByConf.metaData.uploadTable, groupByConf.metaData.tableProps)
 
-    val kvDfReloaded = tableUtils.sparkSession
-      .table(groupByConf.metaData.uploadTable)
+    val kvDfReloaded = tableUtils
+      .loadTable(groupByConf.metaData.uploadTable)
       .where(not(col("key_json").eqNullSafe(Constants.GroupByServingInfoKey)))
 
     val metricRow =