Skip to content

Commit 16e9a56

Browse files
committed
Coalesce on remote WIP
Signed-off-by: Damien Thenot <[email protected]>
1 parent d934f92 commit 16e9a56

File tree

7 files changed

+258
-29
lines changed

7 files changed

+258
-29
lines changed

drivers/blktap2.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
LINSTOR_AVAILABLE = False
6565

6666
PLUGIN_TAP_PAUSE = "tapdisk-pause"
67+
PLUGIN_ON_SLAVE = "on-slave"
6768

6869
SOCKPATH = "/var/xapi/xcp-rrdd"
6970

@@ -461,6 +462,13 @@ def query(cls, pid, minor):
461462
total_coalesce = int(m.group(3))
462463
return (status, coalesced, total_coalesce)
463464

465+
@classmethod
def cancel_commit(cls, pid, minor, wait=True):
    """
    Issue the "cancel" sub-command for the tapdisk identified by
    `pid`/`minor` through `cls._pread`.

    When `wait` is true the "-w" flag is appended (presumably this makes the
    command block until the cancellation has completed -- TODO confirm
    against the tap-ctl documentation).
    """
    command = ["cancel", "-p", pid, "-m", minor] + (["-w"] if wait else [])
    cls._pread(command)
471+
464472
class TapdiskExists(Exception):
465473
"""Tapdisk already running."""
466474

@@ -1658,6 +1666,8 @@ def _activate_locked(self, sr_uuid, vdi_uuid, options):
16581666
if self.tap_wanted():
16591667
if not self._add_tag(vdi_uuid, not options["rdonly"]):
16601668
return False
1669+
#TODO: Need to interrupt coalesce on master, the coalesce will check for host_OpaqueRef on the VDI before trying offline coalesce
1670+
#TODO: The coalesce could happen on another slave in onlinecoalesce, interrupt coalesce on another slave (online coalesce)?
16611671
refresh = True
16621672

16631673
try:

drivers/cleanup.py

Lines changed: 123 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ def getTreeHeight(self):
757757

758758
return maxChildHeight + 1
759759

760-
def getAllLeaves(self):
760+
def getAllLeaves(self) -> List["VDI"]:
761761
"Get all leaf nodes in the subtree rooted at self"
762762
if len(self.children) == 0:
763763
return [self]
@@ -828,6 +828,57 @@ def _clear(self):
828828
def _clearRef(self):
829829
self._vdiRef = None
830830

831+
def _call_plug_cancel(self, hostRef):
    """
    Invoke the "commit_cancel" function of the on-slave plugin on `hostRef`,
    passing this image's path and type. The plugin's reply is discarded.
    """
    plugin_args = {"path": self.path, "vdi_type": self.vdi_type}
    host_api = self.sr.xapi.session.xenapi.host
    host_api.call_plugin(
        hostRef, XAPI.PLUGIN_ON_SLAVE, "commit_cancel", plugin_args)
835+
836+
def _call_plugin_coalesce(self, hostRef):
    """
    Invoke the "commit_tapdisk" function of the on-slave plugin on
    `hostRef`, passing this image's path and type. The plugin's reply is
    discarded.
    """
    plugin_args = {"path": self.path, "vdi_type": self.vdi_type}
    host_api = self.sr.xapi.session.xenapi.host
    host_api.call_plugin(
        hostRef, XAPI.PLUGIN_ON_SLAVE, "commit_tapdisk", plugin_args)
840+
841+
def _doCoalesceOnHost(self, hostRef):
    """
    Coalesce this image onto its parent by delegating the work to `hostRef`.

    The parent is validated and grown to this image's virtual size, the
    resize is propagated through `sr._updateSlavesOnResize`, and the
    "commit_tapdisk" plugin call is then run under `Util.runAbortable`.
    Afterwards the parent is validated again and its block info refreshed.

    The run is aborted when the per-VDI `gc_running` file is empty (in which
    case a cancel is also sent to `hostRef`) or when the file cannot be
    opened.
    """
    self.validate()
    self.parent.validate(True)
    self.parent._increaseSizeVirt(self.sizeVirt)
    self.sr._updateSlavesOnResize(self.parent)
    # TODO: We might need to make the LV RW on the slave directly for
    # coalesce? Children and parent need to be RW for QCOW2 coalesce,
    # otherwise tapdisk (libqcow) will crash trying to access them.

    def abortTest():
        marker = self.sr._gc_running_file(self)
        try:
            with open(marker, "r") as handle:
                if handle.read():
                    # Marker still has content: keep going.
                    return False
                # An emptied marker is the abort request; stop the remote
                # commit before reporting the abort.
                self._call_plug_cancel(hostRef)
                return True
        except OSError as err:
            if err.errno == errno.ENOENT:
                message = "File {} does not exist".format(marker)
            else:
                message = "IOError: {}".format(err)
            util.SMlog(message)
            return True

    def runRemoteCoalesce():
        return self._call_plugin_coalesce(hostRef)

    Util.runAbortable(runRemoteCoalesce, None, self.sr.uuid, abortTest,
                      VDI.POLL_INTERVAL, 0)

    self.parent.validate(True)
    # NOTE: content verification (self._verifyContents(0)) is not performed
    # on this path.
    self.parent.updateBlockInfo()
871+
872+
def _isOpenOnHosts(self) -> Optional[str]:
    """
    Ask every host with an attached PBD, through the on-slave plugin's
    "is_openers" function, whether this image's path has openers.

    Returns the reference of the first host answering true, or None when no
    host does.
    """
    for pbd in self.sr.xapi.getAttachedPBDs():
        host = pbd["host"]
        plugin_args = {"path": self.path}
        reply = self.sr.xapi.session.xenapi.host.call_plugin(
            host, XAPI.PLUGIN_ON_SLAVE, "is_openers", plugin_args)
        if util.strtobool(reply):
            return host
    return None
881+
831882
def _doCoalesce(self) -> None:
832883
"""Coalesce self onto parent. Only perform the actual coalescing of
833884
an image, but not the subsequent relinking. We'll do that as the next step,
@@ -914,7 +965,7 @@ def coalesce(self) -> int:
914965
return self.cowutil.coalesce(self.path)
915966

916967
@staticmethod
917-
def _doCoalesceCowImage(vdi):
968+
def _doCoalesceCowImage(vdi: "VDI"):
918969
try:
919970
startTime = time.time()
920971
allocated_size = vdi.getAllocatedSize()
@@ -943,7 +994,21 @@ def _vdi_is_raw(self, vdi_path):
943994

944995
def _coalesceCowImage(self, timeOut):
945996
Util.log(" Running COW coalesce on %s" % self)
946-
abortTest = lambda: IPCFlag(self.sr.uuid).test(FLAG_TYPE_ABORT)
997+
def abortTest():
998+
if self.cowutil.isCoalesceableOnRemote():
999+
file = self.sr._gc_running_file(self)
1000+
try:
1001+
with open(file, "r") as f:
1002+
if not f.read():
1003+
return True
1004+
except OSError as e:
1005+
if e.errno == errno.ENOENT:
1006+
util.SMlog("File {} does not exist".format(file))
1007+
else:
1008+
util.SMlog("IOError: {}".format(e))
1009+
return True
1010+
return IPCFlag(self.sr.uuid).test(FLAG_TYPE_ABORT)
1011+
9471012
try:
9481013
util.fistpoint.activate_custom_fn(
9491014
"cleanup_coalesceVHD_inject_failure",
@@ -2383,7 +2448,17 @@ def cleanupJournals(self, dryRun=False):
23832448
def cleanupCache(self, maxAge=-1) -> int:
23842449
return 0
23852450

2386-
def _coalesce(self, vdi):
2451+
def _hasLeavesAttachedOn(self, vdi: VDI):
    """
    Return the result of `util.get_hosts_attached_on` for the UUIDs of all
    leaves in the subtree rooted at `vdi`.
    """
    leaf_uuids = [leaf.uuid for leaf in vdi.getAllLeaves()]
    return util.get_hosts_attached_on(self.xapi.session, leaf_uuids)
2455+
2456+
def _gc_running_file(self, vdi):
    """
    Build the path of the per-VDI `gc_running_<vdi uuid>` file located in
    this SR's directory under NON_PERSISTENT_DIR.
    """
    file_name = "gc_running_{}".format(vdi.uuid)
    sr_dir = os.path.join(NON_PERSISTENT_DIR, str(self.uuid))
    return os.path.join(sr_dir, file_name)
2459+
2460+
def _coalesce(self, vdi: VDI):
2461+
skipRelink = False
23872462
if self.journaler.get(vdi.JRN_RELINK, vdi.uuid):
23882463
# this means we had done the actual coalescing already and just
23892464
# need to finish relinking and/or refreshing the children
@@ -2393,8 +2468,35 @@ def _coalesce(self, vdi):
23932468
# order to decide whether to abort the coalesce. We remove the
23942469
# journal as soon as the COW coalesce step is done, because we
23952470
# don't expect the rest of the process to take long
2471+
2472+
#TODO: Create `gc_running` in `/run/nonpersistent/sm/<sr uuid>/`
2473+
if os.path.exists(self._gc_running_file(vdi)):
2474+
util.SMlog("gc_running already exist for {}. Ignoring...".format(self.uuid))
2475+
2476+
with open(self._gc_running_file(vdi), "w") as f:
2477+
f.write("1")
2478+
23962479
self.journaler.create(vdi.JRN_COALESCE, vdi.uuid, "1")
2397-
vdi._doCoalesce()
2480+
host_refs = self._hasLeavesAttachedOn(vdi)
2481+
#TODO: this check of multiple host_refs should be done earlier in `is_coalesceable` to avoid stopping this late every time
2482+
if len(host_refs) > 1:
2483+
util.SMlog("Not coalesceable, chain activated more than once")
2484+
raise Exception("Not coalesceable, chain activated more than once") #TODO: Use correct error
2485+
2486+
try:
2487+
if host_refs and vdi.cowutil.isCoalesceableOnRemote:
2488+
#Leaf opened on another host, we need to call online coalesce
2489+
vdi._doCoalesceOnHost(host_refs[0])
2490+
skipRelink = True
2491+
else:
2492+
vdi._doCoalesce()
2493+
except:
2494+
os.unlink(self._gc_running_file(vdi))
2495+
raise
2496+
"""
2497+
vdi._doCoalesce will call vdi._coalesceCowImage (after doing other things).
2498+
It will then call VDI._doCoalesceCowImage in a runAbortable context
2499+
"""
23982500
self.journaler.remove(vdi.JRN_COALESCE, vdi.uuid)
23992501

24002502
util.fistpoint.activate("LVHDRT_before_create_relink_journal", self.uuid)
@@ -2403,19 +2505,22 @@ def _coalesce(self, vdi):
24032505
# like SM.clone from manipulating the VDIs we'll be relinking and
24042506
# rescan the SR first in case the children changed since the last
24052507
# scan
2406-
self.journaler.create(vdi.JRN_RELINK, vdi.uuid, "1")
2508+
if not skipRelink:
2509+
self.journaler.create(vdi.JRN_RELINK, vdi.uuid, "1")
24072510

2408-
self.lock()
2409-
try:
2410-
vdi.parent._tagChildrenForRelink()
2411-
self.scan()
2412-
vdi._relinkSkip()
2413-
finally:
2414-
self.unlock()
2415-
# Reload the children to leave things consistent
2416-
vdi.parent._reloadChildren(vdi)
2511+
if not skipRelink:
2512+
self.lock()
2513+
try:
2514+
vdi.parent._tagChildrenForRelink()
2515+
self.scan()
2516+
vdi._relinkSkip()
2517+
finally:
2518+
self.unlock()
2519+
# Reload the children to leave things consistent
2520+
vdi.parent._reloadChildren(vdi)
2521+
self.journaler.remove(vdi.JRN_RELINK, vdi.uuid)
24172522

2418-
self.journaler.remove(vdi.JRN_RELINK, vdi.uuid)
2523+
os.unlink(self._gc_running_file(vdi))
24192524
self.deleteVDI(vdi)
24202525

24212526
class CoalesceTracker:
@@ -3718,8 +3823,7 @@ def _gc_init_file(sr_uuid):
37183823

37193824
def _create_init_file(sr_uuid):
    """
    Create the SR's directory under NON_PERSISTENT_DIR and write "1" into
    the file named by `_gc_init_file(sr_uuid)`.
    """
    sr_dir = os.path.join(NON_PERSISTENT_DIR, str(sr_uuid))
    util.makedirs(sr_dir)
    with open(_gc_init_file(sr_uuid), 'w+') as init_file:
        init_file.write('1')
37243828

37253829

@@ -3748,7 +3852,7 @@ def abortTest():
37483852
Util.log("GC active, quiet period ended")
37493853

37503854

3751-
def _gcLoop(sr, dryRun=False, immediate=False):
3855+
def _gcLoop(sr: SR, dryRun=False, immediate=False):
37523856
if not lockGCActive.acquireNoblock():
37533857
Util.log("Another GC instance already active, exiting")
37543858
return

drivers/cowutil.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,18 @@ def getKeyHash(self, path: str) -> Optional[str]:
266266
def setKey(self, path: str, key_hash: str) -> None:
267267
pass
268268

269+
@abstractmethod
def isCoalesceableOnRemote(self) -> bool:
    """
    Tell whether images of this format can be coalesced on a remote host.

    Concrete implementations must override this method.
    """
272+
273+
@abstractmethod
def coalesceOnline(self, path: str) -> int:
    """
    Coalesce the image at `path` while it is in use.

    Returns an integer (presumably the amount of data coalesced -- TODO
    confirm with the concrete implementations). Concrete implementations
    must override this method.
    """
276+
277+
@abstractmethod
def cancelCoalesceOnline(self, path: str) -> None:
    """
    Cancel an online coalesce of the image at `path`.

    Concrete implementations must override this method.
    """
280+
269281
def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]:
270282
"""
271283
Get the chain of all parents of 'path'. Safe to call for raw VDI's as well.

drivers/on_slave.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
# A plugin for synchronizing slaves when something changes on the Master
1919

2020
import sys
21+
import os
22+
import time
23+
import errno
2124
sys.path.append("/opt/xensource/sm/")
2225
import util
2326
import lock
@@ -148,7 +151,6 @@ def is_open(session, args):
148151
util.logException("is_open")
149152
raise
150153

151-
152154
def refresh_lun_size_by_SCSIid(session, args):
153155
"""Refresh the size of LUNs backing the SCSIid on the local node."""
154156
util.SMlog("on-slave.refresh_lun_size_by_SCSIid(,%s)" % args)
@@ -160,10 +162,78 @@ def refresh_lun_size_by_SCSIid(session, args):
160162
util.SMlog("on-slave.refresh_lun_size_by_SCSIid with %s failed" % args)
161163
return "False"
162164

165+
def commit_tapdisk(session, args):
    """
    Run an online coalesce of the image at args["path"] on the local node.

    args["vdi_type"] selects the COW utility (via `getCowUtil`) whose
    `coalesceOnline` is invoked. Returns the stringified result, or "0" if
    the coalesce raised; the failure is logged rather than silently dropped.
    """
    path = args["path"]
    vdi_type = args["vdi_type"]
    # TODO: activating the image / switching it to RW is still missing here;
    # once added, the name should reflect that this does more than coalescing.
    from cowutil import getCowUtil
    cowutil = getCowUtil(vdi_type)
    try:
        return str(cowutil.coalesceOnline(path))
    except Exception:
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate, and keep a trace of the failure.
        util.logException("commit_tapdisk")
        return "0"
175+
176+
def commit_cancel(session, args):
    """
    Cancel an online coalesce of the image at args["path"] on the local node.

    args["vdi_type"] selects the COW utility (via `getCowUtil`) whose
    `cancelCoalesceOnline` is invoked. Returns "True" on success and "False"
    if the cancellation raised; the failure is logged rather than silently
    dropped.
    """
    path = args["path"]
    vdi_type = args["vdi_type"]
    from cowutil import getCowUtil
    cowutil = getCowUtil(vdi_type)
    try:
        cowutil.cancelCoalesceOnline(path)
    except Exception:
        # Catch Exception rather than everything so that SystemExit and
        # KeyboardInterrupt still propagate, and keep a trace of the failure.
        util.logException("commit_cancel")
        return "False"
    return "True"
186+
187+
def cancel_coalesce_master(session, args):
    """
    Empty the `gc_running_<vdi_uuid>` file of SR args["sr_uuid"] and wait
    for that file to disappear.

    Returns "True" immediately when the file does not exist, and "True" once
    the file is gone otherwise. Any other I/O error while opening or
    truncating the file is re-raised.
    """
    sr_uuid = args["sr_uuid"]
    vdi_uuid = args["vdi_uuid"]
    marker_path = "/run/nonpersistent/sm/{}/gc_running_{}".format(sr_uuid, vdi_uuid)

    try:
        with open(marker_path, "r+") as marker:
            # Truncate in place and push the change to disk so a reader of
            # the file sees it as empty.
            marker.truncate(0)
            marker.flush()
            os.fsync(marker.fileno())
    except IOError as err:
        if err.errno != errno.ENOENT:
            raise
        # No marker file: nothing to cancel.
        return "True"

    # NOTE: there is no timeout; this polls once per second for as long as
    # the file exists.
    while os.path.exists(marker_path):
        time.sleep(1)

    return "True"
224+
225+
def is_openers(session, args):
    """
    Report whether `util.get_openers_pid` finds anything for args["path"].

    Returns "True" when the result is truthy and "False" otherwise.
    """
    pids = util.get_openers_pid(args["path"])
    return "True" if pids else "False"
163229

164230
if __name__ == "__main__":
    import XenAPIPlugin
    # Plugin function names mapped to their handlers. A handler that is not
    # listed here cannot be invoked through the plugin, so every function
    # meant to be called remotely must be registered.
    XenAPIPlugin.dispatch({
        "multi": multi,
        "is_open": is_open,
        "refresh_lun_size_by_SCSIid": refresh_lun_size_by_SCSIid,
        "is_openers": is_openers,
        "commit_tapdisk": commit_tapdisk,
        "commit_cancel": commit_cancel,
        "cancel_coalesce_master": cancel_coalesce_master,
    })

0 commit comments

Comments
 (0)