Skip to content

Commit 5e30e1c

Browse files
committed
add the tests for the current problem and fixes for it
1 parent fdc0b0f commit 5e30e1c

File tree

2 files changed

+132
-56
lines changed

2 files changed

+132
-56
lines changed

hubblestack/extmods/modules/pulsar.py

+61-27
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
try:
3333
import pyinotify
3434
HAS_PYINOTIFY = True
35-
DEFAULT_MASK = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
35+
DEFAULT_MASK = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_DELETE_SELF | pyinotify.IN_MODIFY
36+
RM_WATCH_MASK = pyinotify.IN_DELETE | pyinotify.IN_DELETE_SELF | pyinotify.IN_IGNORED
3637
MASKS = {}
3738
for var in dir(pyinotify):
3839
if var.startswith('IN_'):
@@ -73,6 +74,18 @@ def _enqueue(revent):
7374
"""
7475
__context__['pulsar.queue'].append(revent)
7576

77+
def _maskname_filter(name):
78+
""" deleting a directly watched file produces IN_DELETE_SELF (not
79+
IN_DELETE) and also kicks up a an IN_IGNORED (whether you mask for it
80+
or not) to indicate the file is nolonger watched.
81+
82+
We avoid returning IN_IGNORED if we can... but IN_DELETE_SELF is
83+
corrected to IN_DELETE
84+
"""
85+
if name == 'IN_DELETE_SELF':
86+
return 'IN_DELETE'
87+
return name
88+
7689
class ConfigManager(object):
7790
_config = {}
7891
_last_update = 0
@@ -360,9 +373,9 @@ def watch(self, path, mask=None, **kw):
360373
kw['rec'] = kw.get('rec')
361374
if kw['rec'] is None:
362375
kw['rec'] = pconf['recurse']
363-
self.add_watch(path,mask,**kw)
364-
log.debug('add-watch wd={0} path={1} watch_files={2} recurse={3}'.format(
365-
self.watch_db.get(path), path, pconf['watch_files'], kw['rec']))
376+
self.add_watch(path, mask, **kw)
377+
log.debug('add-watch wd={0} path={1} watch_files={2} recurse={3} mask={4}'.format(
378+
self.watch_db.get(path), path, pconf['watch_files'], kw['rec'], mask))
366379

367380
if new_file: # process() says this is a new file
368381
self._add_recursed_file_watch(path)
@@ -772,10 +785,11 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
772785

773786
excludes = _preprocess_excludes( config[cpath].get('exclude') )
774787
_append = not excludes(pathname)
788+
775789
if _append:
776790
config_path = config['paths'][0]
777791
pulsar_config = config_path[config_path.rfind('/') + 1:len(config_path)]
778-
sub = { 'change': event.maskname,
792+
sub = { 'change': _maskname_filter(event.maskname),
779793
'path': abspath, # goes to object_path in splunk
780794
'tag': dirname, # goes to file_path in splunk
781795
'name': basename, # goes to file_name in splunk
@@ -816,20 +830,19 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
816830
if os.path.isfile(pathname):
817831
sub['size'] = os.path.getsize(pathname)
818832

819-
820-
821-
ret.append(sub)
833+
if event.mask != pyinotify.IN_IGNORED:
834+
ret.append(sub)
822835

823836
if not event.mask & pyinotify.IN_ISDIR:
824837
if event.mask & pyinotify.IN_CREATE:
825838
watch_this = config[cpath].get('watch_new_files', False) \
826839
or config[cpath].get('watch_files', False)
827840
if watch_this:
828841
if not excludes(pathname):
829-
log.debug("add file-watch path={0}".format(pathname))
842+
log.debug("add file-watch path={0} mask={1}".format(pathname,
843+
pyinotify.IN_MODIFY))
830844
wm.watch(pathname, pyinotify.IN_MODIFY, new_file=True)
831-
832-
elif event.mask & pyinotify.IN_DELETE:
845+
elif event.mask & RM_WATCH_MASK:
833846
wm.rm_watch(pathname)
834847
else:
835848
log.debug('Excluding {0} from event for {1}'.format(pathname, cpath))
@@ -847,22 +860,18 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
847860
continue
848861
if isinstance(config[path], dict):
849862
mask = config[path].get('mask', DEFAULT_MASK)
850-
## commented out on 2019-08-05
851-
# watch_files = config[path].get('watch_files', DEFAULT_MASK)
852-
## note: it seems like the below was commented out because the above seems to be bugged
853-
## clearly watch_files should default to False, not DEFAULT_MASK
854-
## (aka true); that probably seemd like spurious watches from ##
855-
## duplications or whatever caused the below to be commented out.
856-
## Regardless, the above is probably wrong *and* seems unused.
857-
## commented out previous to 2019-08-05
858-
# if watch_files:
859-
# # we're going to get dup modify events if watch_files is set
860-
# # and we still monitor modify for the dir
861-
# a = mask & pyinotify.IN_MODIFY
862-
# if a:
863-
# log.debug("mask={0} -= mask & pyinotify.IN_MODIFY={1} ==> {2}".format(
864-
# mask, a, mask-a))
865-
# mask -= mask & pyinotify.IN_MODIFY
863+
watch_files = config[path].get('watch_files', False)
864+
if watch_files:
865+
# we're going to get dup modify events if watch_files is set
866+
# and we still monitor modify for the dir
867+
mask_and_modify = mask & pyinotify.IN_MODIFY
868+
if mask_and_modify:
869+
log.debug("mask={0} -= mask & pyinotify.IN_MODIFY={1}" \
870+
" ==> {2}".format(
871+
mask,
872+
mask_and_modify,
873+
mask-mask_and_modify))
874+
mask -= mask_and_modify
866875
excludes = _preprocess_excludes( config[path].get('exclude') )
867876
if isinstance(mask, list):
868877
r_mask = 0
@@ -880,7 +889,32 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
880889
rec = False
881890
auto_add = False
882891

892+
if os.path.isfile(path) and not wm.get_wd(path):
893+
# We were not previously watching this file generate a fake
894+
# IN_CREATE to announce this fact. We'd like to only generate
895+
# CREATE events when files are created, but we can't actually
896+
# watch files that don't exist yet (not with inotify anyway).
897+
# The kernel would rather be watching directories anyway.
898+
#
899+
# You might worry we'll get lots of spurious IN_CREATEs when
900+
# the database is cleared or at startup or whatever. We
901+
# actually watch everyhthing from config at startup anyway; so
902+
# we avoid these fake IN_CREATE events at startup. They only
903+
# happen when we add a watch during update, which means the
904+
# file really is new since the last time we thought about it
905+
# (aka the last time we ran the process() function).
906+
_, abspath, dirname, basename = cm.format_path(path)
907+
config_path = config['paths'][0]
908+
pulsar_config = config_path[config_path.rfind('/') + 1:len(config_path)]
909+
fake_sub = { 'change': 'IN_CREATE',
910+
'path': abspath, # goes to object_path in splunk
911+
'tag': dirname, # goes to file_path in splunk
912+
'name': basename, # goes to file_name in splunk
913+
'pulsar_config': pulsar_config}
914+
ret.append(fake_sub)
915+
883916
wm.watch(path, mask, rec=rec, auto_add=auto_add, exclude_filter=excludes)
917+
884918
dt.fin()
885919
dt.mark('prune_watches')
886920
wm.prune()

tests/unittests/test_pulsar.py

+71-29
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ def config_get(_, default):
183183
def process(self):
184184
self.events.extend([ "{change}({path})".format(**x) for x in pulsar.process() ])
185185

186+
def get_clear_events(self):
187+
ret = self.events
188+
self.events = list()
189+
return ret
190+
186191
def nuke_tdir(self):
187192
if os.path.isdir(self.tdir):
188193
shutil.rmtree(self.tdir)
@@ -402,55 +407,92 @@ def test_pruning_watch_files_then_nothing(self):
402407
assert set1 == set([self.atdir, fname1, fname2, self.atfile])
403408
assert set2 == set()
404409

405-
def test_fim_single_file(self):
406-
config = {self.atfile: {}}
410+
def test_watch_files_events(self):
411+
config = {self.atdir: { 'watch_files': True }}
407412
self.reset(**config)
408413
self.mk_tdir_and_write_tfile()
409-
self.watch_manager.watch(self.tfile)
410414

411415
set0 = set(self.watch_manager.watch_db)
412-
levents0 = len(self.events)
413-
# we should be watching 1 file, no events
414416

415-
self.process()
417+
pulsar.process()
416418
set1 = set(self.watch_manager.watch_db)
417419
levents1 = len(self.events)
418-
# we should be watching 1 file, no events
420+
assert set0 == set()
421+
assert set1 == set([self.atdir, self.atfile])
422+
assert levents1 == 0
419423

420424
with open(self.atfile, 'a') as fh:
421425
fh.write('supz\n')
422-
423426
self.process()
424-
set2 = set(self.watch_manager.watch_db)
425-
levents2 = len(self.events)
426-
# we should still be watching 1 file, 1 event
427+
set_ = set(self.watch_manager.watch_db)
428+
events_ = self.get_clear_events()
429+
assert set_ == set1
430+
assert events_ == ['IN_MODIFY({})'.format(self.atfile)]
427431

428432
os.unlink(self.atfile)
433+
self.process()
434+
set_ = set(self.watch_manager.watch_db)
435+
events_ = self.get_clear_events()
436+
assert set_ == set([self.atdir])
437+
assert events_ == ['IN_DELETE({})'.format(self.atfile)]
438+
439+
with open(self.atfile, 'a') as fh:
440+
fh.write('supz\n')
441+
self.process()
442+
set_ = set(self.watch_manager.watch_db)
443+
events_ = self.get_clear_events()
444+
assert set_ == set1
445+
assert events_ == ['IN_CREATE({})'.format(self.atfile)]
429446

447+
with open(self.atfile, 'a') as fh:
448+
fh.write('supz\n')
430449
self.process()
431-
set3 = set(self.watch_manager.watch_db)
432-
levents3 = len(self.events)
433-
# we should now be watching 0 files, 2 events
450+
set_ = set(self.watch_manager.watch_db)
451+
events_ = self.get_clear_events()
452+
assert set_ == set1
453+
assert events_ == ['IN_MODIFY({})'.format(self.atfile)]
434454

435-
assert levents0 == 0
436-
assert levents1 == 0
437-
assert levents2 == 1
438-
assert levents3 == 2
439-
assert set0 == set([self.atfile])
455+
def test_single_file_events(self):
456+
config = {self.atfile: dict()}
457+
self.reset(**config)
458+
self.mk_tdir_and_write_tfile()
459+
460+
set0 = set(self.watch_manager.watch_db)
461+
assert set0 == set()
462+
463+
pulsar.process()
464+
set1 = set(self.watch_manager.watch_db)
465+
levents1 = len(self.events)
440466
assert set1 == set([self.atfile])
441-
assert set2 == set([self.atfile])
442-
assert set3 == set()
443-
### BASELINE ###
467+
assert levents1 == 0
444468

445-
# the relevant connundrum: if we put the file back, this config should
446-
# somehow know to re-watch the atfile
447-
# at the time of this writing, this test fails
448469
with open(self.atfile, 'a') as fh:
449470
fh.write('supz\n')
471+
self.process()
472+
set2 = set(self.watch_manager.watch_db)
473+
events2 = self.get_clear_events()
474+
assert set2 == set1
475+
assert events2 == ['IN_MODIFY({})'.format(self.atfile)]
450476

477+
os.unlink(self.atfile)
451478
self.process()
452-
set4 = set(self.watch_manager.watch_db)
453-
levents4 = len(self.events)
479+
set_ = set(self.watch_manager.watch_db)
480+
events_ = self.get_clear_events()
481+
assert set_ == set() # this is DELETE_SELF now (technically)
482+
assert events_ == ['IN_DELETE({})'.format(self.atfile)]
454483

455-
assert set4 == set([self.atfile])
456-
assert levents4 == 3 # XXX: 4? CREATE? CREATE_MODIFY? CREATE+MODIFY??
484+
with open(self.atfile, 'a') as fh:
485+
fh.write('supz\n')
486+
self.process()
487+
set_ = set(self.watch_manager.watch_db)
488+
events_ = self.get_clear_events()
489+
assert set_ == set1
490+
assert events_ == ['IN_CREATE({})'.format(self.atfile)]
491+
492+
with open(self.atfile, 'a') as fh:
493+
fh.write('supz\n')
494+
self.process()
495+
set_ = set(self.watch_manager.watch_db)
496+
events_ = self.get_clear_events()
497+
assert set_ == set1
498+
assert events_ == ['IN_MODIFY({})'.format(self.atfile)]

0 commit comments

Comments
 (0)