Skip to content
This repository was archived by the owner on Mar 3, 2023. It is now read-only.

Commit 869c82e

Browse files
authored
make python code compatible with both python2 and python3 (#3412)
* make code compatible with both python2 and 3
1 parent 468aed1 commit 869c82e

File tree

84 files changed

+331
-307
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+331
-307
lines changed

examples/src/python/bolt/stateful_count_bolt.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def init_state(self, stateful_state):
3636
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))
3737

3838
def pre_save(self, checkpoint_id):
39-
for (k, v) in self.counter.items():
39+
for (k, v) in list(self.counter.items()):
4040
self.recovered_state.put(k, v)
4141
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
4242

examples/src/python/spout/stateful_word_spout.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def init_state(self, stateful_state):
3737

3838
def pre_save(self, checkpoint_id):
3939
# Purely for debugging purposes
40-
for (k, v) in self.counter.items():
40+
for (k, v) in list(self.counter.items()):
4141
self.recovered_state.put(k, v)
4242
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
4343

heron/executor/src/python/heron_executor.py

+19-20
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,10 @@ def init_from_parsed_args(self, parsed_args):
236236
self.tmaster_stats_port = parsed_args.tmaster_stats_port
237237
self.heron_internals_config_file = parsed_args.heron_internals_config_file
238238
self.override_config_file = parsed_args.override_config_file
239-
self.component_ram_map =\
240-
map(lambda x: {x.split(':')[0]:
241-
int(x.split(':')[1])}, parsed_args.component_ram_map.split(','))
242-
self.component_ram_map =\
243-
functools.reduce(lambda x, y: dict(x.items() + y.items()), self.component_ram_map)
239+
self.component_ram_map = [{x.split(':')[0]:int(x.split(':')[1])}
240+
for x in parsed_args.component_ram_map.split(',')]
241+
self.component_ram_map = functools.reduce(lambda x, y: dict(list(x.items()) + list(y.items())),
242+
self.component_ram_map)
244243

245244
# component_jvm_opts_in_base64 itself is a base64-encoding-json-map, which is appended with
246245
# " at the start and end. It also escapes "=" to "&equals" due to aurora limitation
@@ -256,7 +255,7 @@ def init_from_parsed_args(self, parsed_args):
256255
base64.b64decode(parsed_args.component_jvm_opts.
257256
lstrip('"').rstrip('"').replace('(61)', '=').replace('=', '='))
258257
if component_jvm_opts_in_json != "":
259-
for (k, v) in json.loads(component_jvm_opts_in_json).items():
258+
for (k, v) in list(json.loads(component_jvm_opts_in_json).items()):
260259
# In json, the component name and JVM options are still in base64 encoding
261260
self.component_jvm_opts[base64.b64decode(k)] = base64.b64decode(v)
262261

@@ -366,7 +365,7 @@ def parse_args(args):
366365
parser.add_argument("--is-stateful", required=True)
367366
parser.add_argument("--checkpoint-manager-classpath", required=True)
368367
parser.add_argument("--checkpoint-manager-port", required=True)
369-
parser.add_argument("--checkpoint-manager-ram", type=long, required=True)
368+
parser.add_argument("--checkpoint-manager-ram", type=int, required=True)
370369
parser.add_argument("--stateful-config-file", required=True)
371370
parser.add_argument("--health-manager-mode", required=True)
372371
parser.add_argument("--health-manager-classpath", required=True)
@@ -793,7 +792,7 @@ def _get_streaming_processes(self):
793792
'--zkhostportlist=%s' % self.state_manager_connection,
794793
'--zkroot=%s' % self.state_manager_root,
795794
'--stmgr_id=%s' % self.stmgr_ids[self.shard],
796-
'--instance_ids=%s' % ','.join(map(lambda x: x[0], instance_info)),
795+
'--instance_ids=%s' % ','.join([x[0] for x in instance_info]),
797796
'--myhost=%s' % self.master_host,
798797
'--data_port=%s' % str(self.master_port),
799798
'--local_data_port=%s' % str(self.tmaster_controller_port),
@@ -958,8 +957,8 @@ def _run_blocking_process(self, cmd, is_shell=False):
958957
def _kill_processes(self, commands):
959958
# remove the command from processes_to_monitor and kill the process
960959
with self.process_lock:
961-
for command_name, command in commands.items():
962-
for process_info in self.processes_to_monitor.values():
960+
for command_name, command in list(commands.items()):
961+
for process_info in list(self.processes_to_monitor.values()):
963962
if process_info.name == command_name:
964963
del self.processes_to_monitor[process_info.pid]
965964
Log.info("Killing %s process with pid %d: %s" %
@@ -978,7 +977,7 @@ def _start_processes(self, commands):
978977
Log.info("Start processes")
979978
processes_to_monitor = {}
980979
# First start all the processes
981-
for (name, command) in commands.items():
980+
for (name, command) in list(commands.items()):
982981
p = self._run_process(name, command)
983982
processes_to_monitor[p.pid] = ProcessInfo(p, name, command)
984983

@@ -999,7 +998,7 @@ def start_process_monitor(self):
999998
(pid, status) = os.wait()
1000999

10011000
with self.process_lock:
1002-
if pid in self.processes_to_monitor.keys():
1001+
if pid in list(self.processes_to_monitor.keys()):
10031002
old_process_info = self.processes_to_monitor[pid]
10041003
name = old_process_info.name
10051004
command = old_process_info.command
@@ -1061,19 +1060,19 @@ def get_command_changes(self, current_commands, updated_commands):
10611060

10621061
# if the current command has a matching command in the updated commands we keep it
10631062
# otherwise we kill it
1064-
for current_name, current_command in current_commands.items():
1063+
for current_name, current_command in list(current_commands.items()):
10651064
# We don't restart tmaster since it watches the packing plan and updates itself. The stream
10661065
# manager is restarted just to reset state, but we could update it to do so without a restart
1067-
if current_name in updated_commands.keys() and \
1066+
if current_name in list(updated_commands.keys()) and \
10681067
current_command == updated_commands[current_name] and \
10691068
not current_name.startswith('stmgr-'):
10701069
commands_to_keep[current_name] = current_command
10711070
else:
10721071
commands_to_kill[current_name] = current_command
10731072

10741073
# updated commands not in the keep list need to be started
1075-
for updated_name, updated_command in updated_commands.items():
1076-
if updated_name not in commands_to_keep.keys():
1074+
for updated_name, updated_command in list(updated_commands.items()):
1075+
if updated_name not in list(commands_to_keep.keys()):
10771076
commands_to_start[updated_name] = updated_command
10781077

10791078
return commands_to_kill, commands_to_keep, commands_to_start
@@ -1083,8 +1082,8 @@ def launch(self):
10831082
Then starts new ones required and kills old ones no longer required.
10841083
'''
10851084
with self.process_lock:
1086-
current_commands = dict(map((lambda process: (process.name, process.command)),
1087-
self.processes_to_monitor.values()))
1085+
current_commands = dict(list(map((lambda process: (process.name, process.command)),
1086+
list(self.processes_to_monitor.values()))))
10881087
updated_commands = self.get_commands_to_run()
10891088

10901089
# get the commands to kill, keep and start
@@ -1176,7 +1175,7 @@ def cleanup():
11761175
Log.info('Executor terminated; exiting all process in executor.')
11771176

11781177
# Kill child processes first and wait for log collection to finish
1179-
for pid in executor.processes_to_monitor.keys():
1178+
for pid in list(executor.processes_to_monitor.keys()):
11801179
os.kill(pid, signal.SIGTERM)
11811180
time.sleep(5)
11821181

@@ -1192,7 +1191,7 @@ def cleanup():
11921191
sid = os.getsid(pid)
11931192

11941193
# POSIX prohibits the change of the process group ID of a session leader
1195-
if pid <> sid:
1194+
if pid != sid:
11961195
Log.info('Set up process group; executor becomes leader')
11971196
os.setpgrp() # create new process group, become its leader
11981197

heron/executor/tests/python/heron_executor_unittest.py

+20-24
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def get_expected_shell_command(container_id):
9292

9393
def build_packing_plan(self, instance_distribution):
9494
packing_plan = PackingPlan()
95-
for container_id in instance_distribution.keys():
95+
for container_id in list(instance_distribution.keys()):
9696
container_plan = packing_plan.container_plans.add()
9797
container_plan.id = int(container_id)
9898
for (component_name, global_task_id, component_index) in instance_distribution[container_id]:
@@ -293,11 +293,11 @@ def get_args(shard_id):
293293
def test_update_packing_plan(self):
294294
self.executor_0.update_packing_plan(self.packing_plan_expected)
295295

296-
self.assertEquals(self.packing_plan_expected, self.executor_0.packing_plan)
297-
self.assertEquals({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
298-
self.assertEquals(
296+
self.assertEqual(self.packing_plan_expected, self.executor_0.packing_plan)
297+
self.assertEqual({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
298+
self.assertEqual(
299299
{0: "metricsmgr-0", 1: "metricsmgr-1", 7: "metricsmgr-7"}, self.executor_0.metricsmgr_ids)
300-
self.assertEquals(
300+
self.assertEqual(
301301
{0: "heron-shell-0", 1: "heron-shell-1", 7: "heron-shell-7"}, self.executor_0.heron_shell_ids)
302302

303303
def test_launch_container_0(self):
@@ -315,17 +315,13 @@ def do_test_launch(self, executor, expected_processes):
315315
monitored_processes = executor.processes_to_monitor
316316

317317
# convert to (pid, name, command)
318-
found_processes = list(map(lambda process_info:
319-
(process_info.pid, process_info.name, process_info.command_str),
320-
executor.processes))
321-
found_monitored = list(map(lambda pinfo:
322-
(pinfo[0], pinfo[1].name, pinfo[1].command_str),
323-
monitored_processes.items()))
318+
found_processes = list([(process_info.pid, process_info.name, process_info.command_str) for process_info in executor.processes])
319+
found_monitored = list([(pinfo[0], pinfo[1].name, pinfo[1].command_str) for pinfo in list(monitored_processes.items())])
324320
found_processes.sort(key=lambda tuple: tuple[0])
325321
found_monitored.sort(key=lambda tuple: tuple[0])
326322
print("do_test_commands - found_processes: %s found_monitored: %s" \
327323
% (found_processes, found_monitored))
328-
self.assertEquals(found_processes, found_monitored)
324+
self.assertEqual(found_processes, found_monitored)
329325

330326
print("do_test_commands - expected_processes: %s monitored_processes: %s" \
331327
% (expected_processes, monitored_processes))
@@ -337,18 +333,18 @@ def test_change_instance_dist_container_1(self):
337333
current_commands = self.executor_1.get_commands_to_run()
338334

339335
temp_dict = dict(
340-
map((lambda process_info: (process_info.name, process_info.command.split(' '))),
341-
self.expected_processes_container_1))
336+
list(map((lambda process_info: (process_info.name, process_info.command.split(' '))),
337+
self.expected_processes_container_1)))
342338

343339
current_json = json.dumps(current_commands, sort_keys=True, cls=CommandEncoder).split(' ')
344340
temp_json = json.dumps(temp_dict, sort_keys=True).split(' ')
345341

346-
print ("current_json: %s" % current_json)
347-
print ("temp_json: %s" % temp_json)
342+
print("current_json: %s" % current_json)
343+
print("temp_json: %s" % temp_json)
348344

349345
# better test error report
350346
for (s1, s2) in zip(current_json, temp_json):
351-
self.assertEquals(s1, s2)
347+
self.assertEqual(s1, s2)
352348

353349
# update instance distribution
354350
new_packing_plan = self.build_packing_plan(
@@ -360,20 +356,20 @@ def test_change_instance_dist_container_1(self):
360356
commands_to_kill, commands_to_keep, commands_to_start = \
361357
self.executor_1.get_command_changes(current_commands, updated_commands)
362358

363-
self.assertEquals(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
364-
self.assertEquals(
359+
self.assertEqual(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
360+
self.assertEqual(
365361
['container_1_exclaim1_1', 'container_1_word_3', 'heron-shell-1', 'metricsmgr-1'],
366362
sorted(commands_to_keep.keys()))
367-
self.assertEquals(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))
363+
self.assertEqual(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))
368364

369365
def assert_processes(self, expected_processes, found_processes):
370-
self.assertEquals(len(expected_processes), len(found_processes))
366+
self.assertEqual(len(expected_processes), len(found_processes))
371367
for expected_process in expected_processes:
372368
self.assert_process(expected_process, found_processes)
373369

374370
def assert_process(self, expected_process, found_processes):
375371
pid = expected_process.pid
376372
self.assertTrue(found_processes[pid])
377-
self.assertEquals(expected_process.name, found_processes[pid].name)
378-
self.assertEquals(expected_process.command, found_processes[pid].command_str)
379-
self.assertEquals(1, found_processes[pid].attempts)
373+
self.assertEqual(expected_process.name, found_processes[pid].name)
374+
self.assertEqual(expected_process.command, found_processes[pid].command_str)
375+
self.assertEqual(1, found_processes[pid].attempts)

heron/instance/src/python/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pex_binary(
1919
deps = [":instance-py"],
2020
reqs = [
2121
'colorlog==2.6.1',
22+
'future==0.18.2',
2223
'PyYAML==3.13'
2324
]
2425
)

heron/instance/src/python/basics/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@
1717
'''module for basic python heron component'''
1818
__all__ = ['bolt_instance.py', 'spout_instance.py', 'base_instance']
1919

20-
from bolt_instance import BoltInstance
21-
from spout_instance import SpoutInstance
20+
from .bolt_instance import BoltInstance
21+
from .spout_instance import SpoutInstance

heron/instance/src/python/basics/bolt_instance.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
'''bolt_instance.py: module for base bolt for python topology'''
2222

2323
import time
24-
import Queue
24+
import queue
2525

2626
import heronpy.api.api_constants as api_constants
2727
from heronpy.api.state.stateful_component import StatefulComponent
@@ -181,7 +181,7 @@ def _read_tuples_and_execute(self):
181181
while not self.in_stream.is_empty():
182182
try:
183183
tuples = self.in_stream.poll()
184-
except Queue.Empty:
184+
except queue.Empty:
185185
break
186186

187187
if isinstance(tuples, tuple_pb2.HeronTupleSet):

heron/instance/src/python/basics/spout_instance.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
'''spout_instance.py: module for base spout for python topology'''
2222

23-
import Queue
23+
import queue
2424
import time
2525
import collections
2626

@@ -185,7 +185,7 @@ def _read_tuples(self):
185185
while not self.in_stream.is_empty():
186186
try:
187187
tuples = self.in_stream.poll()
188-
except Queue.Empty:
188+
except queue.Empty:
189189
break
190190

191191
if isinstance(tuples, tuple_pb2.HeronTupleSet):

heron/instance/src/python/network/gateway_looper.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import time
2727
import select
2828

29-
from event_looper import EventLooper
29+
from .event_looper import EventLooper
3030
from heron.common.src.python.utils.log import Log
3131

3232
class GatewayLooper(EventLooper):
@@ -83,7 +83,7 @@ def poll(self, timeout=0.0):
8383
error_lst = []
8484

8585
if self.sock_map is not None:
86-
for fd, obj in self.sock_map.items():
86+
for fd, obj in list(self.sock_map.items()):
8787
is_r = obj.readable()
8888
is_w = obj.writable()
8989
if is_r:

heron/instance/src/python/utils/metrics/metrics_helper.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def __init__(self, metrics):
4141

4242
def register_metrics(self, metrics_collector, interval):
4343
"""Registers its metrics to a given metrics collector with a given interval"""
44-
for field, metrics in self.metrics.items():
44+
for field, metrics in list(self.metrics.items()):
4545
metrics_collector.register_metric(field, metrics, interval)
4646

4747
def update_count(self, name, incr_by=1, key=None):
@@ -379,7 +379,7 @@ def _gather_one_metric(self, name, message):
379379
if metric_value is None:
380380
return
381381
elif isinstance(metric_value, dict):
382-
for key, value in metric_value.items():
382+
for key, value in list(metric_value.items()):
383383
if key is not None and value is not None:
384384
self._add_data_to_message(message, name + "/" + str(key), value)
385385
self._add_data_to_message(message, "%s/%s" % (name, str(key)), value)

heron/instance/src/python/utils/misc/communicator.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
'''communicator.py: module responsible for communication between Python heron modules'''
2222
import sys
23-
import Queue
23+
from queue import Queue, Full, Empty
2424

2525
from heron.common.src.python.utils.log import Log
2626

@@ -40,7 +40,7 @@ def __init__(self, producer_cb=None, consumer_cb=None):
4040
"""
4141
self._producer_callback = producer_cb
4242
self._consumer_callback = consumer_cb
43-
self._buffer = Queue.Queue()
43+
self._buffer = Queue()
4444
self.capacity = sys.maxsize
4545

4646
def register_capacity(self, capacity):
@@ -72,9 +72,9 @@ def poll(self):
7272
if self._producer_callback is not None:
7373
self._producer_callback()
7474
return ret
75-
except Queue.Empty:
75+
except Empty:
7676
Log.debug("%s: Empty in poll()" % str(self))
77-
raise Queue.Empty
77+
raise Empty
7878

7979
def offer(self, item):
8080
"""Offer to the buffer
@@ -87,9 +87,9 @@ def offer(self, item):
8787
if self._consumer_callback is not None:
8888
self._consumer_callback()
8989
return True
90-
except Queue.Full:
90+
except Full:
9191
Log.debug("%s: Full in offer()" % str(self))
92-
raise Queue.Full
92+
raise Full
9393

9494
def clear(self):
9595
"""Clear the buffer"""

heron/instance/src/python/utils/misc/custom_grouping_helper.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def add(self, stream_id, task_ids, grouping, source_comp_name):
4545

4646
def prepare(self, context):
4747
"""Prepares the custom grouping for this component"""
48-
for stream_id, targets in self.targets.items():
48+
for stream_id, targets in list(self.targets.items()):
4949
for target in targets:
5050
target.prepare(context, stream_id)
5151

heron/instance/src/python/utils/system_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
def merge(default, override):
3030
if isinstance(default, dict) and isinstance(override, dict):
31-
for k, v in override.items():
31+
for k, v in list(override.items()):
3232
Log.info("Add overriding configuration '%s'", k)
3333
if k not in default:
3434
default[k] = v

heron/instance/src/python/utils/topology/topology_context_impl.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def get_this_sources(self):
110110
def get_component_tasks(self, component_id):
111111
"""Returns the task ids allocated for the given component id"""
112112
ret = []
113-
for task_id, comp_id in self.task_to_component_map.items():
113+
for task_id, comp_id in list(self.task_to_component_map.items()):
114114
if comp_id == component_id:
115115
ret.append(task_id)
116116
return ret

0 commit comments

Comments
 (0)