Skip to content

Commit c6427cc

Browse files
authored
Stop using cloud_gcp_submitter jar and just use cloud_gcp jar, and add --gcp for fetch (#466)
## Summary ^^^ Removed usage of and references to the cloud_gcp_submitter jar, as it is no longer needed. Also added an additional `--gcp` arg so that it can be used in the `fetch` command to set which cloud provider's implementations we should use. For example, `fetch` + `--gcp` should pull from the `BigTable` KV store code. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a new command-line option for specifying GCP settings, allowing users to easily switch modes. - Introduced an automated script for creating Python wheel packages during artifact builds. - **Refactor** - Updated module imports and renamed constants for improved clarity and consistency. - Simplified logic for handling jar file paths and streamlined the artifact build process. - Removed obsolete build configurations and redundant commands. - **Documentation** - Enhanced developer notes with improved formatting for better readability. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent fc36d9f commit c6427cc

File tree

5 files changed

+149
-79
lines changed

5 files changed

+149
-79
lines changed

api/py/ai/chronon/repo/gcp.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44

55
from google.cloud import storage
66
import crcmod
7-
from .utils import get_environ_arg, retry_decorator, DataprocJobType, get_customer_id, \
7+
from ai.chronon.repo.utils import get_environ_arg, retry_decorator, DataprocJobType, get_customer_id, \
88
extract_filename_from_path
99

1010
# GCP DATAPROC SPECIFIC CONSTANTS
1111
DATAPROC_ENTRY = "ai.chronon.integrations.cloud_gcp.DataprocSubmitter"
12-
ZIPLINE_GCP_ONLINE_JAR_DEFAULT = "cloud_gcp_lib_deploy.jar"
12+
ZIPLINE_GCP_JAR_DEFAULT = "cloud_gcp_lib_deploy.jar"
1313
ZIPLINE_GCP_ONLINE_CLASS_DEFAULT = "ai.chronon.integrations.cloud_gcp.GcpApiImpl"
1414
ZIPLINE_GCP_FLINK_JAR_DEFAULT = "flink_assembly_deploy.jar"
15-
ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR = "cloud_gcp_submitter_deploy.jar"
1615
ZIPLINE_GCP_SERVICE_JAR = "service_assembly_deploy.jar"
1716

1817

@@ -21,7 +20,6 @@ def get_gcp_project_id() -> str:
2120

2221

2322
def get_gcp_bigtable_instance_id() -> str:
24-
from py.ai.chronon.repo.utils import get_environ_arg
2523
return get_environ_arg('GCP_BIGTABLE_INSTANCE_ID')
2624

2725

@@ -169,14 +167,14 @@ def generate_dataproc_submitter_args(user_args: str, job_type: DataprocJobType =
169167

170168
# include jar uri. should also already be in the bucket
171169
jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" + \
172-
f"/jars/{ZIPLINE_GCP_ONLINE_JAR_DEFAULT}"
170+
f"/jars/{ZIPLINE_GCP_JAR_DEFAULT}"
173171

174172
final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"
175173

176174
if job_type == DataprocJobType.FLINK:
177175
main_class = "ai.chronon.flink.FlinkJob"
178-
flink_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"\
179-
+ f"/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
176+
flink_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" \
177+
+ f"/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
180178
return final_args.format(
181179
user_args=user_args,
182180
jar_uri=jar_uri,

api/py/ai/chronon/repo/run.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import xml.etree.ElementTree as ET
2828
from datetime import datetime, timedelta
2929

30-
from .gcp import generate_dataproc_submitter_args, get_gcp_project_id, get_gcp_bigtable_instance_id, \
30+
from ai.chronon.repo.gcp import generate_dataproc_submitter_args, get_gcp_project_id, get_gcp_bigtable_instance_id, \
3131
get_gcp_region_id, download_zipline_dataproc_jar, ZIPLINE_GCP_ONLINE_CLASS_DEFAULT, DATAPROC_ENTRY, \
32-
ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR, ZIPLINE_GCP_SERVICE_JAR, ZIPLINE_GCP_ONLINE_JAR_DEFAULT
33-
from .utils import DataprocJobType, extract_filename_from_path, retry_decorator, get_customer_id
32+
ZIPLINE_GCP_SERVICE_JAR, ZIPLINE_GCP_JAR_DEFAULT
33+
from ai.chronon.repo.utils import DataprocJobType, extract_filename_from_path, retry_decorator, get_customer_id
3434

3535
ONLINE_ARGS = "--online-jar={online_jar} --online-class={online_class} "
3636
OFFLINE_ARGS = "--conf-path={conf_path} --end-date={ds} "
@@ -342,7 +342,8 @@ def set_runtime_env(params):
342342
environment["cli_args"]["APP_NAME"] = "_".join(
343343
[
344344
k
345-
for k in [
345+
for k in
346+
[
346347
"chronon",
347348
conf_type,
348349
params["mode"].replace("-", "_") if params["mode"] else None,
@@ -780,6 +781,7 @@ def set_defaults(ctx):
780781
@click.option("--env", required=False, default="dev", help="Running environment - default to be dev")
781782
@click.option("--mode", type=click.Choice(MODE_ARGS.keys()))
782783
@click.option("--dataproc", is_flag=True, help="Run on Dataproc in GCP")
784+
@click.option("--gcp", is_flag=True, help="Use GCP settings")
783785
@click.option("--ds", help="the end partition to backfill the data")
784786
@click.option("--app-name", help="app name. Default to {}".format(APP_NAME_TEMPLATE))
785787
@click.option("--start-ds", help="override the original start partition for a range backfill. "
@@ -789,7 +791,7 @@ def set_defaults(ctx):
789791
@click.option("--parallelism", help="break down the backfill range into this number of tasks in parallel. "
790792
"Please use it along with --start-ds and --end-ds and only in manual mode")
791793
@click.option("--repo", help="Path to chronon repo", default=".")
792-
@click.option("--online-jar", default=ZIPLINE_GCP_ONLINE_JAR_DEFAULT,
794+
@click.option("--online-jar", default=ZIPLINE_GCP_JAR_DEFAULT,
793795
help="Jar containing Online KvStore & Deserializer Impl. "
794796
"Used for streaming and metadata-upload mode.")
795797
@click.option("--online-class", default=ZIPLINE_GCP_ONLINE_CLASS_DEFAULT,
@@ -814,7 +816,7 @@ def set_defaults(ctx):
814816
help="Use a mocked data source instead of a real source for groupby-streaming Flink.")
815817
@click.option("--savepoint-uri", help="Savepoint URI for Flink streaming job")
816818
@click.pass_context
817-
def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
819+
def main(ctx, conf, env, mode, dataproc, gcp, ds, app_name, start_ds, end_ds, parallelism, repo, online_jar,
818820
online_class,
819821
version, spark_version, spark_submit_path, spark_streaming_submit_path, online_jar_fetch, sub_help, conf_type,
820822
online_args, chronon_jar, release_tag, list_apps, render_info, groupby_name, kafka_bootstrap, mock_source,
@@ -827,16 +829,16 @@ def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, paralle
827829
ctx.params["args"] = " ".join(unknown_args) + extra_args
828830
os.makedirs(ZIPLINE_DIRECTORY, exist_ok=True)
829831

830-
if dataproc:
831-
jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
832-
ZIPLINE_GCP_DATAPROC_SUBMITTER_JAR)
833-
elif chronon_jar:
834-
jar_path = chronon_jar
835-
else:
832+
if dataproc or gcp:
833+
gcp_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
834+
ZIPLINE_GCP_JAR_DEFAULT)
836835
service_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_GCP_SERVICE_JAR)
837-
chronon_gcp_jar_path = download_zipline_dataproc_jar(ZIPLINE_DIRECTORY, get_customer_id(),
838-
ZIPLINE_GCP_ONLINE_JAR_DEFAULT)
839-
jar_path = f"{service_jar_path}:{chronon_gcp_jar_path}"
836+
jar_path = f"{service_jar_path}:{gcp_jar_path}" if mode == 'fetch' else gcp_jar_path
837+
else:
838+
jar_path = chronon_jar
839+
840+
if not jar_path:
841+
raise ValueError("Jar path is not set.")
840842

841843
Runner(ctx.params, os.path.expanduser(jar_path)).run()
842844

cloud_gcp/BUILD.bazel

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,6 @@ scala_library(
4242
],
4343
)
4444

45-
jvm_binary(
46-
name = "cloud_gcp_submitter",
47-
main_class = "ai.chronon.integrations.cloud_gcp.DataprocSubmitter",
48-
runtime_deps = [
49-
":cloud_gcp_lib",
50-
scala_artifact_with_suffix("org.apache.iceberg:iceberg-spark-runtime-3.5")
51-
],
52-
)
53-
5445
test_deps = [
5546
":cloud_gcp_lib",
5647
"//api:thrift_java",

0 commit comments

Comments
 (0)