Skip to content

Commit d443d99

Browse files
authored
Merge pull request #781 from jettero/py3-fim-single-file
fim single file [py3 rebase]
2 parents f027edf + 5e30e1c commit d443d99

File tree

2 files changed

+307
-160
lines changed

2 files changed

+307
-160
lines changed

hubblestack/extmods/modules/pulsar.py

+61-28
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
try:
3333
import pyinotify
3434
HAS_PYINOTIFY = True
35-
DEFAULT_MASK = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
35+
DEFAULT_MASK = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_DELETE_SELF | pyinotify.IN_MODIFY
36+
RM_WATCH_MASK = pyinotify.IN_DELETE | pyinotify.IN_DELETE_SELF | pyinotify.IN_IGNORED
3637
MASKS = {}
3738
for var in dir(pyinotify):
3839
if var.startswith('IN_'):
@@ -73,6 +74,18 @@ def _enqueue(revent):
7374
"""
7475
__context__['pulsar.queue'].append(revent)
7576

77+
def _maskname_filter(name):
78+
""" deleting a directly watched file produces IN_DELETE_SELF (not
79+
IN_DELETE) and also kicks up a an IN_IGNORED (whether you mask for it
80+
or not) to indicate the file is nolonger watched.
81+
82+
We avoid returning IN_IGNORED if we can... but IN_DELETE_SELF is
83+
corrected to IN_DELETE
84+
"""
85+
if name == 'IN_DELETE_SELF':
86+
return 'IN_DELETE'
87+
return name
88+
7689
class ConfigManager(object):
7790
_config = {}
7891
_last_update = 0
@@ -360,9 +373,9 @@ def watch(self, path, mask=None, **kw):
360373
kw['rec'] = kw.get('rec')
361374
if kw['rec'] is None:
362375
kw['rec'] = pconf['recurse']
363-
self.add_watch(path,mask,**kw)
364-
log.debug('add-watch wd={0} path={1} watch_files={2} recurse={3}'.format(
365-
self.watch_db.get(path), path, pconf['watch_files'], kw['rec']))
376+
self.add_watch(path, mask, **kw)
377+
log.debug('add-watch wd={0} path={1} watch_files={2} recurse={3} mask={4}'.format(
378+
self.watch_db.get(path), path, pconf['watch_files'], kw['rec'], mask))
366379

367380
if new_file: # process() says this is a new file
368381
self._add_recursed_file_watch(path)
@@ -772,10 +785,11 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
772785

773786
excludes = _preprocess_excludes( config[cpath].get('exclude') )
774787
_append = not excludes(pathname)
788+
775789
if _append:
776790
config_path = config['paths'][0]
777791
pulsar_config = config_path[config_path.rfind('/') + 1:len(config_path)]
778-
sub = { 'change': event.maskname,
792+
sub = { 'change': _maskname_filter(event.maskname),
779793
'path': abspath, # goes to object_path in splunk
780794
'tag': dirname, # goes to file_path in splunk
781795
'name': basename, # goes to file_name in splunk
@@ -816,20 +830,19 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
816830
if os.path.isfile(pathname):
817831
sub['size'] = os.path.getsize(pathname)
818832

819-
820-
821-
ret.append(sub)
833+
if event.mask != pyinotify.IN_IGNORED:
834+
ret.append(sub)
822835

823836
if not event.mask & pyinotify.IN_ISDIR:
824837
if event.mask & pyinotify.IN_CREATE:
825838
watch_this = config[cpath].get('watch_new_files', False) \
826839
or config[cpath].get('watch_files', False)
827840
if watch_this:
828841
if not excludes(pathname):
829-
log.debug("add file-watch path={0}".format(pathname))
842+
log.debug("add file-watch path={0} mask={1}".format(pathname,
843+
pyinotify.IN_MODIFY))
830844
wm.watch(pathname, pyinotify.IN_MODIFY, new_file=True)
831-
832-
elif event.mask & pyinotify.IN_DELETE:
845+
elif event.mask & RM_WATCH_MASK:
833846
wm.rm_watch(pathname)
834847
else:
835848
log.debug('Excluding {0} from event for {1}'.format(pathname, cpath))
@@ -839,7 +852,6 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
839852
dt.mark('update_watches')
840853
log.debug("update watches")
841854
# Update existing watches and add new ones
842-
# TODO: make the config handle more options
843855
for path in config:
844856
excludes = lambda x: False
845857
if path in ['return', 'checksum', 'stats', 'batch', 'verbose',
@@ -848,22 +860,18 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
848860
continue
849861
if isinstance(config[path], dict):
850862
mask = config[path].get('mask', DEFAULT_MASK)
851-
## commented out on 2019-08-05
852-
# watch_files = config[path].get('watch_files', DEFAULT_MASK)
853-
## note: it seems like the below was commented out because the above seems to be bugged
854-
## clearly watch_files should default to False, not DEFAULT_MASK
855-
## (aka true); that probably seemd like spurious watches from ##
856-
## duplications or whatever caused the below to be commented out.
857-
## Regardless, the above is probably wrong *and* seems unused.
858-
## commented out previous to 2019-08-05
859-
# if watch_files:
860-
# # we're going to get dup modify events if watch_files is set
861-
# # and we still monitor modify for the dir
862-
# a = mask & pyinotify.IN_MODIFY
863-
# if a:
864-
# log.debug("mask={0} -= mask & pyinotify.IN_MODIFY={1} ==> {2}".format(
865-
# mask, a, mask-a))
866-
# mask -= mask & pyinotify.IN_MODIFY
863+
watch_files = config[path].get('watch_files', False)
864+
if watch_files:
865+
# we're going to get dup modify events if watch_files is set
866+
# and we still monitor modify for the dir
867+
mask_and_modify = mask & pyinotify.IN_MODIFY
868+
if mask_and_modify:
869+
log.debug("mask={0} -= mask & pyinotify.IN_MODIFY={1}" \
870+
" ==> {2}".format(
871+
mask,
872+
mask_and_modify,
873+
mask-mask_and_modify))
874+
mask -= mask_and_modify
867875
excludes = _preprocess_excludes( config[path].get('exclude') )
868876
if isinstance(mask, list):
869877
r_mask = 0
@@ -881,7 +889,32 @@ def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml
881889
rec = False
882890
auto_add = False
883891

892+
if os.path.isfile(path) and not wm.get_wd(path):
893+
# We were not previously watching this file generate a fake
894+
# IN_CREATE to announce this fact. We'd like to only generate
895+
# CREATE events when files are created, but we can't actually
896+
# watch files that don't exist yet (not with inotify anyway).
897+
# The kernel would rather be watching directories anyway.
898+
#
899+
# You might worry we'll get lots of spurious IN_CREATEs when
900+
# the database is cleared or at startup or whatever. We
901+
# actually watch everyhthing from config at startup anyway; so
902+
# we avoid these fake IN_CREATE events at startup. They only
903+
# happen when we add a watch during update, which means the
904+
# file really is new since the last time we thought about it
905+
# (aka the last time we ran the process() function).
906+
_, abspath, dirname, basename = cm.format_path(path)
907+
config_path = config['paths'][0]
908+
pulsar_config = config_path[config_path.rfind('/') + 1:len(config_path)]
909+
fake_sub = { 'change': 'IN_CREATE',
910+
'path': abspath, # goes to object_path in splunk
911+
'tag': dirname, # goes to file_path in splunk
912+
'name': basename, # goes to file_name in splunk
913+
'pulsar_config': pulsar_config}
914+
ret.append(fake_sub)
915+
884916
wm.watch(path, mask, rec=rec, auto_add=auto_add, exclude_filter=excludes)
917+
885918
dt.fin()
886919
dt.mark('prune_watches')
887920
wm.prune()

0 commit comments

Comments
 (0)