Skip to content

Commit 4714b21

Browse files
committed
coderabbit good idea
1 parent 8fa05be commit 4714b21

File tree

1 file changed

+14
-64
lines changed

1 file changed

+14
-64
lines changed

api/py/ai/chronon/repo/run.py

Lines changed: 14 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -135,6 +135,7 @@
135135
ZIPLINE_ONLINE_CLASS_DEFAULT = "ai.chronon.integrations.cloud_gcp.GcpApiImpl"
136136
ZIPLINE_FLINK_JAR_DEFAULT = "flink-assembly-0.1.0-SNAPSHOT.jar"
137137
ZIPLINE_DATAPROC_SUBMITTER_JAR = "cloud_gcp_submitter_deploy.jar"
138+
ZIPLINE_SERVICE_JAR = "service-0.1.0-SNAPSHOT.jar"
138139

139140
ZIPLINE_DIRECTORY = "/tmp/zipline"
140141

@@ -864,78 +865,27 @@ def generate_dataproc_submitter_args(user_args: str, job_type: DataprocJobType =
864865
raise ValueError(f"Invalid job type: {job_type}")
865866

866867

867-
def download_dataproc_submitter_jar(destination_dir: str, customer_id: str):
868+
def download_zipline_jar(destination_dir: str, customer_id: str, jar_name: str):
868869
bucket_name = f"zipline-artifacts-{customer_id}"
869870

870-
file_name = ZIPLINE_DATAPROC_SUBMITTER_JAR
871-
872-
source_blob_name = f"jars/{file_name}"
873-
dataproc_jar_destination_path = f"{destination_dir}/{file_name}"
874-
875-
are_identical = compare_gcs_and_local_file_hashes(bucket_name, source_blob_name,
876-
dataproc_jar_destination_path) if os.path.exists(
877-
dataproc_jar_destination_path) else False
878-
879-
if are_identical:
880-
print(
881-
f"{dataproc_jar_destination_path} matches GCS {bucket_name}/{source_blob_name}")
882-
else:
883-
print(
884-
f"{dataproc_jar_destination_path} does NOT match GCS {bucket_name}/{source_blob_name}")
885-
print("Downloading dataproc submitter jar from GCS...")
886-
download_gcs_blob(bucket_name, source_blob_name,
887-
dataproc_jar_destination_path)
888-
889-
return dataproc_jar_destination_path
890-
891-
892-
def download_chronon_gcp_jar(destination_dir: str, customer_id: str):
893-
bucket_name = f"zipline-artifacts-{customer_id}"
894-
895-
file_name = ZIPLINE_ONLINE_JAR_DEFAULT
896-
897-
source_blob_name = f"jars/{file_name}"
898-
chronon_gcp_jar_destination_path = f"{destination_dir}/{file_name}"
899-
900-
are_identical = compare_gcs_and_local_file_hashes(bucket_name, source_blob_name,
901-
chronon_gcp_jar_destination_path) if os.path.exists(
902-
chronon_gcp_jar_destination_path) else False
903-
904-
if are_identical:
905-
print(
906-
f"{chronon_gcp_jar_destination_path} matches GCS {bucket_name}/{source_blob_name}")
907-
else:
908-
print(
909-
f"{chronon_gcp_jar_destination_path} does NOT match GCS {bucket_name}/{source_blob_name}")
910-
print("Downloading chronon gcp jar from GCS...")
911-
download_gcs_blob(bucket_name, source_blob_name,
912-
chronon_gcp_jar_destination_path)
913-
return chronon_gcp_jar_destination_path
914-
915-
916-
def download_service_jar(destination_dir: str, customer_id: str):
917-
bucket_name = f"zipline-artifacts-{customer_id}"
918-
919-
file_name = "service-0.1.0-SNAPSHOT.jar"
920-
921-
source_blob_name = f"jars/{file_name}"
922-
service_jar_destination_path = f"{destination_dir}/{file_name}"
871+
source_blob_name = f"jars/{jar_name}"
872+
destination_path = f"{destination_dir}/{jar_name}"
923873

924874
are_identical = compare_gcs_and_local_file_hashes(bucket_name, source_blob_name,
925-
service_jar_destination_path) if os.path.exists(
926-
service_jar_destination_path) else False
875+
destination_path) if os.path.exists(
876+
destination_path) else False
927877

928878
if are_identical:
929879
print(
930-
f"{service_jar_destination_path} matches GCS {bucket_name}/{source_blob_name}")
880+
f"{destination_path} matches GCS {bucket_name}/{source_blob_name}")
931881
else:
932882
print(
933-
f"{service_jar_destination_path} does NOT match GCS {bucket_name}/{source_blob_name}")
934-
print("Downloading service jar from GCS...")
883+
f"{destination_path} does NOT match GCS {bucket_name}/{source_blob_name}")
884+
print(f"Downloading {jar_name} from GCS...")
935885

936886
download_gcs_blob(bucket_name, source_blob_name,
937-
service_jar_destination_path)
938-
return service_jar_destination_path
887+
destination_path)
888+
return destination_path
939889

940890

941891
@retry_decorator(retries=2, backoff=5)
@@ -1085,12 +1035,12 @@ def main(ctx, conf, env, mode, dataproc, ds, app_name, start_ds, end_ds, paralle
10851035
os.makedirs(ZIPLINE_DIRECTORY, exist_ok=True)
10861036

10871037
if dataproc:
1088-
jar_path = download_dataproc_submitter_jar(ZIPLINE_DIRECTORY, get_customer_id())
1038+
jar_path = download_zipline_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_DATAPROC_SUBMITTER_JAR)
10891039
elif chronon_jar:
10901040
jar_path = chronon_jar
10911041
else:
1092-
service_jar_path = download_service_jar(ZIPLINE_DIRECTORY, get_customer_id())
1093-
chronon_gcp_jar_path = download_chronon_gcp_jar(ZIPLINE_DIRECTORY, get_customer_id())
1042+
service_jar_path = download_zipline_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_SERVICE_JAR)
1043+
chronon_gcp_jar_path = download_zipline_jar(ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_ONLINE_JAR_DEFAULT)
10941044
jar_path = f"{service_jar_path}:{chronon_gcp_jar_path}"
10951045

10961046
Runner(ctx.params, os.path.expanduser(jar_path)).run()

0 commit comments

Comments
 (0)