Skip to content

Commit 4b19b64

Browse files
committed
LIU-368: Update expireAfterUse default behaviour
Files: expireAfterUse=False Memory: expireAfterUse=True This is irrespective of 'persist'
1 parent 73f8b9f commit 4b19b64

File tree

3 files changed

+103
-50
lines changed

3 files changed

+103
-50
lines changed

daliuge-engine/dlg/data/drops/file.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,15 @@ def __init__(self, *args, **kwargs):
7575
"""
7676
Initialise default drop behaviour when it is completed with the following rules:
7777
78-
- "persist": Replicate and store the data in the specified persistent store
79-
Files should be persistent by default.
78+
- "expireAfterUse": Remove the data from the workspace once it has been used
79+
by all consumers. This is independent of the "persist" flag. This is false
80+
by default for FileDrops.
8081
81-
- "expireAfterUse": Remove the data from the workspace once it has been used
82-
by all consumers. This is independent of the "persist" flag.
83-
84-
Use the default behaviour for expireAfterUse in AbstractDROP,
85-
unless specified otherwise.
8682
"""
8783

88-
if "persist" not in kwargs:
89-
kwargs["persist"] = True
84+
# 'lifespan' and 'expireAfterUse' are mutually exclusive
85+
if "lifespan" not in kwargs and "expireAfterUse" not in kwargs:
86+
kwargs["expireAfterUse"] = False
9087
self.is_dir = False
9188
super().__init__(*args, **kwargs)
9289

daliuge-engine/dlg/drop.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -373,11 +373,6 @@ def __init__(self, oid, uid, **kwargs):
373373
# No DROP should be persisted unless stated otherwise; used for replication
374374
self._persist: bool = self._popArg(kwargs, "persist", False)
375375

376-
# Non-persistent drops should expire after they are used
377-
# If expirationDate is set, the drops will expire so we do not need to update
378-
if not self._persist and self._expirationDate == -1:
379-
self._expireAfterUse = True
380-
381376
# Useful to have access to all EAGLE parameters without a prior knowledge
382377
self._parameters = dict(kwargs)
383378
self.autofill_environment_variables()

daliuge-engine/test/lifecycle/test_dlm.py

Lines changed: 97 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from dlg.apps.app_base import BarrierAppDROP
3636
from dlg.data.drops.directorycontainer import DirectoryContainer
3737
from dlg.data.drops.file import FileDROP
38+
from dlg.data.drops.memory import InMemoryDROP
3839
from dlg.droputils import DROPWaiterCtx
3940
from dlg.lifecycle import dlm
4041

@@ -61,7 +62,15 @@ def test_dropAddition(self):
6162

6263
def test_dropCompleteTriggersReplication(self):
6364
with dlm.DataLifecycleManager(enable_drop_replication=True) as manager:
64-
drop = FileDROP("oid:A", "uid:A1", expectedSize=1)
65+
66+
# By default a file is non-persistent
67+
drop = FileDROP("oid:B", "uid:B1", expectedSize=1)
68+
manager.addDrop(drop)
69+
self._writeAndClose(drop)
70+
self.assertEqual(DROPPhases.GAS, drop.phase)
71+
self.assertEqual(1, len(manager.getDropUids(drop)))
72+
73+
drop = FileDROP("oid:A", "uid:A1", expectedSize=1, persist=True)
6574
manager.addDrop(drop)
6675
self._writeAndClose(drop)
6776

@@ -70,12 +79,6 @@ def test_dropCompleteTriggersReplication(self):
7079
self.assertEqual(DROPPhases.SOLID, drop.phase)
7180
self.assertEqual(2, len(manager.getDropUids(drop)))
7281

73-
# Try the same with a non-persisted data object, it shouldn't be replicated
74-
drop = FileDROP("oid:B", "uid:B1", expectedSize=1, persist=False)
75-
manager.addDrop(drop)
76-
self._writeAndClose(drop)
77-
self.assertEqual(DROPPhases.GAS, drop.phase)
78-
self.assertEqual(1, len(manager.getDropUids(drop)))
7982

8083
def test_expiringNormalDrop(self):
8184
with dlm.DataLifecycleManager(check_period=0.5) as manager:
@@ -130,55 +133,113 @@ def test_cleanupExpiredDrops(self):
130133
self.assertEqual(DROPStates.DELETED, drop.status)
131134
self.assertFalse(drop.exists())
132135

133-
def test_expireAfterUse(self):
136+
def test_expireAfterUseForFile(self):
137+
"""
138+
Test default and non-default behaviour for the expireAfterUse flag
139+
for file drops.
140+
141+
Default: expireAfterUse=False, so the drop should still exist after it
142+
has been consumed.
143+
Non-default: expiredAfterUse=True, so the drop will be expired and
144+
deleted after it is consumed.
145+
"""
146+
147+
class MyApp(BarrierAppDROP):
148+
def run(self):
149+
pass
150+
151+
with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
152+
153+
# Check default
154+
default_fp, default_name = tempfile.mkstemp()
155+
default = FileDROP(
156+
"a",
157+
"a",
158+
filepath=default_name
159+
)
160+
161+
expired_fp, expired_name = tempfile.mkstemp()
162+
expired = FileDROP(
163+
"b",
164+
"b",
165+
filepath=expired_name,
166+
expireAfterUse=True # Remove the file after use
167+
)
168+
c = MyApp("c", "c")
169+
d = MyApp("d", "d")
170+
default.addConsumer(c)
171+
default.addConsumer(d)
172+
expired.addConsumer(c)
173+
expired.addConsumer(d)
174+
175+
manager.addDrop(default)
176+
manager.addDrop(expired)
177+
manager.addDrop(expired)
178+
manager.addDrop(c)
179+
180+
# Make sure all consumers are done
181+
with DROPWaiterCtx(self, [c, d], 1):
182+
default.setCompleted()
183+
expired.setCompleted()
184+
185+
# Both directories should be there, but after cleanup B's shouldn't
186+
# be there anymore
187+
self.assertTrue(default.exists())
188+
self.assertTrue(expired.exists())
189+
time.sleep(2.5)
190+
self.assertTrue(default.exists())
191+
self.assertFalse(expired.exists())
192+
default.delete()
193+
194+
def test_expireAfterUseForMemory(self):
134195
"""
135-
Simple test for the expireAfterUse flag. Two DROPs are created with
136-
different values, and after they are used we check whether their data
137-
is still there or not
196+
Default: expireAfterUse=True, so the drop should not exist after it
197+
has been consumed.
198+
Non-default: expiredAfterUse=False, so the drop will be not be expired
199+
after it is consumed.
138200
"""
139201

140202
class MyApp(BarrierAppDROP):
141203
def run(self):
142204
pass
143205

144206
with dlm.DataLifecycleManager(check_period=0.5, cleanup_period=2) as manager:
145-
a = DirectoryContainer(
207+
208+
# Check default behaviour - deleted for memory drops
209+
default = InMemoryDROP(
146210
"a",
147211
"a",
148-
persist=False,
149-
expireAfterUse=False, # Marking persist as False should lead to expiry regardless of this flag
150-
dirname=tempfile.mkdtemp(),
151212
)
152-
b_dirname = tempfile.mkdtemp()
153-
b = DirectoryContainer(
213+
214+
# Non-default behaviour - memory is not deleted
215+
non_expired = InMemoryDROP(
154216
"b",
155217
"b",
156-
persist=True, # Persist should have no effect on expireAfterUse
157-
expireAfterUse=False,
158-
dirname=b_dirname,
218+
expireAfterUse=False
159219
)
160220
c = MyApp("c", "c")
161221
d = MyApp("d", "d")
162-
a.addConsumer(c)
163-
a.addConsumer(d)
164-
b.addConsumer(c)
165-
b.addConsumer(d)
166-
167-
manager.addDrop(a)
168-
manager.addDrop(b)
169-
manager.addDrop(b)
222+
default.addConsumer(c)
223+
default.addConsumer(d)
224+
non_expired.addConsumer(c)
225+
non_expired.addConsumer(d)
226+
227+
manager.addDrop(default)
228+
manager.addDrop(non_expired)
229+
manager.addDrop(non_expired)
170230
manager.addDrop(c)
171231

172232
# Make sure all consumers are done
173233
with DROPWaiterCtx(self, [c, d], 1):
174-
a.setCompleted()
175-
b.setCompleted()
234+
default.setCompleted()
235+
non_expired.setCompleted()
176236

177-
# Both directories should be there, but after cleanup A's shouldn't
237+
# Both directories should be there, but after cleanup B's shouldn't
178238
# be there anymore
179-
self.assertTrue(a.exists())
180-
self.assertTrue(b.exists())
239+
self.assertTrue(default.exists())
240+
self.assertTrue(non_expired.exists())
181241
time.sleep(2.5)
182-
self.assertFalse(a.exists())
183-
self.assertTrue(b.exists())
184-
b.delete()
242+
self.assertFalse(default.exists())
243+
self.assertTrue(non_expired.exists())
244+
non_expired.delete()
245+

0 commit comments

Comments
 (0)