Skip to content

Commit f2355d6

Browse files
faguandreyv
authored and committed
Use a lock file for precaching
Ensure that no two workers on the same machine simultaneously try to precache files. All workers except one skip precaching.
1 parent edf56fb commit f2355d6

File tree

2 files changed

+61
-18
lines changed

2 files changed

+61
-18
lines changed

cms/db/filecacher.py

+31
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import logging
3131
import os
3232
import tempfile
33+
import fcntl
3334
from abc import ABCMeta, abstractmethod
3435

3536
import gevent
@@ -543,6 +544,36 @@ def _create_directory_or_die(directory):
543544
logger.error(msg)
544545
raise RuntimeError(msg)
545546

def precache_lock(self):
    """Try to acquire the machine-wide precaching lock.

    Locking is purely advisory: any process may perform normal cache
    operations at any moment, whether the cache is locked or not.

    The lock exists only to avoid wasting resources, by ensuring that
    on each machine a single worker precaches at any given time.

    return (fileobj|None): the open lock file when the lock was free
        (closing the file object releases the lock), or None when
        the cache was already locked by someone else.

    """
    fobj = open(os.path.join(self.file_dir, "cache_lock"), 'w')
    acquired = False
    try:
        # LOCK_NB makes flock() fail immediately instead of blocking
        # when another process already holds the exclusive lock.
        fcntl.flock(fobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        acquired = True
        return fobj
    except BlockingIOError:
        # Raised only for EWOULDBLOCK, i.e. the file is already locked.
        return None
    finally:
        # Keep the descriptor open only while we actually hold the lock;
        # on failure (or any unexpected error) release it.
        if not acquired:
            fobj.close()
576+
546577
def _load(self, digest, cache_only):
547578
"""Load a file into the cache and open it for reading.
548579

cms/service/Worker.py

+30-18
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# Copyright © 2010-2012 Matteo Boscariol <[email protected]>
77
# Copyright © 2013-2015 Luca Wehrstedt <[email protected]>
88
# Copyright © 2016 Luca Versari <[email protected]>
9+
# Copyright © 2021 Fabian Gundlach <[email protected]>
910
#
1011
# This program is free software: you can redistribute it and/or modify
1112
# it under the terms of the GNU Affero General Public License as
@@ -70,24 +71,35 @@ def precache_files(self, contest_id):
7071
contest_id (int): the id of the contest
7172
7273
"""
73-
# In order to avoid a long-living connection, first fetch the
74-
# complete list of files and then download the files; since
75-
# this is just pre-caching, possible race conditions are not
76-
# dangerous
77-
logger.info("Precaching files for contest %d.", contest_id)
78-
with SessionGen() as session:
79-
contest = Contest.get_from_id(contest_id, session)
80-
files = enumerate_files(session, contest, skip_submissions=True,
81-
skip_user_tests=True, skip_print_jobs=True)
82-
for digest in files:
83-
try:
84-
self.file_cacher.cache_file(digest)
85-
except KeyError:
86-
# No problem (at this stage) if we cannot find the
87-
# file
88-
pass
89-
90-
logger.info("Precaching finished.")
74+
lock = self.file_cacher.precache_lock()
75+
if lock is None:
76+
# Another worker is already precaching. Hence, this worker doesn't
77+
# need to do anything.
78+
logger.info("Another worker is already precaching files for "
79+
"contest %d.", contest_id)
80+
return
81+
with lock:
82+
# In order to avoid a long-living connection, first fetch the
83+
# complete list of files and then download the files; since
84+
# this is just pre-caching, possible race conditions are not
85+
# dangerous
86+
logger.info("Precaching files for contest %d.", contest_id)
87+
with SessionGen() as session:
88+
contest = Contest.get_from_id(contest_id, session)
89+
files = enumerate_files(session,
90+
contest,
91+
skip_submissions=True,
92+
skip_user_tests=True,
93+
skip_print_jobs=True)
94+
for digest in files:
95+
try:
96+
self.file_cacher.cache_file(digest)
97+
except KeyError:
98+
# No problem (at this stage) if we cannot find the
99+
# file
100+
pass
101+
102+
logger.info("Precaching finished.")
91103

92104
@rpc_method
93105
def execute_job_group(self, job_group_dict):

0 commit comments

Comments
 (0)