Commits

JanKanis  committed ad685a0 Draft

Fix lots of bugs, the first test passes now!

- Read constants as unsigned instead of signed ints
- Correctly set errno/filename on OSError-derived exceptions
- Attempt to handle ConcurrentFilesystemModificationErrors correctly
- handle _Descriptor removal better
- rename _Watch to _PathWatch
- return the normalized string from PathWatcher.add
- rename some internal methods
- Don't forget to watch for IN_IGNORED events on path elements
- fix a number of bugs in the generation of IN_PATH_* events
- fix the first test and part of the second test

  • Participants
  • Parent commits a3efbf1
  • Branches pathwatcher

Comments (0)

Files changed (4)

File inotify/_inotify.c

 #define bit_name(x) {x, #x}
 
 static struct {
-	int bit;
+	unsigned int bit;
 	const char *name;
 	PyObject *pyname;
 } bit_names[] = {
 
 static void define_const(PyObject *dict, const char *name, uint32_t val)
 {
-	PyObject *pyval = PyLong_FromLong(val);
+	PyObject *pyval = PyLong_FromUnsignedLong(val);
 	PyObject *pyname = PyUnicode_FromString(name);
 
 	if (!pyname || !pyval)

File inotify/pathresolver.py

 
   '''
   global _symlinkmax
-  if not _symlinkmax is None:
+  if _symlinkmax is not None:
     return _symlinkmax
 
   try:
 
 
 class InvalidPathError (OSError):
-    def __init__(self, msg, path, *args):
+    def __init__(self, msg, path, *args, errno=None):
         self.filename = path
+        self.errno = errno
+        if errno:
+            self.strerror = os.strerror(errno)
         OSError.__init__(self, msg, *args)
 
 class SymlinkLoopError (InvalidPathError):
     def __init__(self, path, *args):
         msg = "Path not valid: The symlink at '{}' forms a symlink loop".format(path)
-        InvalidPathError.__init__(self, msg, path, errno.ELOOP, *args)
+        InvalidPathError.__init__(self, msg, path, errno=errno.ELOOP, *args)
 
 class ConcurrentFilesystemModificationError (InvalidPathError):
     def __init__(self, path, *args):
     class FileNotFoundError (InvalidPathError, FileNotFoundError):
         def __init__(self, path, *args):
             InvalidPathError.__init__(self, fnf_msg.format(path), path,
-                                          errno.ENOENT, *args)
+                                          errno=ENOENT, *args)
 
     class NotADirectoryError (InvalidPathError, NotADirectoryError):
         def __init__(self, path, *args):
             InvalidPathError.__init__(self, nad_msg.format(path), path,
-                                          errno.ENOTDIR, *args)
+                                          errno=errno.ENOTDIR, *args)
 
 else:
     class FileNotFoundError (InvalidPathError):
-        def fnf__init__(self, path, *args):
+        def __init__(self, path, *args):
             InvalidPathError.__init__(self, fnf_msg.format(path), path,
-                                          errno.ENOENT, *args)
+                                          errno=errno.ENOENT, *args)
 
     class NotADirectoryError (InvalidPathError):
         def __init__(self, path, *args):
             InvalidPathError.__init__(self, nad_msg.format(path), path,
-                                          errno.ENOTDIR, *args)
+                                          errno=errno.ENOTDIR, *args)
 
 
 

File inotify/pathwatcher.py

 from . import _inotify
 from .in_constants import constants, decode_mask, event_properties
 from .watcher import NoFilesException, _make_getter
-from .pathresolver import SymlinkLoopError
+from .pathresolver import SymlinkLoopError, ConcurrentFilesystemModificationError
 
 globals().update(constants)
 
         self.fd = _inotify.init()
         self._watchdescriptors = {}
         self._paths = {}
-        self._buffer = []
-        self._reread_required = None
+        self._pending_watch_removes = 0
         self._reconnect = []
+        self.events = []
 
     def fileno(self):
         '''Return the file descriptor this watcher uses.  Useful for passing to select
         relative to the working directory at the time of the
         operation.
 
+        Returns the normalized path string, which can be used as a key
+        for received events.
+
         '''
         path = PosixPath(path)
         if path in self._paths:
             self._paths[path].update(mask=mask, remember_curdir=remember_curdir)
             return
-        self._paths[path] = _Watch(self, path, mask, remember_curdir)
+        self._paths[path] = _PathWatch(self, path, mask, remember_curdir)
+        return str(path)
 
     def _createwatch(self, path, name, mask, callback):
         'create a new _Descriptor for path'
 
         '''
         del self._watchdescriptors[descriptor.wd]
+        self._pending_watch_removes -= 1
 
-    def _signal_empty_descriptor(self, descriptor):
+    def _signal_empty_watch(self, descriptor):
         '''This method is called from a _Descriptor instance if it no longer has any
         callbacks attached to it and so should be deleted. This means
         inotify.read may need to be called again to catch the corresponding
         IN_IGNORE event.
         '''
-        _inotify.remove_watch(self.fd, descriptor.wd)
-        if not self._reread_required is None:
-            self._reread_required = True
+        if descriptor.active:
+            _inotify.remove_watch(self.fd, descriptor.wd)
+        self._pending_watch_removes += 1
 
     def _reconnect_required(self, watch):
         '''Register a watch to be .reconnect()'ed after event processing is finished'''
         if not len(self._watchdescriptors):
             raise NoFilesException("There are no files to watch")
 
-        # If a watch descriptor is removed during event processing, we want to
-        # call inotify.read again to catch and process the IN_IGNORE
-        # events. What we need is a dynamically scoped variable that can be set
-        # somewhere down the call stack during the event processing. Since
-        # Python doesn't have dynamic variables we use an instance variable and
-        # check that it is in the correnct state. This is from a design point a
-        # bit unfortunate as this variable really only has a meaning while the
-        # call to Watcher.read is active on the stack.
-        assert self._reread_required is None
-        events = []
-        try:
-            while self._reread_required in (None, True):
-                self._reread_required = False
-                for evt in _inotify.read(self.fd, bufsize):
-                    if evt.wd == -1:
-                        events.append(self._handle_descriptorless_event(evt))
-                    else:
-                        for e in self._watchdescriptors[evt.wd].handle_event(evt):
-                            events.append(e)
-        finally:
-            self._reread_required = None
-        for w in self._reconnect:
+        # We call _inotify.read once at first. If we are expecting an
+        # IN_IGNORED, we read it again until we get all pending
+        # IN_IGNOREDs. Secondly, if a
+        # ConcurrentFilesystemModificationError is thrown in
+        # pathresolver, that may be because other path elements have
+        # changed and there are still unread events indicating
+        # this. So if after reconnecting there are again _PathWatches
+        # that need reconnecting, we also call _inotify.read
+        # again. Continue this loop until there are no more pending
+        # IN_IGNORE events and no more _PathWatches awaiting
+        # reconnection.
+        if self.events:
+            e, self.events = self.events, []
+            return e
+        self._do_reconnect()
+        do1 = True
+        while do1 or self._pending_watch_removes > 0 or self._reconnect:
+            do1 = False
+            do2 = True
+            while do2 or self._pending_watch_removes > 0:
+                do2 = False
+                self.events.extend(self._read_events(bufsize))
+            self._do_reconnect()
+        events, self.events = self.events, []
+        return events
+
+    def _read_events(self, bufsize):
+        for evt in _inotify.read(self.fd, bufsize):
+            if evt.wd == -1:
+                eventiter = self._handle_descriptorless_event(evt)
+            else:
+                eventiter = self._watchdescriptors[evt.wd].handle_event(evt)
+            for e in eventiter:
+                yield e
+
+    def _do_reconnect(self):
+        r = self._reconnect
+        # Do not just clear the list, but replace it, because the
+        # reconnect call can cause new pathwatches to require
+        # reconnection.
+        self._reconnect = []
+        for w in r:
             w.reconnect()
-        del self._reconnect[:]
-        return events
 
     def _handle_descriptorless_event(self, evt):
         event = Event(evt, None)
         if event.q_overflow:
             for w in self._paths.values():
                 w._queue_overflow()
-        return event
+        yield event
 
     def update(self, path, newmask=0, remember_curdir=None):
         '''Replace the mask for the watch on path by the new mask. If
 
 syntheticevent = namedtuple('syntheticevent', 'mask cookie name wd')
 
-class _Watch (object):
+class _PathWatch (object):
     root = PosixPath('/')
     curdir = PosixPath('.')
     parentdir = PosixPath('..')
     
     def __init__(self, watcher, path, mask, remember_curdir=None):
         self.watcher = watcher
-        self.path = PosixPath(path)
+        self.path = path
         self.mask = mask
         self.links = []
         # watch_complete values:
         if remember_curdir is True:
             self.cwd = PosixPath.cwd()
         elif remember_curdir is False:
-            self.cwd = _Watch.curdir
+            self.cwd = _PathWatch.curdir
          
     def reconnect(self):
         assert self.watch_complete == 0
             for path, rest in pathsiter:
                 if linkcount[0] > symlinkmax:
                     raise pathresolver.SymlinkLoopError(str(self.path))
-                if rest == _Watch.curdir:
+                if rest == _PathWatch.curdir:
                     break
                 self.add_path_element(path, rest, linkcount[0])
+        except ConcurrentFilesystemModificationError:
+            # leave watch_complete at 0, we may need to read more events first
+            return
         except OSError as e:
             if e.errno in (errno.ENOTDIR, errno.EACCES, errno.ENOENT, errno.ELOOP):
                 # Basically any kind of path fault. Mark the reconnect as
             else:
                 raise
                 
-        assert rest == _Watch.curdir
+        assert rest == _PathWatch.curdir
         self.add_leaf(path)
         self.watch_complete = 2
 
     def add_leaf(self, path):
-        self.links.append(_Link(len(self.links), self, self.mask, path, None, _Watch.curdir, None))
+        self.links.append(_Link(len(self.links), self, self.mask, path, None, _PathWatch.curdir, None))
 
     def add_path_element(self, path, rest, linkcount):
-        assert rest != _Watch.curdir
-        mask = IN_UNMOUNT | IN_ONLYDIR | IN_EXCL_UNLINK
+        assert rest != _PathWatch.curdir
+        mask = IN_UNMOUNT | IN_ONLYDIR | IN_EXCL_UNLINK | IN_IGNORED
         if rest.parts[0] == '..':
             mask |= IN_MOVE_SELF | IN_DELETE_SELF
             name = None
         self.links.append(_Link(len(self.links), self, mask, path, name, rest, linkcount))
         
     _eventmap = {IN_MOVE | IN_MOVE_SELF: IN_PATH_MOVED,
-                 IN_DELETE | IN_DELETE_SELF: IN_PATH_DELETE,
+                 IN_DELETE | IN_DELETE_SELF | IN_IGNORED: IN_PATH_DELETE,
                  IN_CREATE: IN_PATH_CREATE,
                  IN_UNMOUNT: IN_PATH_UNMOUNT,
                 }
     def handle_event(self, event, link):
         if self.watch_complete == 2 and link.idx == len(self.links) - 1:
-            assert event.mask & self.mask
-            yield Event(event, str(self.path))
+            assert event.mask & (self.mask | IN_IGNORED)
+            if event.mask & IN_IGNORED:
+                self._poplinks_from(-1)
+            if event.mask & self.mask:
+                yield Event(event, str(self.path))
         else:
             i = link.idx
             if event.mask & (IN_MOVE | IN_DELETE | IN_CREATE):
                 i += 1
-            if i >= len(self.links):
-                return
-            for p in self.links[i:]:
-                p.remove()
-            del self.links[i:]
-            self.watch_complete = 0
-            self.watcher._reconnect_required(self)
-            name = str(link.path[link.rest[0:1]])
-            for m, t in _Watch._eventmap.items():
+            if i < len(self.links):
+                reconnect = event.mask & (IN_MOVED_TO|IN_CREATE|IN_MOVE_SELF|IN_DELETE_SELF)
+                self._poplinks_from(i, reconnect=reconnect)
+            if not event.mask & (IN_MOVE_SELF | IN_DELETE_SELF | IN_IGNORED):
+                name = str(PosixPath(link.path)[link.name])
+            else:
+                name = link.path
+            for m, t in _PathWatch._eventmap.items():
                 if event.mask & m:
                     evttype = t
-            if event.mask & IN_ISDIR:
-                evttype |= IN_ISDIR
+            evttype |= (event.mask & IN_ISDIR)
             yield Event(syntheticevent(mask=evttype, cookie=0, name=name, wd=event.wd), str(self.path))
 
-    def _queue_overflow(self):
-        for p in self.links[1:]:
+    def _poplinks_from(self, startidx, reconnect=True):
+        for p in self.links[startidx:]:
             p.remove()
+        del self.links[startidx:]
         self.watch_complete = 0
         self.watcher._reconnect_required(self)
 
+    def _queue_overflow(self):
+        self._poplinks_from(1)
+
     def update(self, newmask=0, remember_curdir=None):
         self._update_curdir(remember_curdir)
         if not newmask:
             oldlink.remove()
 
     def remove(self):
-        for p in self.links:
-            p.remove()
-        del self.links[:]
-        self.watch_complete = 0
+        self._poplinks_from(0)
 
-    def __str__(self):
-        return '<_Watch for {}>'.format(str(self.path))
+    def __repr__(self):
+        return '<_PathWatch for {}>'.format(str(self.path))
 
 
 class _Link (object):
                  'watch',
                  'mask',
                  'path',
+                 'name',
                  'rest',
                  'linkcount',
                  'wd',
         self.watch = watch
         self.mask = mask
         self.path = str(path)
+        self.name = name
         self.rest = str(rest)
         self.linkcount = linkcount
         self.wd = watch.watcher._createwatch(path, name, mask, self.handle_event)
         self.wd.remove_callback(self.name, self.handle_event)
         self.wd = None
 
-    def _fullname(self):
-        if self.name:
-            return str(self.path[self.name])
-        return str(self.path)
+    def printname(self):
+        return self.path+':'+str(PosixPath(self.rest).parts[0:1])
 
-    def __str__(self):
-        return '<_Link for {}>'.format(self._fullname())
+    def __repr__(self):
+        return '<_Link for {}>'.format(self.printname())
     
 
 # python 2 compatibility
                  'wd',
                  'mask',
                  'callbacks',
+                 'active',
                 )
 
     def __init__(self, watcher, wd):
         self.watcher = watcher
         self.wd = wd
         self.mask = 0
+        self.active = True
         # callbacks is indexed by name to improve speed and because we
         # can. Indexing by name and mask would be faster but would be more
         # cumbersome to implement.
         # If the callback is to a path link element, mask will include
         # IN_ONLYDIR so we could remove that here. However the IN_ONLYDIR flag
         # can not be returned by inotify events so keeping it in does no harm.
+        assert self.active
         assert isinstance(name, (basestring, NoneType))
         assert name is None or not '/' in name
         self.mask |= mask
         if not self.callbacks[name]:
             del self.callbacks[name]
         if not self.callbacks:
-            self.watcher._signal_empty_descriptor(self)
+            self.watcher._signal_empty_watch(self)
 
     def handle_event(self, event):
-        name = PosixPath(event.name) if not event.name is None else None
+        if event.mask & IN_IGNORED:
+            self.active = False
         # The list of callbacks can be modified from the handlers, so make a
         # copy.
-        for m, c in list(self.callbacks.get(name, ())):
-            if not event.mask & m:
+        for m, c in list(self.callbacks.get(event.name, ())):
+            if not event.mask & (m | IN_IGNORED):
                 continue
             for e in c(event):
                 yield e
         if event.mask & IN_IGNORED:
             assert not self.callbacks
+            assert not self.active
             self.watcher._removewatch(self)
         
-    def __str__(self):
-        names = ', '.join(c.__self__._fullname() for c in l for l in self.callbacks.values())
-        return '<_Descriptor for wd {}: {}>'.format(self.wd, ', '.join(names))
+    def __repr__(self):
+        names = ', '.join(c.__self__.printname() for lst in self.callbacks.values() for m, c in lst)
+        return '<_Descriptor for wd {}: {}>'.format(self.wd, names)
 
 

File test/testpath.py

 
 import inotify
 
+globals().update(inotify.constants)
+
 print("\nTesting inotify module from", inotify.__file__)
 
 
+"""
+more needed concurrency tests:
+
+- remember_curdir==False and current directory is deleted
+
+- remember_curdir==True and current directory is deleted
+
+- path element directory foo is deleted and replaced with a file, but
+  the creation event has not been read yet. Make sure no infinite loop
+  occurs due to ConcurrentFilesystemModificationError, and also make
+  sure we wait for the creation event and restore the watch.
+
+- Can _inotify.read be interrupted without losing events or losing
+  consistency?
+
+"""
+
+
 # from IPython.terminal.ipapp import TerminalIPythonApp
 from IPython import embed as ipythonembed
 # ipapp = TerminalIPythonApp.instance()
 
 
 def test_open(w):
-  mask = inotify.IN_OPEN | inotify.IN_CLOSE
+  mask = IN_OPEN | IN_CLOSE
   w.add('testfile', mask)
   watch = w._paths[P('testfile')]
-  import pdb; pdb.set_trace()
 
   assert len(watch.links) == 2
   assert watch.path == P('testfile')
   assert watch.watcher == w
   assert watch.mask == mask
-  link = watch.links[0]
-  assert link.idx == 0
-  assert link.path == str(P.getcwd())
-  assert link.rest == 'testfile'
-  link = watch.links[1]
-  assert link.idx == 1
-  assert link.path == str(P.getcwd()['testfile'])
-  assert link.rest == P('.')
-  linkmask = mask | inotify.IN_MOVE | inotify.IN_DELETE
-  assert link.mask == linkmask
-  assert link.watch == watch
-  wd = link.wd
-  assert wd.callbacks[None] == [(linkmask, link.handle_event)]
-  assert wd.mask == linkmask
+
+  link1 = watch.links[0]
+  assert link1.idx == 0
+  assert link1.path == str(P.cwd())
+  assert link1.rest == 'testfile'
+  assert link1.mask == IN_UNMOUNT | IN_ONLYDIR | IN_EXCL_UNLINK | IN_IGNORED | IN_MOVE | IN_DELETE | IN_CREATE
+  assert link1.watch == watch
+  wd = link1.wd
+  assert wd.callbacks['testfile'] == [(link1.mask, link1.handle_event)]
+  assert wd.mask == link1.mask
   assert wd.watcher == w
   watchdesc = wd.wd
   assert w._watchdescriptors[watchdesc] == wd
-  assert w._paths[P('testfile')] == watch
+
+  link2 = watch.links[1]
+  assert link2.idx == 1
+  assert link2.path == str(P.cwd()['testfile'])
+  assert link2.rest == '.'
+  assert link2.mask == IN_OPEN | IN_CLOSE
+  assert link2.watch == watch
+  wd = link2.wd
+  assert wd.callbacks[None] == [(link2.mask, link2.handle_event)]
+  assert wd.mask == link2.mask
+  assert wd.watcher == w
+  watchdesc = wd.wd
+  assert w._watchdescriptors[watchdesc] == wd
   
   open('testfile').close()
   ev1, ev2 = w.read(block=False)
   assert ev1.open
   assert ev2.close
   assert ev2.close_nowrite
+
+  os.remove('testfile')
+  ev3 = w.read(block=False)[0]
+  assert ev3.path_delete and ev3.path_changed
+  assert ev3.path == 'testfile'
+  assert P(ev3.name).parts[-1] == 'testfile'
+
   w.close()
 
 
   os.symlink('testfile', 'link3')
   os.symlink('link3', 'link2')
   os.symlink('link2', 'link1')
-  w.add('link1', inotify.IN_OPEN)
+  w.add('link1', inotify.IN_OPEN, remember_curdir=False)
   watch = w._paths[P('link1')]
-  assert len(watch.links) == 4
-  w1, w2, w3, wt  = watch.links
-  assert [str(w.path[w.name]) for w in (w1, w2, w3)] == 'link1 link2 link3'.split()
-  assert (wt.path, wt.name) == (P('testfile'), None)
-  assert w1.wd == w2.wd == w3.wd
+  assert len(watch.links) == 5
+  w1, w2, w3, w4, wt  = watch.links
+  assert [w.name for w in (w1, w2, w3, w4)] == 'link1 link2 link3 testfile'.split()
+  assert (wt.path, wt.name) == ('testfile', None)
+  assert w1.wd == w2.wd == w3.wd == w4.wd
   desc = w1.wd
   linkmask = inotify.IN_MOVE | inotify.IN_DELETE | inotify.IN_CREATE | inotify.IN_ONLYDIR
   assert desc.callbacks[P('link1')] == [(linkmask, w1.handle_event)]