Skip to content

Commit 3b0ca05

Browse files
committed
Move connection backend config into Manager __init__
1 parent 8e5e3f5 commit 3b0ca05

File tree

3 files changed

+50
-35
lines changed

3 files changed

+50
-35
lines changed

src/goofi/connection.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,16 @@ def set_backend(backend: str, mp_manager: BaseManager) -> None:
4848
`mp_manager` : multiprocessing.Manager
4949
The multiprocessing manager to use for creating shared objects.
5050
"""
51+
# initialize a shared set of connection ids
52+
assert Connection._CONNECTION_IDS is None, "Connection._CONNECTION_IDS is not None."
53+
Connection._CONNECTION_IDS = mp_manager.list()
54+
55+
# set the backend if discovering connection IDs was successful (prevents overwriting the backend)
5156
assert (
5257
backend in Connection.get_backends().keys()
5358
), f"Invalid backend: {backend}. Choose from {list(Connection.get_backends().keys())}"
5459
Connection._BACKEND = backend
5560

56-
# initialize a shared set of connection ids
57-
assert Connection._CONNECTION_IDS is None, "Connection._CONNECTION_IDS is not None."
58-
Connection._CONNECTION_IDS = mp_manager.list()
59-
6061
@staticmethod
6162
def create() -> Tuple["Connection", "Connection"]:
6263
"""

src/goofi/manager.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,29 @@ class Manager:
112112
same process as the manager.
113113
`duration` : float
114114
The duration to run the manager for. If `0`, runs indefinitely.
115+
`communication_backend` : str
116+
The communication backend to use for node communication. Default is "mp" (multiprocessing).
115117
"""
116118

117119
def __init__(
118-
self, filepath: Optional[str] = None, headless: bool = True, use_multiprocessing: bool = True, duration: float = 0
120+
self,
121+
filepath: Optional[str] = None,
122+
headless: bool = True,
123+
use_multiprocessing: bool = True,
124+
duration: float = 0,
125+
communication_backend: str = "mp",
119126
) -> None:
127+
# create a multiprocessing manager
128+
self._mp_manager = MPManager()
129+
130+
try:
131+
# set up communication backend
132+
Connection.set_backend(communication_backend, self._mp_manager)
133+
except AssertionError:
134+
print("Connection backend already set. Skipping.")
135+
# make sure the connection backend is set correctly
136+
assert Connection._BACKEND == communication_backend
137+
120138
# TODO: add proper logging
121139
print("Starting goofi-pipe...")
122140
# preload all nodes to avoid delays
@@ -319,7 +337,9 @@ def remove_node(self, name: str, notify_gui: bool = True, **gui_kwargs) -> None:
319337
Window().remove_node(name, **gui_kwargs)
320338

321339
@mark_unsaved_changes
322-
def add_link(self, node_out: str, node_in: str, slot_out: str, slot_in: str, notify_gui: bool = True, **gui_kwargs) -> None:
340+
def add_link(
341+
self, node_out: str, node_in: str, slot_out: str, slot_in: str, notify_gui: bool = True, **gui_kwargs
342+
) -> None:
323343
"""
324344
Adds a link between two nodes.
325345
@@ -403,6 +423,9 @@ def terminate(self, notify_gui: bool = True) -> None:
403423
for node in self.nodes:
404424
self.nodes[node].terminate()
405425

426+
# close the communication backend
427+
self._mp_manager.shutdown()
428+
406429
def save(self, filepath: Optional[str] = None, overwrite: bool = False, timeout: float = 3.0) -> None:
407430
"""
408431
Saves the state of the manager to a file.
@@ -585,21 +608,14 @@ def main(duration: float = 0, args=None):
585608
docs()
586609
return
587610

588-
with MPManager() as manager:
589-
# set the communication backend
590-
try:
591-
Connection.set_backend(args.comm, manager)
592-
except AssertionError:
593-
# connection backend is already set (occurrs when running tests)
594-
pass
595-
596-
# create and run the manager (this blocks until the manager is terminated)
597-
Manager(
598-
filepath=args.filepath,
599-
headless=args.headless,
600-
use_multiprocessing=not args.no_multiprocessing,
601-
duration=duration,
602-
)
611+
# create and run the manager (this blocks until the manager is terminated)
612+
Manager(
613+
filepath=args.filepath,
614+
headless=args.headless,
615+
use_multiprocessing=not args.no_multiprocessing,
616+
duration=duration,
617+
communication_backend=args.comm,
618+
)
603619

604620

605621
def docs():

tests/test_manager.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import platform
22
import time
3-
from multiprocessing import Manager as MPManager
43
from os import path
54

65
import pytest
@@ -10,16 +9,22 @@
109
from goofi.manager import Manager
1110
from goofi.message import Message, MessageType
1211

12+
MANAGER_TEST_DURATION = 0.1
1313

14-
def create_simple_manager() -> Manager:
14+
15+
def create_simple_manager(comm_backend: str = "mp") -> Manager:
1516
"""
1617
Creates a simple manager with a constant node, a sine node, and an add node.
1718
19+
### Parameters
20+
`comm_backend` : str
21+
The communication backend to use. Choose from "mp", "zmq-tcp" or "zmq-ipc".
22+
1823
### Returns
1924
manager : Manager
2025
The manager object.
2126
"""
22-
manager = Manager()
27+
manager = Manager(duration=MANAGER_TEST_DURATION, communication_backend=comm_backend)
2328
manager.add_node("ConstantArray", "inputs")
2429
manager.add_node("Sine", "inputs")
2530
manager.add_node("Operation", "array")
@@ -30,11 +35,11 @@ def create_simple_manager() -> Manager:
3035

3136

3237
def test_creation():
33-
Manager()
38+
Manager(duration=MANAGER_TEST_DURATION)
3439

3540

3641
def test_main():
37-
goofi.manager.main(1, ["--headless"])
42+
goofi.manager.main(MANAGER_TEST_DURATION, ["--headless"])
3843

3944

4045
@pytest.mark.skipif(platform.system() == "Windows", reason="Multiprocessing is very slow on Windows.")
@@ -44,14 +49,7 @@ def test_simple(comm_backend):
4449
# TODO: make sure zmq backend works
4550
pytest.skip("ZeroMQ backend still has some issues.")
4651

47-
try:
48-
mp_manager = MPManager()
49-
Connection.set_backend("mp", mp_manager)
50-
except AssertionError:
51-
# connection backend is already set
52-
pass
53-
54-
manager = create_simple_manager()
52+
manager = create_simple_manager(comm_backend=comm_backend)
5553
my_conn, node_conn = Connection.create()
5654
manager.nodes["operation0"].connection.send(
5755
Message(MessageType.ADD_OUTPUT_PIPE, {"slot_name_out": "out", "slot_name_in": "in", "node_connection": my_conn})
@@ -80,7 +78,7 @@ def test_simple(comm_backend):
8078

8179

8280
def test_save_empty(tmpdir):
83-
manager = Manager()
81+
manager = Manager(duration=MANAGER_TEST_DURATION)
8482

8583
# if path is a file, save to that file
8684
manager.save(path.join(tmpdir, "test.gfi"))

0 commit comments

Comments
 (0)