Skip to content

Commit 2427335

Browse files
committed
Merge branch 'graph-enabler' into eagle-1184
2 parents b919495 + 0c17c97 commit 2427335

File tree

11 files changed

+175
-125
lines changed

11 files changed

+175
-125
lines changed

daliuge-engine/dlg/apps/simple.py

+57-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
dlg_bool_param,
4242
dlg_int_param,
4343
dlg_list_param,
44+
dlg_dict_param,
4445
dlg_component,
4546
dlg_batch_input,
4647
dlg_batch_output,
@@ -432,6 +433,50 @@ class GenericGatherApp(BarrierAppDROP):
432433
def initialize(self, **kwargs):
433434
super(GenericGatherApp, self).initialize(**kwargs)
434435

436+
def readWriteData(self):
437+
inputs = self.inputs
438+
outputs = self.outputs
439+
total_len = 0
440+
for output in outputs:
441+
for input in inputs:
442+
value = droputils.allDropContents(input)
443+
output.write(value)
444+
445+
def run(self):
446+
self.readWriteData()
447+
448+
449+
##
450+
# @brief DictGatherApp
451+
# @details App packs all data on input into a dictionary using the input drop's names as keys and the reading the
452+
# dict values from the input drops. This app can be used stand-alone without a gather construct.
453+
# @par EAGLE_START
454+
# @param category PythonApp
455+
# @param tag daliuge
456+
# @param value_dict value_dict/Jasom/ApplicationArgument/NoPort/ReadWrite//False/False/The value dictionary can be initialized here
457+
# @param dropclass dlg.apps.simple.DictGatherApp/String/ComponentParameter/NoPort/ReadOnly//False/False/Application class
458+
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
459+
# @param num_cpus 1/Integer/ConstraintParameter/NoPort/ReadOnly//False/False/Number of cores used
460+
# @param group_start False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the start of a group?
461+
# @param input /Object/ApplicationArgument/InputPort/ReadWrite//False/False/0-base placeholder port for inputs
462+
# @param output /Object/ApplicationArgument/OutputPort/ReadWrite//False/False/Placeholder port for outputs
463+
# @param input_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Input port parsing technique
464+
# @param output_parser pickle/Select/ComponentParameter/NoPort/ReadWrite/raw,pickle,eval,npy,path,dataurl/False/False/Output port parsing technique
465+
# @par EAGLE_END
466+
class DictGatherApp(BarrierAppDROP):
467+
component_meta = dlg_component(
468+
"DictGatherApp",
469+
"Collect multiple inputs into a dictionary",
470+
[dlg_batch_input("binary/*", [])],
471+
[dlg_batch_output("binary/*", [])],
472+
[dlg_streaming_input("binary/*")],
473+
)
474+
value_dict = dlg_dict_param("value_dict", {})
475+
476+
def initialize(self, **kwargs):
477+
super(DictGatherApp, self).initialize(**kwargs)
478+
self.kwargs = kwargs
479+
435480
def readWriteData(self):
436481
inputs = self.inputs
437482
outputs = self.outputs
@@ -441,10 +486,19 @@ def readWriteData(self):
441486
# logger.debug(f">>>> writing {inputs} to {outputs}")
442487
for output in outputs:
443488
for input in inputs:
444-
d = droputils.allDropContents(input)
445-
output.write(d)
489+
value = droputils.allDropContents(input)
490+
self.value_dict[input.name] = pickle.loads(value)
491+
for aa_key, aa_dict in self.kwargs["applicationArgs"].items():
492+
if aa_key not in self.value_dict and aa_dict["value"]:
493+
self.value_dict[aa_key] = aa_dict["value"]
494+
logger.debug(
495+
"Writing %s to %s",
496+
self.value_dict,
497+
output.name,
498+
)
499+
output.write(pickle.dumps(self.value_dict))
446500

447-
# logger.debug(f">>> written {d} to {output}")
501+
# logger.debug(f">>> written {d} to {output}")
448502

449503
def run(self):
450504
self.readWriteData()

daliuge-engine/dlg/data/drops/memory.py

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ def parse_pydata(pd_dict: dict) -> bytes:
4949
pydata = json.loads(pydata)
5050
except:
5151
pydata = pydata.encode()
52+
if pd_dict["type"].lower() == "eval":
53+
# try:
54+
pydata = eval(pydata)
55+
# except:
56+
# pydata = pydata.encode()
5257
elif pd_dict["type"].lower() == "int":
5358
try:
5459
pydata = int(pydata)

daliuge-engine/dlg/data/io.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ def _write(self, data, **kwargs) -> int:
626626
self._buf += data
627627
else:
628628
self._desc.send(data)
629-
logger.debug("Wrote %s bytes", len(data))
629+
# logger.debug("Wrote %s bytes", len(data))
630630
return len(data)
631631

632632
def exists(self) -> bool:

daliuge-engine/dlg/graph_loader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def loadDropSpecs(dropSpecList):
224224
if oid in dropSpecs:
225225
dropSpecs[oid]
226226
else:
227-
continue
227+
raise KeyError
228228

229229
# N-1 relationships
230230
elif rel in __TOONE:

daliuge-engine/dlg/manager/composite_manager.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None
312312
iterable,
313313
)
314314
if thrExs:
315-
msg = f"More than one error occurred while {action} on session {sessionId}"
315+
msg = f"ERRROR(s) occurred while {action} for session {sessionId}"
316316
raise SubManagerException(msg, thrExs)
317317

318318
#
@@ -606,4 +606,4 @@ def __init__(self, dmHosts=[], pkeyPath=None, dmCheckTimeout=10):
606606
pkeyPath=pkeyPath,
607607
dmCheckTimeout=dmCheckTimeout,
608608
)
609-
logger.info("Created MasterManager for hosts: %r", self._dmHosts)
609+
logger.info("Created MasterManager for DIM hosts: %r", self._dmHosts)

daliuge-engine/dlg/manager/rest.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@ def fwrapper(*args, **kwargs):
8080
origin = bottle.request.headers.raw("Origin")
8181
logger.debug("CORS request comming from: %s", origin)
8282
if origin is None or re.match(
83-
r"http://(dlg-trans.local:80[0-9][0-9]|dlg-trans.icrar.org)", origin
83+
r"(http://dlg-trans.local:80[0-9][0-9]|https://dlg-trans.icrar.org)",
84+
origin,
8485
):
85-
origin = "http://dlg-trans.local:8084"
86+
pass
8687
elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin):
8788
origin = "http://localhost:8084"
8889
bottle.response.headers["Access-Control-Allow-Origin"] = origin
@@ -122,9 +123,11 @@ def fwrapper(*args, **kwargs):
122123
eargs = {}
123124
# args[1] is a dictionary of host:exception
124125
for host, subex in e.args[1].items():
126+
logger.debug(">>>> Error class name: %s", subex.__class__.__name__)
125127
eargs[host] = {
126128
"type": subex.__class__.__name__,
127-
"args": subex.args,
129+
# "args": subex.args,
130+
"args": "dummy",
128131
}
129132
elif isinstance(e, DaliugeException):
130133
status, eargs = 555, e.args

daliuge-engine/dlg/manager/session.py

+22-48
Original file line numberDiff line numberDiff line change
@@ -287,23 +287,19 @@ def addGraphSpec(self, graphSpec):
287287
self.status = SessionStates.BUILDING
288288

289289
# This will check the consistency of each dropSpec
290-
logger.debug("Trying to add graphSpec: %s", [[x['oid'],x['node']] for x in graphSpec])
291-
graphSpecDict, self._graphreprodata = graph_loader.loadDropSpecs(
292-
graphSpec
293-
)
290+
# logger.debug("Trying to add graphSpec: %s", [x.keys() for x in graphSpec])
291+
logger.debug("Trying to add graphSpec: %s", graphSpec)
292+
graphSpecDict, self._graphreprodata = graph_loader.loadDropSpecs(graphSpec)
294293
# Check for duplicates
295294
duplicates = set(graphSpecDict) & set(self._graph)
296295
if duplicates:
297296
raise InvalidGraphException(
298-
"Trying to add drops with OIDs that already exist: %r"
299-
% (duplicates,)
297+
"Trying to add drops with OIDs that already exist: %r" % (duplicates,)
300298
)
301299

302300
self._graph.update(graphSpecDict)
303301

304-
logger.debug(
305-
"Added a graph definition with %d DROPs", len(graphSpecDict)
306-
)
302+
logger.debug("Added a graph definition with %d DROPs", len(graphSpecDict))
307303

308304
@track_current_session
309305
def linkGraphParts(self, lhOID, rhOID, linkType, force=False):
@@ -329,9 +325,7 @@ def linkGraphParts(self, lhOID, rhOID, linkType, force=False):
329325
missingOids.append(rhOID)
330326
if missingOids:
331327
oids = "OID" if len(missingOids) == 1 else "OIDs"
332-
raise InvalidGraphException(
333-
"No DROP found for %s %r" % (oids, missingOids)
334-
)
328+
raise InvalidGraphException("No DROP found for %s %r" % (oids, missingOids))
335329

336330
graph_loader.addLink(linkType, lhDropSpec, rhOID, force=force)
337331

@@ -356,8 +350,7 @@ def deploy(self, completedDrops=[], event_listeners=[], foreach=None):
356350
not self._graph and status != SessionStates.PRISTINE
357351
):
358352
raise InvalidSessionState(
359-
"Can't deploy this session in its current status: %d"
360-
% (status)
353+
"Can't deploy this session in its current status: %d" % (status)
361354
)
362355

363356
if not self._graph and completedDrops:
@@ -468,9 +461,7 @@ def _run(self, worker):
468461
def trigger_drops(self, uids):
469462
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
470463
downStreamDrops[:] = [
471-
dsDrop
472-
for dsDrop in downStreamDrops
473-
if isinstance(dsDrop, AbstractDROP)
464+
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
474465
]
475466
if drop.uid in uids:
476467
if isinstance(drop, InputFiredAppDROP):
@@ -547,9 +538,7 @@ def add_node_subscriptions(self, relationships):
547538
remote_uid = rel.rhs
548539
mname = LINKTYPE_1TON_BACK_APPEND_METHOD[rel.rel]
549540

550-
self._proxyinfo.append(
551-
(host, rpc_port, local_uid, mname, remote_uid)
552-
)
541+
self._proxyinfo.append((host, rpc_port, local_uid, mname, remote_uid))
553542

554543
def append_reprodata(self, oid, reprodata):
555544
if oid in self._graph:
@@ -561,30 +550,26 @@ def append_reprodata(self, oid, reprodata):
561550
drop_reprodata = reprodata.get("data", {})
562551
drop_hashes = reprodata.get("merkleroot", {})
563552
for rmode in ALL_RMODES:
564-
self._graph[oid]["reprodata"][rmode.name][
565-
"rg_data"
566-
] = drop_reprodata[rmode.name]
553+
self._graph[oid]["reprodata"][rmode.name]["rg_data"] = (
554+
drop_reprodata[rmode.name]
555+
)
567556
self._graph[oid]["reprodata"][rmode.name]["rg_data"][
568557
"merkleroot"
569558
] = drop_hashes.get(rmode.name, b"")
570559

571560
else:
572-
self._graph[oid]["reprodata"]["rg_data"] = reprodata.get(
573-
"data", {}
561+
self._graph[oid]["reprodata"]["rg_data"] = reprodata.get("data", {})
562+
self._graph[oid]["reprodata"]["rg_data"]["merkleroot"] = reprodata.get(
563+
"merkleroot", b""
574564
)
575-
self._graph[oid]["reprodata"]["rg_data"][
576-
"merkleroot"
577-
] = reprodata.get("merkleroot", b"")
578565

579566
@track_current_session
580567
def finish(self):
581568
self.status = SessionStates.FINISHED
582569
logger.info("Session %s finished", self._sessionId)
583570
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
584571
downStreamDrops[:] = [
585-
dsDrop
586-
for dsDrop in downStreamDrops
587-
if isinstance(dsDrop, AbstractDROP)
572+
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
588573
]
589574
if drop.status in (DROPStates.INITIALIZED, DROPStates.WRITING):
590575
drop.setCompleted()
@@ -595,9 +580,7 @@ def end(self):
595580
logger.info("Session %s ended", self._sessionId)
596581
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
597582
downStreamDrops[:] = [
598-
dsDrop
599-
for dsDrop in downStreamDrops
600-
if isinstance(dsDrop, AbstractDROP)
583+
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
601584
]
602585
if drop.status in (DROPStates.INITIALIZED, DROPStates.WRITING):
603586
drop.skip()
@@ -621,9 +604,7 @@ def getGraphStatus(self):
621604
statusDict = collections.defaultdict(dict)
622605
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
623606
downStreamDrops[:] = [
624-
dsDrop
625-
for dsDrop in downStreamDrops
626-
if isinstance(dsDrop, AbstractDROP)
607+
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
627608
]
628609
if isinstance(drop, AppDROP):
629610
statusDict[drop.oid]["execStatus"] = drop.execStatus
@@ -636,14 +617,11 @@ def cancel(self):
636617
status = self.status
637618
if status != SessionStates.RUNNING:
638619
raise InvalidSessionState(
639-
"Can't cancel this session in its current status: %d"
640-
% (status)
620+
"Can't cancel this session in its current status: %d" % (status)
641621
)
642622
for drop, downStreamDrops in droputils.breadFirstTraverse(self._roots):
643623
downStreamDrops[:] = [
644-
dsDrop
645-
for dsDrop in downStreamDrops
646-
if isinstance(dsDrop, AbstractDROP)
624+
dsDrop for dsDrop in downStreamDrops if isinstance(dsDrop, AbstractDROP)
647625
]
648626
if drop.status not in (
649627
DROPStates.ERROR,
@@ -682,9 +660,7 @@ def get_drop_property(self, uid, prop_name):
682660
drop = self._drops[uid]
683661
return getattr(drop, prop_name)
684662
except AttributeError:
685-
raise DaliugeException(
686-
"%r has no property called %s" % (drop, prop_name)
687-
)
663+
raise DaliugeException("%r has no property called %s" % (drop, prop_name))
688664

689665
def call_drop(self, uid, method, *args):
690666
if uid not in self._drops:
@@ -693,9 +669,7 @@ def call_drop(self, uid, method, *args):
693669
drop = self._drops[uid]
694670
m = getattr(drop, method)
695671
except AttributeError:
696-
raise DaliugeException(
697-
"%r has no method called %s" % (drop, method)
698-
)
672+
raise DaliugeException("%r has no method called %s" % (drop, method))
699673
return m(*args)
700674

701675
# Support for the 'with' keyword

daliuge-engine/dlg/testutils.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,20 @@ def start_nm_in_thread(self, port=constants.NODE_DEFAULT_REST_PORT):
6363
return self._start_manager_in_thread(port, NodeManager, NMRestServer, False)
6464

6565
def start_dim_in_thread(
66-
self, nm_hosts=["localhost"], port=constants.ISLAND_DEFAULT_REST_PORT
66+
self,
67+
nm_hosts=[f"localhost:{constants.NODE_DEFAULT_REST_PORT}"],
68+
port=constants.ISLAND_DEFAULT_REST_PORT,
6769
):
6870
return self._start_manager_in_thread(
6971
port, DataIslandManager, CompositeManagerRestServer, nm_hosts
7072
)
7173

7274
def start_mm_in_thread(
73-
self, nm_hosts=["localhost"], port=constants.MASTER_DEFAULT_REST_PORT
75+
self,
76+
nm_hosts=[
77+
f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}",
78+
],
79+
port=constants.MASTER_DEFAULT_REST_PORT,
7480
):
7581
return self._start_manager_in_thread(
7682
port, MasterManager, CompositeManagerRestServer, nm_hosts

daliuge-engine/test/deploy/test_common.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ def _submit(self):
7878
]
7979
pg = add_test_reprodata(pg)
8080
for drop in pg:
81-
drop["node"] = "localhost"
82-
drop["island"] = "localhost"
81+
drop["node"] = f"localhost:{constants.NODE_DEFAULT_REST_PORT}"
82+
drop["island"] = f"localhost:{constants.ISLAND_DEFAULT_REST_PORT}"
8383
return common.submit(pg, "localhost", self.port)
8484

8585
def assert_sessions_finished(self, status, *session_ids):
@@ -91,9 +91,7 @@ def test_submit(self):
9191

9292
def test_monitor(self):
9393
session_id = self._submit()
94-
status = common.monitor_sessions(
95-
session_id, port=self.port, poll_interval=0.1
96-
)
94+
status = common.monitor_sessions(session_id, port=self.port, poll_interval=0.1)
9795
self.assert_session_finished(status)
9896

9997
def test_monitor_all(self):

0 commit comments

Comments
 (0)