Skip to content

Commit dab7fe9

Browse files
committed
Don't monitor lost drops until serviceDrops are completed.
1 parent 9607727 commit dab7fe9

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

daliuge-engine/dlg/droputils.py

+2 -1
Original file line number | Diff line number | Diff line change
@@ -228,7 +228,8 @@ def getLeafNodes(drops):
228228
return [
229229
drop
230230
for drop, _ in breadFirstTraverse(drops)
231-
if not getDownstreamObjects(drop) and drop.CategoryType != "dropclass"
231+
if not getDownstreamObjects(drop) and (drop.CategoryType != "dropclass" and
232+
drop.CategoryType != "Service")
232233
]
233234

234235

daliuge-engine/dlg/lifecycle/dlm.py

+24 -21
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,7 @@
124124
import threading
125125
import time
126126

127+
from typing import Dict
127128
from dlg.common import CategoryType
128129
from . import registry
129130
from .hsm import manager
@@ -229,8 +230,8 @@ def __init__(
229230
# instead of _drops.itervalues() to get a full, thread-safe copy of the
230231
# dictionary values. Maybe there's a better approach for thread-safety
231232
# here
232-
self._drops: dict[str, AbstractDROP] = {}
233-
233+
self._drops: Dict[str, AbstractDROP] = {}
234+
self._serviceDrops: Dict[str, AbstractDROP] = {}
234235
self._check_period = check_period
235236
self._cleanup_period = cleanup_period
236237
self._drop_checker = None
@@ -334,6 +335,12 @@ def _disappeared(self, drop):
334335

335336
def deleteLostDrops(self):
336337

338+
for sd in self._serviceDrops.values():
339+
if sd.status == DROPStates.INITIALIZED:
340+
logger.info("No need to delete lost drops whilst Service Drops are "
341+
"starting up...")
342+
return
343+
337344
toRemove = []
338345
for drop in self._drops.values():
339346

@@ -465,15 +472,9 @@ def addDrop(self, drop: AbstractDROP):
465472
drop.phase = DROPPhases.GAS
466473
drop.subscribe(self._listener)
467474

468-
# if drop.CategoryType == CategoryType.SERVICE:
469-
# connection = drop.getIO().exists()
470-
# continue
471-
#TODO LOOK HERE FOR SETTING UP SERVICES BASED ON THE DROP
472-
# if drop.persist:
473-
# self._updatePersistentStore(drop)
474-
475-
if drop.CategoryType == CategoryType.SERVICE: # and self._hsm:
476-
print("We are in the DLM!")
475+
if drop.CategoryType == CategoryType.SERVICE: # and self._hsm:
476+
print("Tracking ServiceDROPS in the DLM...")
477+
self._serviceDrops[drop.uid] = drop
477478
# self._hsm.addStore(drop.store)
478479

479480
self._reg.addDrop(drop)
@@ -509,16 +510,18 @@ def handleCompletedDrop(self, uid):
509510

510511
if not self._enable_drop_replication:
511512
return
512-
513-
drop = self._drops[uid]
514-
if drop.persist and self.isReplicable(drop):
515-
logger.debug(
516-
"Replicating %r because it's marked to be persisted", drop
517-
)
518-
try:
519-
self.replicateDrop(drop)
520-
except:
521-
logger.exception("Problem while replicating %r", drop)
513+
try:
514+
drop = self._drops[uid]
515+
if drop.persist and self.isReplicable(drop):
516+
logger.debug(
517+
"Replicating %r because it's marked to be persisted", drop
518+
)
519+
try:
520+
self.replicateDrop(drop)
521+
except:
522+
logger.exception("Problem while replicating %r", drop)
523+
except KeyError:
524+
logger.warning("Drop %s was removed from DLM early!", uid)
522525

523526
def isReplicable(self, drop):
524527
return not isinstance(drop, ContainerDROP)

0 commit comments

Comments
 (0)