@@ -214,9 +214,7 @@ def download_jar(
     spark_version="2.4.0",
     skip_download=False,
 ):
-    assert (
-        spark_version in SUPPORTED_SPARK
-    ), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}"
+    assert (spark_version in SUPPORTED_SPARK), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}"
     scala_version = SCALA_VERSION_FOR_SPARK[spark_version]
     maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None)
     default_url_prefix = (
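For illustration, the collapsed assert fails fast with an actionable message. The values below are hypothetical; the real SUPPORTED_SPARK is defined elsewhere in run.py:

# Hypothetical values for illustration only.
SUPPORTED_SPARK = ["2.4.0", "3.1.1", "3.2.1"]

spark_version = "3.0.0"
assert (spark_version in SUPPORTED_SPARK), (
    f"Received unsupported spark version {spark_version}. "
    f"Supported spark versions are {SUPPORTED_SPARK}"
)
# AssertionError: Received unsupported spark version 3.0.0.
# Supported spark versions are ['2.4.0', '3.1.1', '3.2.1']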
@@ -372,8 +370,7 @@ def set_runtime_env(params):
                 for k in [
                     "chronon",
                     conf_type,
-                    params["mode"].replace(
-                        "-", "_") if params["mode"] else None,
+                    params["mode"].replace("-", "_") if params["mode"] else None,
                 ]
                 if k is not None
             ]
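The comprehension above builds name components from "chronon", the conf type, and the underscored mode, dropping None entries. A self-contained sketch with assumed inputs (conf_type and params normally come from the surrounding set_runtime_env logic):

# Assumed inputs for illustration.
conf_type = "group_bys"
params = {"mode": "streaming-client"}

keys = [
    k
    for k in [
        "chronon",
        conf_type,
        params["mode"].replace("-", "_") if params["mode"] else None,
    ]
    if k is not None
]
print(keys)  # ['chronon', 'group_bys', 'streaming_client']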
@@ -432,9 +429,7 @@ def __init__(self, args, jar_path):
                 raise e
             possible_modes = list(
                 ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES
-            assert (
-                args["mode"] in possible_modes
-            ), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format(
+            assert (args["mode"] in possible_modes), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format(
                 args["mode"], self.conf, self.conf_type, possible_modes
             )
         else:
@@ -518,9 +513,7 @@ def run(self):
                     )
                 )
                 if self.mode == "streaming":
-                    assert (
-                        len(filtered_apps) == 1
-                    ), "More than one found, please kill them all"
+                    assert (len(filtered_apps) == 1), "More than one found, please kill them all"
                     print("All good. No need to start a new app.")
                     return
                 elif self.mode == "streaming-client":
@@ -556,10 +549,7 @@ def run(self):
                 )
                 for start_ds, end_ds in date_ranges:
                     if not self.dataproc:
-                        command = (
-                            "bash {script} --class ai.chronon.spark.Driver {jar} {subcommand} {args} "
-                            + "{additional_args}"
-                        ).format(
+                        command = ("bash {script} --class ai.chronon.spark.Driver {jar} {subcommand} {args} " + "{additional_args}").format(
                             script=self.spark_submit,
                             jar=self.jar_path,
                             subcommand=ROUTES[self.conf_type][self.mode],
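For reference, the template renders into a plain spark-submit wrapper invocation. The values below are hypothetical placeholders, not the real ROUTES mapping or paths:

# Hypothetical placeholder values; the real ones come from self.spark_submit,
# self.jar_path, and ROUTES[self.conf_type][self.mode].
command = (
    "bash {script} --class ai.chronon.spark.Driver {jar} {subcommand} {args} "
    + "{additional_args}"
).format(
    script="spark-submit.sh",
    jar="chronon-spark-uber.jar",
    subcommand="join",
    args="--conf-path=production/joins/my_team/my_join.v1 --end-date=2024-01-01",
    additional_args="",
)
print(command)
# bash spark-submit.sh --class ai.chronon.spark.Driver chronon-spark-uber.jar join
#   --conf-path=production/joins/my_team/my_join.v1 --end-date=2024-01-01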
@@ -593,9 +583,6 @@ def run(self):
                         if self.conf:
                             local_files_to_upload_to_gcs.append(
                                 self.conf)
-                        # upload teams.json to gcs
-                        local_files_to_upload_to_gcs.append(
-                            get_teams_json_file_path(self.repo))
 
                         dataproc_command = generate_dataproc_submitter_args(
                             local_files_to_upload_to_gcs=[self.conf],
@@ -608,8 +595,8 @@ def run(self):
             else:
                 if not self.dataproc:
                     command = (
-                        "bash {script} --class ai.chronon.spark.Driver {jar} {subcommand} {args} " +
-                        "{additional_args}"
+                            "bash {script} --class ai.chronon.spark.Driver {jar} {subcommand} {args} " +
+                            "{additional_args}"
                     ).format(
                         script=self.spark_submit,
                         jar=self.jar_path,
@@ -641,10 +628,6 @@ def run(self):
                     if self.conf:
                         local_files_to_upload_to_gcs.append(self.conf)
 
-                    # upload teams.json to gcs
-                    local_files_to_upload_to_gcs.append(
-                        get_teams_json_file_path(self.repo))
-
                     dataproc_command = generate_dataproc_submitter_args(
                         # for now, self.conf is the only local file that requires uploading to gcs
                         local_files_to_upload_to_gcs=local_files_to_upload_to_gcs,
@@ -677,7 +660,7 @@ def _gen_final_args(self, start_ds=None, end_ds=None, override_conf_path=None):
         )
 
         final_args = base_args + " " + \
-            str(self.args) + override_start_partition_arg
+                     str(self.args) + override_start_partition_arg
 
         return final_args
@@ -692,8 +675,8 @@ def split_date_range(start_date, end_date, parallelism):
     if start_date > end_date:
         raise ValueError("Start date should be earlier than end date")
     total_days = (
-        end_date - start_date
-    ).days + 1  # +1 to include the end_date in the range
+            end_date - start_date
+    ).days + 1  # +1 to include the end_date in the range
 
     # Check if parallelism is greater than total_days
     if parallelism > total_days:
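Only the head of split_date_range is visible in this hunk. A minimal sketch of how such a splitter could behave, assuming it divides the inclusive [start_date, end_date] span into `parallelism` contiguous chunks; everything beyond the visible lines (string date format, remainder handling, return shape) is an assumption, and the real function may differ:

from datetime import datetime, timedelta

def split_date_range(start_date, end_date, parallelism):
    date_format = "%Y-%m-%d"  # assumed partition-date format
    start = datetime.strptime(start_date, date_format)
    end = datetime.strptime(end_date, date_format)
    if start > end:
        raise ValueError("Start date should be earlier than end date")
    total_days = (end - start).days + 1  # +1 to include the end_date

    # Check if parallelism is greater than total_days (as in the hunk above).
    if parallelism > total_days:
        raise ValueError("Parallelism cannot exceed the number of days")

    base, extra = divmod(total_days, parallelism)
    ranges, cursor = [], start
    for i in range(parallelism):
        days = base + (1 if i < extra else 0)  # spread the remainder evenly
        chunk_end = cursor + timedelta(days=days - 1)
        ranges.append((cursor.strftime(date_format), chunk_end.strftime(date_format)))
        cursor = chunk_end + timedelta(days=1)
    return ranges

print(split_date_range("2024-01-01", "2024-01-10", 3))
# [('2024-01-01', '2024-01-04'), ('2024-01-05', '2024-01-07'), ('2024-01-08', '2024-01-10')]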