17
17
# KIND, either express or implied. See the License for the
18
18
# specific language governing permissions and limitations
19
19
# under the License.
20
+ """
21
+ This CLI manages the execution of a topology binary.
22
+
23
+ """
20
24
21
- """ The Heron executor is a process that runs on a container and is responsible for starting and
22
- monitoring the processes of the topology and it's support services."""
23
25
import argparse
24
26
import atexit
25
27
import base64
38
40
import socket
39
41
import traceback
40
42
import itertools
41
- import yaml
42
43
43
44
from heron .common .src .python .utils import log
44
45
from heron .common .src .python .utils import proc
48
49
from heron .statemgrs .src .python import configloader
49
50
from heron .statemgrs .src .python .config import Config as StateMgrConfig
50
51
52
+ import click
53
+ import yaml
54
+
51
55
Log = log .Log
52
56
53
57
# pylint: disable=too-many-lines
54
58
55
- def print_usage ():
56
- print (
57
- "Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
58
- " --topology-id=<topid> --topology-defn-file=<topdefnfile>"
59
- " --state-manager-connection=<state_manager_connection>"
60
- " --state-manager-root=<state_manager_root>"
61
- " --state-manager-config-file=<state_manager_config_file>"
62
- " --tmaster-binary=<tmaster_binary>"
63
- " --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
64
- " --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
65
- " --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
66
- " --tmaster-stats-port=<tmaster_stats_port>"
67
- " --heron-internals-config-file=<heron_internals_config_file>"
68
- " --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
69
- " --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
70
- " --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
71
- " --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
72
- " --metrics-manager-port=<metricsmgr_port>"
73
- " --cluster=<cluster> --role=<role> --environment=<environ>"
74
- " --instance-classpath=<instance_classpath>"
75
- " --metrics-sinks-config-file=<metrics_sinks_config_file>"
76
- " --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
77
- " --python-instance-binary=<python_instance_binary>"
78
- " --metricscache-manager-classpath=<metricscachemgr_classpath>"
79
- " --metricscache-manager-master-port=<metricscachemgr_masterport>"
80
- " --metricscache-manager-stats-port=<metricscachemgr_statsport>"
81
- " --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
82
- " --checkpoint-manager-port=<ckptmgr_port> --checkpoint-manager-ram=<checkpoint_manager_ram>"
83
- " --stateful-config-file=<stateful_config_file>"
84
- " --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
85
- " --cpp-instance-binary=<cpp_instance_binary>"
86
- " --jvm-remote-debugger-ports=<comma_seperated_port_list>" )
59
+ @click .command ()
60
+ @click .option ("--cluster" , required = True )
61
+ @click .option ("--role" , required = True )
62
+ @click .option ("--environment" , required = True )
63
+ @click .option ("--checkpoint-manager-classpath" , required = True )
64
+ @click .option ("--checkpoint-manager-port" , required = True )
65
+ @click .option ("--checkpoint-manager-ram" , type = int , required = True )
66
+ @click .option ("--classpath" , required = True )
67
+ @click .option ("--component-jvm-opts" , required = True )
68
+ @click .option ("--component-ram-map" , required = True )
69
+ @click .option ("--cpp-instance-binary" , required = True )
70
+ @click .option ("--health-manager-classpath" , required = True )
71
+ @click .option ("--health-manager-mode" , required = True )
72
+ @click .option ("--heron-internals-config-file" , required = True )
73
+ @click .option ("--heron-java-home" , required = True )
74
+ @click .option ("--heron-shell-binary" , required = True )
75
+ @click .option ("--instance-classpath" , required = True )
76
+ @click .option ("--instance-jvm-opts" , required = True )
77
+ @click .option ("--is-stateful" , required = True )
78
+ @click .option ("--master-port" , required = True )
79
+ @click .option ("--metrics-manager-classpath" , required = True )
80
+ @click .option ("--metrics-manager-port" , required = True )
81
+ @click .option ("--metrics-sinks-config-file" , required = True )
82
+ @click .option ("--metricscache-manager-classpath" , required = True )
83
+ @click .option ("--metricscache-manager-master-port" , required = True )
84
+ @click .option ("--metricscache-manager-mode" , required = False )
85
+ @click .option ("--metricscache-manager-stats-port" , required = True )
86
+ @click .option ("--override-config-file" , required = True )
87
+ @click .option ("--pkg-type" , required = True )
88
+ @click .option ("--python-instance-binary" , required = True )
89
+ @click .option ("--scheduler-classpath" , required = True )
90
+ @click .option ("--scheduler-port" , required = True )
91
+ @click .option ("--shard" , type = int , required = True )
92
+ @click .option ("--shell-port" , required = True )
93
+ @click .option ("--state-manager-config-file" , required = True )
94
+ @click .option ("--state-manager-connection" , required = True )
95
+ @click .option ("--state-manager-root" , required = True )
96
+ @click .option ("--stateful-config-file" , required = True )
97
+ @click .option ("--stmgr-binary" , required = True )
98
+ @click .option ("--tmaster-binary" , required = True )
99
+ @click .option ("--tmaster-controller-port" , required = True )
100
+ @click .option ("--tmaster-stats-port" , required = True )
101
+ @click .option ("--topology-binary-file" , required = True )
102
+ @click .option ("--topology-defn-file" , required = True )
103
+ @click .option ("--topology-id" , required = True )
104
+ @click .option ("--topology-name" , required = True )
105
+ @click .option ("--jvm-remote-debugger-ports" ,
106
+ help = "comma separated list of ports to be used"
107
+ " by a remote debugger for JVM instances" )
108
+ def cli (
109
+ ** kwargs : dict ,
110
+ ) -> None :
111
+ """
112
+ The Heron executor is a process that runs on a container and is responsible for
113
+ starting and monitoring the processes of the topology and it's support services.
114
+
115
+ """
116
+ # Since Heron on YARN runs as headless users, pex compiled
117
+ # binaries should be exploded into the container working
118
+ # directory. In order to do this, we need to set the
119
+ # PEX_ROOT shell environment before forking the processes
120
+ shell_env = os .environ .copy ()
121
+ shell_env ["PEX_ROOT" ] = os .path .join (os .path .abspath ('.' ), ".pex" )
122
+
123
+ parsed_args = argparse .Namespace (** kwargs )
124
+ # Instantiate the executor, bind it to signal handlers and launch it
125
+ executor = HeronExecutor (parsed_args , shell_env )
126
+ executor .initialize ()
127
+
128
+ start (executor )
87
129
88
130
def id_map (prefix , container_plans , add_zero_id = False ):
89
131
ids = {}
@@ -304,8 +346,7 @@ def init_from_parsed_args(self, parsed_args):
304
346
parsed_args .jvm_remote_debugger_ports .split ("," ) \
305
347
if parsed_args .jvm_remote_debugger_ports else None
306
348
307
- def __init__ (self , args , shell_env ):
308
- parsed_args = self .parse_args (args )
349
+ def __init__ (self , parsed_args , shell_env ):
309
350
self .init_from_parsed_args (parsed_args )
310
351
311
352
self .shell_env = shell_env
@@ -330,69 +371,6 @@ def __init__(self, args, shell_env):
330
371
self .state_managers = []
331
372
self .jvm_version = None
332
373
333
- @staticmethod
334
- def parse_args (args ):
335
- """Uses python argparse to collect positional args"""
336
- Log .info ("Input args: %r" % args )
337
-
338
- parser = argparse .ArgumentParser ()
339
-
340
- parser .add_argument ("--shard" , type = int , required = True )
341
- parser .add_argument ("--topology-name" , required = True )
342
- parser .add_argument ("--topology-id" , required = True )
343
- parser .add_argument ("--topology-defn-file" , required = True )
344
- parser .add_argument ("--state-manager-connection" , required = True )
345
- parser .add_argument ("--state-manager-root" , required = True )
346
- parser .add_argument ("--state-manager-config-file" , required = True )
347
- parser .add_argument ("--tmaster-binary" , required = True )
348
- parser .add_argument ("--stmgr-binary" , required = True )
349
- parser .add_argument ("--metrics-manager-classpath" , required = True )
350
- parser .add_argument ("--instance-jvm-opts" , required = True )
351
- parser .add_argument ("--classpath" , required = True )
352
- parser .add_argument ("--master-port" , required = True )
353
- parser .add_argument ("--tmaster-controller-port" , required = True )
354
- parser .add_argument ("--tmaster-stats-port" , required = True )
355
- parser .add_argument ("--heron-internals-config-file" , required = True )
356
- parser .add_argument ("--override-config-file" , required = True )
357
- parser .add_argument ("--component-ram-map" , required = True )
358
- parser .add_argument ("--component-jvm-opts" , required = True )
359
- parser .add_argument ("--pkg-type" , required = True )
360
- parser .add_argument ("--topology-binary-file" , required = True )
361
- parser .add_argument ("--heron-java-home" , required = True )
362
- parser .add_argument ("--shell-port" , required = True )
363
- parser .add_argument ("--heron-shell-binary" , required = True )
364
- parser .add_argument ("--metrics-manager-port" , required = True )
365
- parser .add_argument ("--cluster" , required = True )
366
- parser .add_argument ("--role" , required = True )
367
- parser .add_argument ("--environment" , required = True )
368
- parser .add_argument ("--instance-classpath" , required = True )
369
- parser .add_argument ("--metrics-sinks-config-file" , required = True )
370
- parser .add_argument ("--scheduler-classpath" , required = True )
371
- parser .add_argument ("--scheduler-port" , required = True )
372
- parser .add_argument ("--python-instance-binary" , required = True )
373
- parser .add_argument ("--cpp-instance-binary" , required = True )
374
- parser .add_argument ("--metricscache-manager-classpath" , required = True )
375
- parser .add_argument ("--metricscache-manager-master-port" , required = True )
376
- parser .add_argument ("--metricscache-manager-stats-port" , required = True )
377
- parser .add_argument ("--metricscache-manager-mode" , required = False )
378
- parser .add_argument ("--is-stateful" , required = True )
379
- parser .add_argument ("--checkpoint-manager-classpath" , required = True )
380
- parser .add_argument ("--checkpoint-manager-port" , required = True )
381
- parser .add_argument ("--checkpoint-manager-ram" , type = int , required = True )
382
- parser .add_argument ("--stateful-config-file" , required = True )
383
- parser .add_argument ("--health-manager-mode" , required = True )
384
- parser .add_argument ("--health-manager-classpath" , required = True )
385
- parser .add_argument ("--jvm-remote-debugger-ports" , required = False ,
386
- help = "ports to be used by a remote debugger for JVM instances" )
387
-
388
- parsed_args , unknown_args = parser .parse_known_args (args [1 :])
389
-
390
- if unknown_args :
391
- Log .warn ('Unknown arguments found!!! They are: %s' % unknown_args )
392
- Log .warn (parser .format_help ())
393
-
394
- return parsed_args
395
-
396
374
def run_command_or_exit (self , command ):
397
375
if self ._run_blocking_process (command , True ) != 0 :
398
376
Log .error ("Failed to run command: %s. Exiting" % command )
@@ -982,7 +960,7 @@ def start_process_monitor(self):
982
960
# Now wait for any child to die
983
961
Log .info ("Start process monitor" )
984
962
while True :
985
- if len ( self .processes_to_monitor ) > 0 :
963
+ if self .processes_to_monitor :
986
964
(pid , status ) = os .wait ()
987
965
988
966
with self .process_lock :
@@ -1100,7 +1078,7 @@ def start_state_manager_watches(self):
1100
1078
with open (self .override_config_file , 'r' ) as stream :
1101
1079
overrides = yaml .load (stream )
1102
1080
if overrides is None :
1103
- overrides = dict ()
1081
+ overrides = {}
1104
1082
overrides ["heron.statemgr.connection.string" ] = self .state_manager_connection
1105
1083
1106
1084
statemgr_config = StateMgrConfig ()
@@ -1201,20 +1179,5 @@ def start(executor):
1201
1179
# they are dead. This is the main loop of executor
1202
1180
executor .start_process_monitor ()
1203
1181
1204
- def main ():
1205
- """Register exit handlers, initialize the executor and run it."""
1206
- # Since Heron on YARN runs as headless users, pex compiled
1207
- # binaries should be exploded into the container working
1208
- # directory. In order to do this, we need to set the
1209
- # PEX_ROOT shell environment before forking the processes
1210
- shell_env = os .environ .copy ()
1211
- shell_env ["PEX_ROOT" ] = os .path .join (os .path .abspath ('.' ), ".pex" )
1212
-
1213
- # Instantiate the executor, bind it to signal handlers and launch it
1214
- executor = HeronExecutor (sys .argv , shell_env )
1215
- executor .initialize ()
1216
-
1217
- start (executor )
1218
-
1219
1182
if __name__ == "__main__" :
1220
- main ()
1183
+ cli ()
0 commit comments