Skip to content

Commit b4f23de

Browse files
committed
Merge branch 'LIU-368' of https://github.com/icrar/daliuge into LIU-368
2 parents bf3a8fc + 6207121 commit b4f23de

File tree

6 files changed

+129
-50
lines changed

6 files changed

+129
-50
lines changed

daliuge-engine/dlg/data/drops/data_base.py

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
6565
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
6666
# @param persist False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
67+
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once it has been used by all consumers
6768
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
6869
# @par EAGLE_END
6970
class DataDROP(AbstractDROP):

daliuge-engine/dlg/data/drops/file.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
# @param dropclass dlg.data.drops.file.FileDROP/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
4545
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
4646
# @param persist True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
47+
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once it has been used by all consumers
4748
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
4849
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
4950
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
@@ -70,12 +71,18 @@ class FileDROP(DataDROP, PathBasedDrop):
7071
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)
7172
# is_dir = dlg_bool_param("is_dir", False)
7273

73-
# Make sure files are not deleted by default and certainly not if they are
74-
# marked to be persisted no matter what expireAfterUse said
7574
def __init__(self, *args, **kwargs):
76-
if "persist" not in kwargs:
77-
kwargs["persist"] = True
78-
if kwargs["persist"] and "lifespan" not in kwargs:
75+
"""
76+
Initialise default drop behaviour when it is completed with the following rules:
77+
78+
- "expireAfterUse": Remove the data from the workspace once it has been used
79+
by all consumers. This is independent of the "persist" flag. This is false
80+
by default for FileDrops.
81+
82+
"""
83+
84+
# 'lifespan' and 'expireAfterUse' are mutually exclusive
85+
if "lifespan" not in kwargs and "expireAfterUse" not in kwargs:
7986
kwargs["expireAfterUse"] = False
8087
self.is_dir = False
8188
super().__init__(*args, **kwargs)

daliuge-engine/dlg/data/drops/memory.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def parse_pydata(pd_dict: dict) -> bytes:
8787
# @param pydata None/String/ApplicationArgument/NoPort/ReadWrite//False/False/Data to be loaded into memory
8888
# @param dummy /Object/ApplicationArgument/InputOutput/ReadWrite//False/False/Dummy port
8989
# @param persist False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
90+
# @param expireAfterUse True/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether the data of this component should be deleted once it has been used by all consumers
9091
# @param data_volume 5/Float/ConstraintParameter/NoPort/ReadWrite//False/False/Estimated size of the data contained in this node
9192
# @param group_end False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Is this node the end of a group?
9293
# @param streaming False/Boolean/ComponentParameter/NoPort/ReadWrite//False/False/Specifies whether this data component streams input and output data
@@ -103,8 +104,6 @@ def __init__(self, *args, **kwargs):
103104
kwargs["persist"] = False
104105
if "expireAfterUse" not in kwargs:
105106
kwargs["expireAfterUse"] = True
106-
if kwargs["persist"]:
107-
kwargs["expireAfterUse"] = False
108107
super().__init__(*args, **kwargs)
109108

110109
def initialize(self, **kwargs):

daliuge-engine/dlg/drop.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,14 @@ def __init__(self, oid, uid, **kwargs):
352352
"but they are mutually exclusive" % (self,),
353353
)
354354

355-
self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", False)
355+
# If expireAfterUse is set by the user to be False, we do not want to initiate
356+
# a timeout using lifespan, so we set the default for expireAfterUse to None
357+
self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", None)
358+
359+
# We only initiate the lifespan if the expireAfterUse flag has not been specified
360+
# as an argument on the Drop.
356361
self._expirationDate = -1
357-
if not self._expireAfterUse:
362+
if self._expireAfterUse is None:
358363
lifespan = float(self._popArg(kwargs, "lifespan", -1))
359364
if lifespan != -1:
360365
self._expirationDate = time.time() + lifespan
@@ -367,9 +372,6 @@ def __init__(self, oid, uid, **kwargs):
367372

368373
# No DROP should be persisted unless stated otherwise; used for replication
369374
self._persist: bool = self._popArg(kwargs, "persist", False)
370-
# If DROP should be persisted, don't expire (delete) it.
371-
if self._persist:
372-
self._expireAfterUse = False
373375

374376
# Useful to have access to all EAGLE parameters without a prior knowledge
375377
self._parameters = dict(kwargs)

daliuge-engine/dlg/lifecycle/dlm.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,16 @@ def deleteExpiredDrops(self):
281281
if drop.status == DROPStates.EXPIRED:
282282
self._deleteDrop(drop)
283283

284-
def expireCompletedDrops(self):
284+
def expireCompletedDrops(self) -> None:
285+
"""
286+
Drops that have the 'expireAfterUse' argument set should be marked as expired
287+
once all of their consumers have finished using them.
288+
289+
Note: This operation occurs independently of the 'persist' argument.
290+
291+
Returns:
292+
None
293+
"""
285294
now = time.time()
286295
for drop in self._drops.values():
287296

@@ -290,7 +299,7 @@ def expireCompletedDrops(self):
290299

291300
# Expire-after-use: mark as expired if all consumers
292301
# are finished using this DROP
293-
if not drop.persist and drop.expireAfterUse:
302+
if drop.expireAfterUse:
294303
allDone = all(
295304
c.execStatus
296305
in [AppDROPStates.FINISHED, AppDROPStates.ERROR]

daliuge-engine/test/lifecycle/test_dlm.py

+97-36
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from dlg.apps.app_base import BarrierAppDROP
3636
from dlg.data.drops.directorycontainer import DirectoryContainer
3737
from dlg.data.drops.file import FileDROP
38+
from dlg.data.drops.memory import InMemoryDROP
3839
from dlg.droputils import DROPWaiterCtx
3940
from dlg.lifecycle import dlm
4041

@@ -61,7 +62,15 @@ def test_dropAddition(self):
6162

6263
def test_dropCompleteTriggersReplication(self):
6364
with dlm.DataLifecycleManager(enable_drop_replication=True) as manager:
64-
drop = FileDROP("oid:A", "uid:A1", expectedSize=1)
65+
66+
# By default a file is non-persistent
67+
drop = FileDROP("oid:B", "uid:B1", expectedSize=1)
68+
manager.addDrop(drop)
69+
self._writeAndClose(drop)
70+
self.assertEqual(DROPPhases.GAS, drop.phase)
71+
self.assertEqual(1, len(manager.getDropUids(drop)))
72+
73+
drop = FileDROP("oid:A", "uid:A1", expectedSize=1, persist=True)
6574
manager.addDrop(drop)
6675
self._writeAndClose(drop)
6776

@@ -70,12 +79,6 @@ def test_dropCompleteTriggersReplication(self):
7079
self.assertEqual(DROPPhases.SOLID, drop.phase)
7180
self.assertEqual(2, len(manager.getDropUids(drop)))
7281

73-
# Try the same with a non-persisted data object, it shouldn't be replicated
74-
drop = FileDROP("oid:B", "uid:B1", expectedSize=1, persist=False)
75-
manager.addDrop(drop)
76-
self._writeAndClose(drop)
77-
self.assertEqual(DROPPhases.GAS, drop.phase)
78-
self.assertEqual(1, len(manager.getDropUids(drop)))
7982

8083
def test_expiringNormalDrop(self):
8184
with dlm.DataLifecycleManager(check_period=0.5) as manager:
@@ -130,55 +133,113 @@ def test_cleanupExpiredDrops(self):
130133
self.assertEqual(DROPStates.DELETED, drop.status)
131134
self.assertFalse(drop.exists())
132135

133-
def test_expireAfterUse(self):
136+
def test_expireAfterUseForFile(self):
137+
"""
138+
Test default and non-default behaviour for the expireAfterUse flag
139+
for file drops.
140+
141+
Default: expireAfterUse=False, so the drop should still exist after it
142+
has been consumed.
143+
Non-default: expireAfterUse=True, so the drop will be expired and
144+
deleted after it is consumed.
145+
"""
146+
147+
class MyApp(BarrierAppDROP):
148+
def run(self):
149+
pass
150+
151+
with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
152+
153+
# Check default
154+
default_fp, default_name = tempfile.mkstemp()
155+
default = FileDROP(
156+
"a",
157+
"a",
158+
filepath=default_name
159+
)
160+
161+
expired_fp, expired_name = tempfile.mkstemp()
162+
expired = FileDROP(
163+
"b",
164+
"b",
165+
filepath=expired_name,
166+
expireAfterUse=True # Remove the file after use
167+
)
168+
c = MyApp("c", "c")
169+
d = MyApp("d", "d")
170+
default.addConsumer(c)
171+
default.addConsumer(d)
172+
expired.addConsumer(c)
173+
expired.addConsumer(d)
174+
175+
manager.addDrop(default)
176+
manager.addDrop(expired)
177+
manager.addDrop(expired)
178+
manager.addDrop(c)
179+
180+
# Make sure all consumers are done
181+
with DROPWaiterCtx(self, [c, d], 1):
182+
default.setCompleted()
183+
expired.setCompleted()
184+
185+
# Both files should be there, but after cleanup B's (expired) shouldn't
186+
# be there anymore
187+
self.assertTrue(default.exists())
188+
self.assertTrue(expired.exists())
189+
time.sleep(2.5)
190+
self.assertTrue(default.exists())
191+
self.assertFalse(expired.exists())
192+
default.delete()
193+
194+
def test_expireAfterUseForMemory(self):
134195
"""
135-
Simple test for the expireAfterUse flag. Two DROPs are created with
136-
different values, and after they are used we check whether their data
137-
is still there or not
196+
Default: expireAfterUse=True, so the drop should not exist after it
197+
has been consumed.
198+
Non-default: expireAfterUse=False, so the drop will not be expired
199+
after it is consumed.
138200
"""
139201

140202
class MyApp(BarrierAppDROP):
141203
def run(self):
142204
pass
143205

144206
with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
145-
a = DirectoryContainer(
207+
208+
# Check default behaviour - deleted for memory drops
209+
default = InMemoryDROP(
146210
"a",
147211
"a",
148-
persist=False,
149-
expireAfterUse=True,
150-
dirname=tempfile.mkdtemp(),
151212
)
152-
b_dirname = tempfile.mkdtemp()
153-
b = DirectoryContainer(
213+
214+
# Non-default behaviour - memory is not deleted
215+
non_expired = InMemoryDROP(
154216
"b",
155217
"b",
156-
persist=False,
157-
expireAfterUse=False,
158-
dirname=b_dirname,
218+
expireAfterUse=False
159219
)
160220
c = MyApp("c", "c")
161221
d = MyApp("d", "d")
162-
a.addConsumer(c)
163-
a.addConsumer(d)
164-
b.addConsumer(c)
165-
b.addConsumer(d)
166-
167-
manager.addDrop(a)
168-
manager.addDrop(b)
169-
manager.addDrop(b)
222+
default.addConsumer(c)
223+
default.addConsumer(d)
224+
non_expired.addConsumer(c)
225+
non_expired.addConsumer(d)
226+
227+
manager.addDrop(default)
228+
manager.addDrop(non_expired)
229+
manager.addDrop(non_expired)
170230
manager.addDrop(c)
171231

172232
# Make sure all consumers are done
173233
with DROPWaiterCtx(self, [c, d], 1):
174-
a.setCompleted()
175-
b.setCompleted()
234+
default.setCompleted()
235+
non_expired.setCompleted()
176236

177-
# Both directories should be there, but after cleanup A's shouldn't
237+
# Both drops should be there, but after cleanup A's (the default) shouldn't
178238
# be there anymore
179-
self.assertTrue(a.exists())
180-
self.assertTrue(b.exists())
239+
self.assertTrue(default.exists())
240+
self.assertTrue(non_expired.exists())
181241
time.sleep(2.5)
182-
self.assertFalse(a.exists())
183-
self.assertTrue(b.exists())
184-
b.delete()
242+
self.assertFalse(default.exists())
243+
self.assertTrue(non_expired.exists())
244+
non_expired.delete()
245+

0 commit comments

Comments
 (0)