Skip to content

Commit df48d30

Browse files
committed
Updates to fix ZMQ with ports
1 parent 4a61999 commit df48d30

File tree

5 files changed

+20
-5
lines changed

5 files changed

+20
-5
lines changed

daliuge-common/dlg/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def zmq_safe(host_or_addr):
141141
# The catch-all IP address, ZMQ needs a *
142142
if host_or_addr == "0.0.0.0":
143143
return "*"
144-
144+
host_or_addr = host_or_addr.split(":")[0]
145145
# Return otherwise always an IP address
146146
return socket.gethostbyname(host_or_addr)
147147

daliuge-engine/dlg/graph_loader.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,10 @@ def loadDropSpecs(dropSpecList):
221221
# relationship list but doesn't exist in the list of DROPs
222222
for oid in dropSpec[rel]:
223223
oid = list(oid.keys())[0] if isinstance(oid, dict) else oid
224-
dropSpecs[oid]
224+
if oid in dropSpecs:
225+
dropSpecs[oid]
226+
else:
227+
continue
225228

226229
# N-1 relationships
227230
elif rel in __TOONE:
@@ -277,7 +280,10 @@ def createGraphFromDropSpecList(dropSpecList, session=None):
277280
link = __TOMANY[attr]
278281
for rel in dropSpec[attr]:
279282
oid = list(rel.keys())[0] if isinstance(rel, dict) else rel
280-
lhDrop = drops[oid]
283+
if oid in drops:
284+
lhDrop = drops[oid]
285+
else:
286+
continue
281287
relFuncName = LINKTYPE_1TON_APPEND_METHOD[link]
282288
try:
283289
relFunc = getattr(drop, relFuncName)

daliuge-engine/dlg/manager/node_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,8 @@ def add_node_subscriptions(self, sessionId, relationships):
435435
host, events_port, _ = nodesub
436436

437437
# TODO: we also have to unsubscribe from them at some point
438+
host = host.split(":")[0]
439+
logger.debug("Sending subscription to %s", f"{host}:{events_port}")
438440
self.subscribe(host, events_port)
439441

440442
def has_method(self, sessionId, uid, mname):

daliuge-engine/dlg/manager/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def addGraphSpec(self, graphSpec):
287287
self.status = SessionStates.BUILDING
288288

289289
# This will check the consistency of each dropSpec
290+
logger.debug("Trying to add graphSpec: %s", [[x['oid'],x['node']] for x in graphSpec])
290291
graphSpecDict, self._graphreprodata = graph_loader.loadDropSpecs(
291292
graphSpec
292293
)

daliuge-engine/dlg/rpc.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class RPCClientBase(RPCObject):
5555

5656
def get_drop_attribute(self, hostname, port, session_id, uid, name):
5757

58+
hostname = hostname.split(":")[0]
59+
5860
logger.debug(
5961
"Getting attribute %s for drop %s of session %s at %s:%d",
6062
name,
@@ -63,6 +65,7 @@ def get_drop_attribute(self, hostname, port, session_id, uid, name):
6365
hostname,
6466
port,
6567
)
68+
hostname = hostname.split(":")[0]
6669

6770
client, closer = self.get_rpc_client(hostname, port)
6871

@@ -93,7 +96,7 @@ class RPCServerBase(RPCObject):
9396
"""Base class for all RPC server"""
9497

9598
def __init__(self, host, port):
96-
self._rpc_host = host
99+
self._rpc_host = host.split(":")[0]
97100
self._rpc_port = port
98101

99102

@@ -137,10 +140,11 @@ def shutdown(self):
137140

138141
def get_client_for_endpoint(self, host, port):
139142

143+
host = host.split(":")[0]
140144
endpoint = (host, port)
141145

142146
with self._zrpcclient_acquisition_lock:
143-
if endpoint in self._zrpcclients:
147+
if endpoint in self._zrpcclinents:
144148
return self._zrpcclients[endpoint]
145149

146150
# We start the new client on its own thread so it uses gevent, etc.
@@ -179,6 +183,7 @@ def has_method(self, session_id, uid, name):
179183
return client
180184

181185
def run_zrpcclient(self, host, port, req_queue):
186+
host = host.split(":")[0]
182187
client = zerorpc.Client("tcp://%s:%d" % (host, port), context=self._context)
183188

184189
forwarder = gevent.spawn(self.forward_requests, req_queue, client)
@@ -210,6 +215,7 @@ def queue_request(self, client, req):
210215
async_result.rawlink(lambda x: self.process_response(req, x))
211216

212217
def get_rpc_client(self, hostname, port):
218+
# hostname = hostname.split(":")[0]
213219
client = self.get_client_for_endpoint(hostname, port)
214220
# No closing function since clients are long-lived
215221
return client, lambda: None

0 commit comments

Comments
 (0)