1. JanKanis
  2. python-inotify

Commits

JanKanis  committed fa63fef Draft

lots of changes:
- Some more changes to the package structure
- added more documentation
- improve handling of the current directory in PathWatcher
- fix handling of q_overflow and other watchdescriptor-less events
- add PathWatcher.__del__ method
- add __slots__ to some PathWatcher constituents

  • Participants
  • Parent commits 4fb237d
  • Branches pathwatcher

Comments (0)

Files changed (9)

File inotify/__init__.py

View file
  • Ignore whitespace
 
 __author__ = "Jan Kanis <jan.code@jankanis.nl>"
 
-
-procfs_path = '/proc/sys/fs/inotify'
+_procfs_path = '/proc/sys/fs/inotify'
 
 def _read_procfs_value(name):
     def read_value():
         try:
-            return int(open(procfs_path + '/' + name).read())
+            return int(open(_procfs_path + '/' + name).read())
         except OSError as err:
             return None
 
 max_user_watches = _read_procfs_value('max_user_watches')
 
 
-from .constants import *
-from .watcher import *
-from .pathwatcher import *
+from . import _inotify as inotify
+from .in_constants import constants, event_properties, watch_properties, decode_mask
+from .watcher import Watcher, AutoWatcher, Threshold, NoFilesException
+from .pathwatcher import PathWatcher
+from .pathresolver import InvalidPathError, SymlinkLoopError, \
+        ConcurrentFilesystemModificationError, FileNotFoundError, NotADirectoryError
+globals().update(constants)
+
+

File inotify/_inotify.c

View file
  • Ignore whitespace
 	pos = 0;
 	
 	while (pos < nread) {
+		// FIXME: If there is more to read in the fd than the buffer size,
+		// we may read a partial struct inotify_event. The current code
+		// does not handle this and can segfault if it tries to read an
+		// event past the end of the buffer.
 		struct inotify_event *in = (struct inotify_event *) (buf + pos);
 		struct event *evt;
 		PyObject *obj;

File inotify/constants.py

  • Ignore whitespace
-# constants.py - submodule containing inotify constants and descriptions
-
-# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>
-# Copyright 2012-2013 Jan Kanis <jan.code@jankanis.nl>
-
-# This library is free software; you can redistribute it and/or modify
-# it under the terms of version 2.1 of the GNU Lesser General Public
-# License, incorporated herein by reference.
-
-# Additionally, code written by Jan Kanis may also be redistributed and/or 
-# modified under the terms of any version of the GNU Lesser General Public 
-# License greater than 2.1. 
-
-import functools, operator
-import ._inotify
-
-constants = {k: v for k,v in _inotify.__dict__.items() if k.startswith('IN_')}
-
-inotify_builtin_constants = functools.reduce(operator.or_, constants.values())
-IN_LINK_CHANGED = 1
-while IN_LINK_CHANGED < inotify_builtin_constants:
-    IN_LINK_CHANGED <<= 1
-constants['IN_LINK_CHANGED'] = IN_LINK_CHANGED
-
-globals().update(constants)
-
-
-# Inotify flags that can be specified on a watch and can be returned in an event
-_inotify_props = {
-    'access': 'File was accessed',
-    'modify': 'File was modified',
-    'attrib': 'Attribute of a directory entry was changed',
-    'close': 'File was closed',
-    'close_write': 'File was closed after being written to',
-    'close_nowrite': 'File was closed without being written to',
-    'open': 'File was opened',
-    'move': 'Directory entry was renamed',
-    'moved_from': 'Directory entry was renamed from this name',
-    'moved_to': 'Directory entry was renamed to this name',
-    'create': 'Directory entry was created',
-    'delete': 'Directory entry was deleted',
-    'delete_self': 'The watched directory entry was deleted',
-    'move_self': 'The watched directory entry was renamed',
-    'link_changed': 'The named path no longer resolves to the same file',
-    }
-
-# Inotify flags that can only be returned in an event
-event_properties = {
-    'unmount': 'Directory was unmounted, and can no longer be watched',
-    'q_overflow': 'Kernel dropped events due to queue overflow',
-    'ignored': 'Directory entry is no longer being watched',
-    'isdir': 'Event occurred on a directory',
-    }
-event_properties.update(_inotify_props)
-
-# Inotify flags that can only be specified in a watch
-watch_properties = {
-    'dont_follow': "Don't dereference pathname if it is a symbolic link",
-    'excl_unlink': "Don't generate events after the file has been unlinked",
-    'onlydir': "Only watch pathname if it is a directory",
-    'oneshot': "Monitor pathname for one event, then stop watching it",
-    'mask_add': "Add this mask to the existing mask instead of replacing it",
-    }
-watch_properties.update(_inotify_props)
-
-
-def decode_mask(mask):
-    d = _inotify.decode_mask(mask & inotify_builtin_constants)
-    if mask & inotify.IN_LINK_CHANGED:
-        d.append('IN_LINK_CHANGED')
-    return d

File inotify/in_constants.py

View file
  • Ignore whitespace
+# constants.py - submodule containing inotify constants and descriptions
+
+# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>
+# Copyright 2012-2013 Jan Kanis <jan.code@jankanis.nl>
+
+# This library is free software; you can redistribute it and/or modify
+# it under the terms of version 2.1 of the GNU Lesser General Public
+# License, incorporated herein by reference.
+
+# Additionally, code written by Jan Kanis may also be redistributed and/or 
+# modified under the terms of any version of the GNU Lesser General Public 
+# License greater than 2.1. 
+
+import functools, operator
+from . import _inotify
+
+constants = {k: v for k,v in _inotify.__dict__.items() if k.startswith('IN_')}
+
+
+# These constants are not part of the linux inotify api, they are
+# added by this module for use in PathWatcher.
+inotify_builtin_constants = functools.reduce(operator.or_, constants.values())
+IN_PATH_MOVED = 1
+while IN_PATH_MOVED <= inotify_builtin_constants:
+    IN_PATH_MOVED <<= 1
+IN_PATH_CREATE = IN_PATH_MOVED << 1
+IN_PATH_DELETE = IN_PATH_MOVED << 2
+IN_PATH_UNMOUNT = IN_PATH_MOVED << 3
+IN_PATH_CHANGED = IN_PATH_MOVED | IN_PATH_CREATE | IN_PATH_DELETE | IN_PATH_UNMOUNT
+
+constants.update(dict(IN_PATH_MOVED=IN_PATH_MOVED,
+                      IN_PATH_CREATE=IN_PATH_CREATE,
+                      IN_PATH_DELETE=IN_PATH_DELETE,
+                      IN_PATH_UNMOUNT=IN_PATH_UNMOUNT,
+                      IN_PATH_CHANGED=IN_PATH_CHANGED))
+
+
+# Inotify flags that can be specified on a watch and can be returned in an event
+_inotify_properties = {
+    'access': 'File was accessed',
+    'modify': 'File was modified',
+    'attrib': 'Attribute of a directory entry was changed',
+    'close': 'File was closed',
+    'close_write': 'File was closed after being written to',
+    'close_nowrite': 'File was closed without being written to',
+    'open': 'File was opened',
+    'move': 'Directory entry was renamed',
+    'moved_from': 'Directory entry was renamed from this name',
+    'moved_to': 'Directory entry was renamed to this name',
+    'create': 'Directory entry was created',
+    'delete': 'Directory entry was deleted',
+    'delete_self': 'The watched directory entry was deleted',
+    'move_self': 'The watched directory entry was renamed',
+    'path_changed': 'The named path no longer resolves to the same file',
+    'path_moved': 'A component of path was moved away',
+    'path_create': 'A previously nonexisting component along the path was created',
+    'path_delete': 'A component of path was deleted',
+    'path_unmount': 'A component of path was unmounted',
+    }
+
+# Inotify flags that can only be returned in an event
+event_properties = {
+    'unmount': 'Directory was unmounted, and can no longer be watched',
+    'q_overflow': 'Kernel dropped events due to queue overflow',
+    'ignored': 'Directory entry is no longer being watched',
+    'isdir': 'Event occurred on a directory',
+    }
+event_properties.update(_inotify_properties)
+
+# Inotify flags that can only be specified in a watch
+watch_properties = {
+    'dont_follow': "Don't dereference pathname if it is a symbolic link",
+    'excl_unlink': "Don't generate events after the file has been unlinked",
+    'onlydir': "Only watch pathname if it is a directory",
+    'oneshot': "Monitor pathname for one event, then stop watching it",
+    'mask_add': "Add this mask to the existing mask instead of replacing it",
+    }
+watch_properties.update(_inotify_properties)
+
+
+combined_masks = set('IN_ALL_EVENTS IN_MOVE IN_CLOSE IN_PATH_CHANGED'.split())
+def decode_mask(mask):
+    return [name for name, m in constants.items() if not name in combined_masks and m & mask]
+

File inotify/pathresolver.py

View file
  • Ignore whitespace
 # version 2.1 of the License, or (at your option) any later version.
 
 
+"""
+This module contains a few functions that help traverse paths with
+symlinks.
+
+`resolve_symlinks` is a generator that yields pairs of paths, with one
+yield for each path element that is traversed to reach the final
+target of a path. This includes path elements and paths from symlinks.
+
+`resolve_path` is a wrapper around `resolve_symlinks` that takes a
+single path as an argument and sets up the other arguments to
+`resolve_symlinks`.
+
+`get_symlinkmax` is a function that determines the maximum number of
+symlinks the system will traverse in a path.
+
+Note: this module forms part of python-inotify, but is considered an
+internal module. As such there are no stability guarantees regarding
+the api's of these functions.
+"""
+
+
 __author__ = "Jan Kanis <jan.code@jankanis.nl>"
 
 
-import os, errno
+import sys, os, errno
 import tempfile, shutil
 from pathlib import PosixPath
 
 
 
 def resolve_symlink(location, link_contents, active_links, known_links, linkcounter):
-    '''Recursively resolve a symlink to the file or directory it ultimately points
-    to. This function handles an unlimited number of symlinks, and
-    correctly detects symlink loops. All path parameters should be given as
-    pathlib.PosixPath instances.
+    '''Recursively resolve a symlink (or another path) to the file or
+    directory it ultimately points to. This function handles an
+    unlimited number of symlinks, and correctly detects symlink
+    loops. All path parameters should be given as pathlib.PosixPath
+    instances.
 
     location: The directory in which the currently to be resolved link resides.
 
 fnf_msg = "Path not valid: '{}' does not exist"
 nad_msg = "Path not valid: '{}' is not a directory"
 
-if sys.version_info[0] >= 3:
+if sys.version_info >= (3, 3):
     class FileNotFoundError (InvalidPathError, FileNotFoundError):
         def __init__(self, path, *args):
             InvalidPathError.__init__(self, fnf_msg.format(path), path,

File inotify/pathwatcher.py

View file
  • Ignore whitespace
 The inotify subsystem provides an efficient mechanism for file status
 monitoring and change notification.
 
-The Watcher class hides the low-level details of the inotify
-interface, and provides a Pythonic wrapper around it.  It generates
-events that provide somewhat more information than raw inotify makes
-available.
-
-The AutoWatcher class is more useful, as it automatically watches
-newly-created directories on your behalf.'''
+The PathWatcher class is a wrapper over the low-level inotify
+interface. The exposed interface is path based rather than filesystem
+inode based such as the low level inotify and the Watcher class
+are. This means that if you watch a path /symlink where symlink links
+to myfile, PathWatcher will also generate an event if symlink is
+removed or changed, where native inotify and the Watcher class will
+not.
+'''
 
 __author__ = "Jan Kanis <jan.code@jankanis.nl>"
 
 
 from pathlib import PosixPath
 
-from . import constants, pathresolver
+from . import pathresolver
 from . import _inotify
-from .constants import decode_mask, event_properties, watch_properties
+from .in_constants import constants, decode_mask, event_properties
 from .watcher import NoFilesException, _make_getter
 from .pathresolver import SymlinkLoopError
 
-globals().update(constants.constants)
+globals().update(constants)
 
 
 
 
     The following fields are available:
 
-        mask: event mask, indicating what kind of event this is
+    mask: event mask, indicating what kind of event this is
 
-        cookie: rename cookie, if a rename-related event
+    cookie: rename cookie, if a rename-related event
 
-        fullpath: the full path of the file or directory to which the event
-        occured. If this watch has more than one path, a path is chosen
-        arbitrarily.
+    path: The path of the watched file/directory
 
-        paths: a list of paths that resolve to the watched file/directory
+    name: name of the directory entry to which the event occurred. If
+    the event is of an IN_PATH_* type, name contains the full path to
+    the path element that changed. name may be None if the event did
+    not happen to a directory entry.
 
-        name: name of the directory entry to which the event occurred
-        (may be None if the event happened to a watched directory)
-
-        wd: watch descriptor that triggered this event
+    raw: The underlying inotify event. In the case of IN_PATH_*
+    events, the raw event is constructed and not from the underlying
+    inotify system.
 
     '''
 
     __slots__ = (
         'cookie',
         'mask',
+        'path',
         'name',
         'raw',
-        'path',
         )
 
-    @property
-    def fullpath(self):
-        if self.name:
-            return os.path.join(self.path, self.name)
-        return self.path
-
     def __init__(self, raw, path):
         self.raw = raw
         self.path = path
         r += ')'
         return r
 
-for name, doc in _event_props.items():
+for name, doc in event_properties.items():
     setattr(Event, name, property(_make_getter(name, doc), doc=doc))
 
 
 
-
 class PathWatcher (object):
     '''This watcher can watch file system paths for changes. Unlike the standard
     inotify system, this watcher also watches for any changes in the meaning of
         '''
         return self.fd
 
-    def add(self, path, mask):
-        '''Add a watch with the given mask for path. If the path is already watched,
-        update the mask according to Watcher.update_mask.
+    def add(self, path, mask, remember_curdir=None):
+        '''Add a watch with the given mask for path. If the path is
+        already watched, update the mask according to
+        Watcher.update_mask. If remember_curdir is set to True, the
+        watch will store the path of the current working directory, so
+        that future chdir operations don't change the path. However
+        the current path is not watched, so if the current directory
+        is moved the meaning of watched paths may change
+        undetected. If it is False, relative paths are always resolved
+        relative to the working directory at the time of the
+        operation.
 
         '''
         pth = PosixPath(path)
         if pth in self._paths:
-            self._paths[path].update_mask(mask)
+            self._paths[path].update(mask=mask, remember_curdir=remember_curdir)
             return
-        self._paths[path] = _Watch(self, path, mask)
+        self._paths[path] = _Watch(self, path, mask, remember_curdir)
 
     def _createwatch(self, path, name, mask, callback):
         'create a new _Descriptor for path'
             while self._reread_required in (None, True):
                 self._reread_required = False
                 for evt in _inotify.read(self.fd, bufsize):
-                    for e in self._watchdescriptors[evt.wd].handle_event(evt):
-                        events.append(e)
+                    if evt.wd == -1:
+                        events.append(self._handle_descriptorless_event(evt))
+                    else:
+                        for e in self._watchdescriptors[evt.wd].handle_event(evt):
+                            events.append(e)
         finally:
             self._reread_required = None
         for w in self._reconnect:
         del self._reconnect[:]
         return events
 
-    def update_mask(self, path, newmask):
-        '''Replace the mask for the watch on path by the new mask. If IN_MASK_ADD is
-        set, add the new mask into the existing mask.
+    def _handle_descriptorless_event(self, evt):
+        event = Event(evt, None)
+        if event.q_overflow:
+            for w in self._paths.values():
+                w._queue_overflow()
+        return event
+
+    def update(self, path, newmask=0, remember_curdir=None):
+        '''Replace the mask for the watch on path by the new mask. If
+        IN_MASK_ADD is set, add the new mask into the existing
+        mask. If remember_curdir is set to True, save the current
+        working directory in the watch.
         '''
-        self._paths[PosixPath(path)].update_mask(newmask)
+        self._paths[PosixPath(path)].update(newmask, remember_curdir)
 
     def remove(self, path):
         '''Remove watch on the given path.'''
     def close(self):
         'close this watcher instance'
         os.close(self.fd)
+        self._watchdescriptors.clear()
+        self._paths.clear()
+        self.fd = None
 
+    def __del__(self):
+        if self.fd is not None:
+            self.close()
+            
+
+syntheticevent = namedtuple('syntheticevent', 'mask cookie name wd')
 
 class _Watch (object):
     root = PosixPath('/')
-    cwd = PosixPath('.')
+    curdir = PosixPath('.')
     parentdir = PosixPath('..')
     
-    def __init__(self, watcher, path, mask):
+    def __init__(self, watcher, path, mask, remember_curdir=None):
         self.watcher = watcher
         self.path = PosixPath(path)
-        self.cwd = PosixPath.cwd()
         self.mask = mask
         self.links = []
         self.watch_complete = False
+        self._update_curdir(True if remember_curdir is None else remember_curdir)
         self.reconnect()
 
+    def _update_curdir(remember_curdir):
+        if remember_curdir is True:
+            self.cwd = PosixPath.cwd()
+        elif remember_curdir is False:
+            self.cwd = _Watch.curdir
+         
     def reconnect(self):
-        path = _Watch.cwd
+        assert not self.watch_complete
+        path = self.cwd
         rest = self.path
         if self.links:
             path = self.links[-1].path
             for path, rest in pathresolver.resolve_symlink(path, rest, set(), {}, linkcount):
                 if linkcount[0] > symlinkmax:
                     raise pathresolver.SymlinkLoopError(str(self.path))
-                if path == _Watch.cwd:
+                if path == _Watch.curdir:
                     break
                 self.add_path_element(path, rest)
         except OSError as e:
             else:
                 raise
                 
-        assert path == _Watch.cwd or linkcount[0] > symlinkmax
+        assert path == _Watch.curdir or linkcount[0] > symlinkmax
         self.add_leaf(path)
         self.watch_complete = True
 
         self.complete_watch = True
 
     def add_path_element(self, path, rest):
+        assert rest != _Watch.curdir
         mask = IN_UNMOUNT | IN_ONLYDIR | IN_EXCL_UNLINK
-        assert rest != _Watch.cwd
         if rest.parts[0] == '..':
             mask |= IN_MOVE_SELF | IN_DELETE_SELF
             name = None
             name = rest.parts[0]
         self.links.append(_Link(len(self.links), self, mask, path, name, rest))
         
+    _eventmap = {IN_MOVE | IN_MOVE_SELF: IN_PATH_MOVED,
+                 IN_DELETE | IN_DELETE_SELF: IN_PATH_DELETE,
+                 IN_CREATE: IN_PATH_CREATE,
+                 IN_UNMOUNT: IN_PATH_UNMOUNT,
+                }
     def handle_event(self, event, link):
         if self.watch_complete and link.idx == len(self.links) - 1:
             assert event.mask & self.mask
             yield Event(event, str(self.path))
         else:
-            for p in self.links[link.idx:]:
+            i = link.idx
+            if event.mask & (IN_MOVE | IN_DELETE | IN_CREATE):
+                i += 1
+            for p in self.links[i:]:
                 p.remove()
-            del self.links[link.idx:]
+            del self.links[i:]
             self.watch_complete = False
-            yield Event(mediumevent(mask=IN_PATH_CHANGED, cookie=0, name=None, wd=event.wd), str(self.path))
+            self.watcher._reconnect_required(self)
+            name = str(link.path[link.rest[0:1]])
+            for m, t in _Watch._eventmap.items():
+                if event.mask & m:
+                    evttype = t
+            if event.mask & IN_ISDIR:
+                evttype |= IN_ISDIR
+            yield Event(syntheticevent(mask=evttype, cookie=0, name=name, wd=event.wd), str(self.path))
 
-    def update_mask(self, newmask):
+    def _queue_overflow(self):
+        for p in self.links[1:]:
+            p.remove()
+        self.watch_complete = False
+        self.watcher._reconnect_required(self)
+
+    def update(self, newmask=0, remember_curdir=None):
+        self._update_curdir(remember_curdir)
+        if not newmask:
+            return
         if newmask & IN_MASK_ADD:
             self.mask &= newmask
         else:
 
     def __str__(self):
         return '<_Watch for {}>'.format(str(self.path))
-             
-
-mediumevent = namedtuple('mediumevent', 'mask cookie name wd')
 
 
 class _Link (object):
+
+    __slots__ = ('idx',
+                 'watch',
+                 'mask',
+                 'path',
+                 'rest',
+                 'wd',
+                )
+
     def __init__(self, idx, watch, mask, path, name, rest):
         self.idx = idx
         self.watch = watch
 
 class _Descriptor (object):
 
+    __slots__ = ('watcher',
+                 'wd',
+                 'mask',
+                 'callbacks',
+                )
+
     def __init__(self, watcher, wd):
         self.watcher = watcher
         self.wd = wd
         if event.mask & IN_IGNORED:
             assert not self.callbacks
             self.watcher._removewatch(self)
-      
+        
     def __str__(self):
         names = ', '.join(c.__self__._fullname() for c in l for l in self.callbacks.values())
         return '<_Descriptor for wd {}: {}>'.format(self.wd, ', '.join(names))

File inotify/watcher.py

View file
  • Ignore whitespace
 
 from . import constants
 from . import _inotify as inotify
-from . import _inotify_props, _event_props, _watch_props
+from . import event_properties, watch_properties
 import array
 import errno
 import fcntl
 class Event(object):
     '''Derived inotify event class.
 
-    The following fields are available:
+    The following fields and properties are available:
 
-        mask: event mask, indicating what kind of event this is
+    mask: event mask, indicating what kind of event this is
 
-        cookie: rename cookie, if a rename-related event
+    cookie: rename cookie, if a rename-related event
 
-        fullpath: the full path of the file or directory to which the event
-        occured. If this watch has more than one path, a path is chosen
-        arbitrarily.
+    fullpath: the full path of the file or directory to which the event
+    occured. If this watch has more than one path, a path is chosen
+    arbitrarily.
 
-        paths: a list of paths that resolve to the watched file/directory
+    paths: a list of paths that resolve to the watched file/directory
 
-        name: name of the directory entry to which the event occurred
-        (may be None if the event happened to a watched directory)
+    name: name of the directory entry to which the event occurred
+    (may be None if the event happened to a watched directory)
 
-        wd: watch descriptor that triggered this event
+    wd: watch descriptor that triggered this event
 
     '''
 
 
     @property
     def paths(self):
-        return list(self.watch.paths)
+        if self.watch:
+            return list(self.watch.paths)
+        return []
 
     @property
     def fullpath(self):
-        p = self.paths[0]
-        if self.name:
-            p += '/' + self.name
-        return p
+        pts = self.paths
+        if pts:
+            p = pts[0]
+            if self.name:
+                p += '/' + self.name
+            return p
+        else:
+            return None
 
     def __init__(self, raw, watch):
         self.raw = raw
         return ('Event(paths={}, ' + r[r.find('(')+1:]).format(repr(self.paths))
 
 
-for name, doc in _event_props.items():
+for name, doc in event_properties.items():
     setattr(Event, name, property(_make_getter(name, doc), doc=doc))
 
 
         return '{}.Watch({}, {})'.format(__name__, self._watcher, self.wd)
 
 
-for name, doc in _watch_props.items():
+for name, doc in watch_properties.items():
     setattr(_Watch, name, property(_make_getter(name, doc), doc=doc))
 
 
 
-  
-
-
 class Watcher(object):
     '''Provide a Pythonic interface to the low-level inotify API.
 
 
         os.close(self.fd)
         self.fd = None
-        self._paths = None
-        self._watches = None
+        self._paths.clear()
+        self._watches.clear()
 
     def num_paths(self):
         '''Return the number of explicitly watched paths.'''
         return self._paths.keys()
 
     def get_watch(self, path):
-        'Return the watcher for a given path'
+        'Return the watch for a given path'
         return self._paths[path]
 
     def __del__(self):
         if self.fd is not None:
-            os.close(self.fd)
+            self.close()
 
     ignored_errors = [errno.ENOENT, errno.EPERM, errno.ENOTDIR]
 
         return self.readable() >= self.threshold
 
 
-class InotifyWatcherException (Exception):
-    pass
 
-class NoFilesException (InotifyWatcherException):
+class NoFilesException (Exception):
     '''This inotify instance does not watch anything.'''
     pass

File test/newtest.py

  • Ignore whitespace
-#!/usr/bin/env py.test
-
-# This testing script can be run either from python 3 or python 2. Run with
-# `py.test test.py` or `py.test-2.7 test.py`.
-#
-# This script will try to import the inotify module from the build directory in
-# ../build/lib.linux-{platform}-{pyversion}/inotify relative to its own
-# location. If that directory cannot be found it will import the inotify module
-# from the default path.
-
-
-# from __future__ import print_function
-
-import sys, os, errno, shutil, tempfile, itertools
-import pytest
-from pathlib import PosixPath as P
-
-if not sys.platform.startswith('linux'): raise Exception("This module will only work on Linux")
-
-# Find the package to test. We first try an inotify in the current directory,
-# then try to find one in the build directory of this package, and else we
-# import from the default path.
-un = os.uname()
-ver = '.'.join(str(x) for x in sys.version_info[:2])
-testdir = os.path.dirname(os.path.abspath(__file__))
-inotify_dir = os.path.normpath(testdir + '/../build/lib.{sys}-{plat}-{ver}/'.format(
-    sys=un[0].lower(), plat=un[4], ver=ver))
-idx = None
-if os.path.exists(inotify_dir+'/inotify') and not inotify_dir in sys.path:
-  # Insert at the beginning of sys.path, but not before the current directory
-  # as we do not want to override an explicit inotify package in the current
-  # directory.
-  try:
-    idx = next(i for i, p in enumerate(sys.path) if p and os.path.samefile(p, '.'))
-  except StopIteration:
-    # In interactive mode, there is no entry for the current directory, but the
-    # first entry of sys.path is the empty string which is interpreted as
-    # current directory. So if a path to the current directory is not found,
-    # insert after this first empty string.
-    idx = 0
-  sys.path.insert(idx + 1, inotify_dir)
-del un, ver, testdir, idx
-
-
-import inotify
-from inotify import newwatcher as watcher
-
-print("\nTesting inotify module from", inotify.__file__)
-
-
-# from IPython.terminal.ipapp import TerminalIPythonApp
-# from IPython.terminal.embed import embed as ipythonembed
-# ipapp = TerminalIPythonApp.instance()
-# ipapp.initialize(argv=[]) # argv=[] instructs IPython to ignore sys.argv
-
-
-@pytest.fixture(autouse=True)
-def preparedir(request):
-  # global tempdir
-  tempdir = tempfile.mkdtemp(prefix='inotify-test-tmpdir-')
-  request.addfinalizer(lambda tempdir=tempdir: shutil.rmtree(tempdir))
-  os.chdir(tempdir)
-  open('testfile', 'w').close()
-  os.mkdir('testdir')
-
-
-@pytest.fixture(scope='module')
-def symlinkmax():
-  symlinkmax = pathresolver.get_symlinkmax()
-  print('\ndetected system SYMLINKMAX:', symlinkmax)
-  return symlinkmax
-
-def makelinkchain(target, directory, numlinks):
-  for i in range(1, numlinks+1):
-    name = 'l'+str(i)
-    os.symlink(target, 'directory/'+name)
-    target = name
-
-
-@pytest.fixture
-def w():
-  return watcher.Watcher()
-
-
-def test_open(w):
-  mask = inotify.IN_OPEN | inotify.IN_CLOSE
-  w.add('testfile', mask)
-  watch = w._paths[P('testfile')]
-
-  assert len(watch.links) == 1
-  assert watch.path == P('testfile')
-  assert watch.watcher == w
-  st = os.stat('testfile')
-  # assert watch.inode == (st.st_dev, st.st_ino)
-  assert watch.mask == mask
-  link = watch.links[0]
-  assert link.idx == 0
-  assert link.path == P('testfile')
-  linkmask = mask | inotify.IN_MOVE_SELF | inotify.IN_DELETE_SELF
-  assert link.mask == linkmask
-  assert link.watch == watch
-  wd = link.wd
-  assert wd.callbacks[None] == [(linkmask, link.handle_event)]
-  assert wd.mask == linkmask
-  assert wd.watcher == w
-  watchdesc = wd.wd
-  assert w._watchdescriptors[watchdesc] == wd
-  assert w._paths[P('testfile')] == watch
-  
-  open('testfile').close()
-  ev1, ev2 = w.read(block=False)
-  assert ev1.open
-  assert ev2.close
-  assert ev2.close_nowrite
-  w.close()
-
-
-def test_linkchange(w):
-  os.symlink('testfile', 'link3')
-  os.symlink('link3', 'link2')
-  os.symlink('link2', 'link1')
-  w.add('link1', inotify.IN_OPEN)
-  watch = w._paths[P('link1')]
-  assert len(watch.links) == 4
-  w1, w2, w3, wt  = watch.links
-  assert [str(w.path[w.name]) for w in (w1, w2, w3)] == 'link1 link2 link3'.split()
-  assert (wt.path, wt.name) == (P('testfile'), None)
-  assert w1.wd == w2.wd == w3.wd
-  desc = w1.wd
-  linkmask = inotify.IN_MOVE | inotify.IN_DELETE | inotify.IN_CREATE | inotify.IN_ONLYDIR
-  assert desc.callbacks[P('link1')] == [(linkmask, w1.handle_event)]
-  assert desc.callbacks[P('link2')] == [(linkmask, w2.handle_event)]
-  assert desc.callbacks[P('link3')] == [(linkmask, w3.handle_event)]
-
-  os.rename('link2', 'link2new')
-  e = w.read()
-  assert len(e) == 1
-  e1 = e[0]
-  assert e1.link_changed
-  assert len(w._watchdescriptors) == 1
-  assert len(watch.links) == 1
-  assert len(list(itertools.chain(*watch.links[0].wd.callbacks.values()))) == 1
-
-  os.rename('link1', 'link1new')
-  e = w.read()
-  assert len(e) == 1
-  e1 = e[0]
-  assert e1.link_changed
-  assert len(w._watchdescriptors) == 0
-  assert len(watch.links) == 0
-
-  # ipythonembed()
-
-# def test_move(w):
-#   w.add('.', inotify.IN_MOVE)
-#   assert w.read(0) == []
-#   os.rename('testfile', 'targetfile')
-#   ev = w.read(0)
-#   for e in ev:
-#     if e.name == 'testfile':
-#       assert e.moved_from
-#     if e.name == 'targetfile':
-#       assert e.moved_to
-#   assert ev[0].cookie and ev[0].cookie == ev[1].cookie
-
-
-# def test_alias(w):
-#   '''The inotify system maps watch requests to aliases (e.g. symlinks) to the
-#   same watch descriptor, so we need to be sure that a watch is only really
-#   removed if all paths it is watching are dismissed.'''
-
-#   os.symlink('testfile', 'testlink')
-#   w1 = w.add('testfile', inotify.IN_OPEN)
-#   w2 = w.add('testlink', inotify.IN_OPEN)
-#   assert w1 == w2
-#   assert set(w.paths()) == {'testfile', 'testlink'}
-#   assert w.get_watch('testfile') == w.get_watch('testlink')
-#   assert len(w.watches()) == 1
-#   open('testlink').close()
-#   ev = w.read(0)
-#   assert len(ev) == 1
-#   w.remove_path('testfile')
-#   open('testlink').close()
-#   ev = w.read(0)
-#   assert len(ev) == 1
-
-
-# def test_delete(w):
-#     w.add('testfile', inotify.IN_DELETE_SELF)
-#     os.remove('testfile')
-#     ev1, ev2 = w.read(0)
-#     assert ev1.delete_self
-#     assert ev2.ignored
-#     assert w.num_watches() == 0
-
-# def test_wrongpath(w):
-#     with pytest.raises(OSError) as excinfo:
-#         w.add('nonexistant', inotify.IN_OPEN)
-#     assert excinfo.value.errno == os.errno.ENOENT
-#     with pytest.raises(OSError) as excinfo:
-#         w.add_all('nonexistant', inotify.IN_OPEN)
-#     assert excinfo.value.errno == os.errno.ENOENT

File test/testpath.py

View file
  • Ignore whitespace
+#!/usr/bin/env py.test
+
+# This testing script can be run either from python 3 or python 2. Run with
+# `py.test test.py` or `py.test-2.7 test.py`.
+#
+# This script will try to import the inotify module from the build directory in
+# ../build/lib.linux-{platform}-{pyversion}/inotify relative to its own
+# location. If that directory cannot be found it will import the inotify module
+# from the default path.
+
+
+# from __future__ import print_function
+
+import sys, os, errno, shutil, tempfile, itertools
+import pytest
+from pathlib import PosixPath as P
+
+if not sys.platform.startswith('linux'): raise Exception("This module will only work on Linux")
+
+# Find the package to test. We first try an inotify in the current directory,
+# then try to find one in the build directory of this package, and else we
+# import from the default path.
+un = os.uname()
+ver = '.'.join(str(x) for x in sys.version_info[:2])
+testdir = os.path.dirname(os.path.abspath(__file__))
+inotify_dir = os.path.normpath(testdir + '/../build/lib.{sys}-{plat}-{ver}/'.format(
+    sys=un[0].lower(), plat=un[4], ver=ver))
+idx = None
+if os.path.exists(inotify_dir+'/inotify') and not inotify_dir in sys.path:
+  # Insert at the beginning of sys.path, but not before the current directory
+  # as we do not want to override an explicit inotify package in the current
+  # directory.
+  try:
+    idx = next(i for i, p in enumerate(sys.path) if p and os.path.samefile(p, '.'))
+  except StopIteration:
+    # In interactive mode, there is no entry for the current directory, but the
+    # first entry of sys.path is the empty string which is interpreted as
+    # current directory. So if a path to the current directory is not found,
+    # insert after this first empty string.
+    idx = 0
+  sys.path.insert(idx + 1, inotify_dir)
+del un, ver, testdir, idx
+
+
+import inotify
+from inotify import newwatcher as watcher
+
+print("\nTesting inotify module from", inotify.__file__)
+
+
+# from IPython.terminal.ipapp import TerminalIPythonApp
+# from IPython.terminal.embed import embed as ipythonembed
+# ipapp = TerminalIPythonApp.instance()
+# ipapp.initialize(argv=[]) # argv=[] instructs IPython to ignore sys.argv
+
+
+@pytest.fixture(autouse=True)
+def preparedir(request):
+  # global tempdir
+  tempdir = tempfile.mkdtemp(prefix='inotify-test-tmpdir-')
+  request.addfinalizer(lambda tempdir=tempdir: shutil.rmtree(tempdir))
+  os.chdir(tempdir)
+  open('testfile', 'w').close()
+  os.mkdir('testdir')
+
+
+@pytest.fixture(scope='module')
+def symlinkmax():
+  symlinkmax = pathresolver.get_symlinkmax()
+  print('\ndetected system SYMLINKMAX:', symlinkmax)
+  return symlinkmax
+
+def makelinkchain(target, directory, numlinks):
+  for i in range(1, numlinks+1):
+    name = 'l'+str(i)
+    os.symlink(target, 'directory/'+name)
+    target = name
+
+
+@pytest.fixture
+def w():
+  return watcher.Watcher()
+
+
+def test_open(w):
+  mask = inotify.IN_OPEN | inotify.IN_CLOSE
+  w.add('testfile', mask)
+  watch = w._paths[P('testfile')]
+
+  assert len(watch.links) == 1
+  assert watch.path == P('testfile')
+  assert watch.watcher == w
+  st = os.stat('testfile')
+  # assert watch.inode == (st.st_dev, st.st_ino)
+  assert watch.mask == mask
+  link = watch.links[0]
+  assert link.idx == 0
+  assert link.path == P('testfile')
+  linkmask = mask | inotify.IN_MOVE_SELF | inotify.IN_DELETE_SELF
+  assert link.mask == linkmask
+  assert link.watch == watch
+  wd = link.wd
+  assert wd.callbacks[None] == [(linkmask, link.handle_event)]
+  assert wd.mask == linkmask
+  assert wd.watcher == w
+  watchdesc = wd.wd
+  assert w._watchdescriptors[watchdesc] == wd
+  assert w._paths[P('testfile')] == watch
+  
+  open('testfile').close()
+  ev1, ev2 = w.read(block=False)
+  assert ev1.open
+  assert ev2.close
+  assert ev2.close_nowrite
+  w.close()
+
+
+def test_linkchange(w):
+  os.symlink('testfile', 'link3')
+  os.symlink('link3', 'link2')
+  os.symlink('link2', 'link1')
+  w.add('link1', inotify.IN_OPEN)
+  watch = w._paths[P('link1')]
+  assert len(watch.links) == 4
+  w1, w2, w3, wt  = watch.links
+  assert [str(w.path[w.name]) for w in (w1, w2, w3)] == 'link1 link2 link3'.split()
+  assert (wt.path, wt.name) == (P('testfile'), None)
+  assert w1.wd == w2.wd == w3.wd
+  desc = w1.wd
+  linkmask = inotify.IN_MOVE | inotify.IN_DELETE | inotify.IN_CREATE | inotify.IN_ONLYDIR
+  assert desc.callbacks[P('link1')] == [(linkmask, w1.handle_event)]
+  assert desc.callbacks[P('link2')] == [(linkmask, w2.handle_event)]
+  assert desc.callbacks[P('link3')] == [(linkmask, w3.handle_event)]
+
+  os.rename('link2', 'link2new')
+  e = w.read()
+  assert len(e) == 1
+  e1 = e[0]
+  assert e1.link_changed
+  assert len(w._watchdescriptors) == 1
+  assert len(watch.links) == 1
+  assert len(list(itertools.chain(*watch.links[0].wd.callbacks.values()))) == 1
+
+  os.rename('link1', 'link1new')
+  e = w.read()
+  assert len(e) == 1
+  e1 = e[0]
+  assert e1.link_changed
+  assert len(w._watchdescriptors) == 0
+  assert len(watch.links) == 0
+
+  # ipythonembed()
+
+# def test_move(w):
+#   w.add('.', inotify.IN_MOVE)
+#   assert w.read(0) == []
+#   os.rename('testfile', 'targetfile')
+#   ev = w.read(0)
+#   for e in ev:
+#     if e.name == 'testfile':
+#       assert e.moved_from
+#     if e.name == 'targetfile':
+#       assert e.moved_to
+#   assert ev[0].cookie and ev[0].cookie == ev[1].cookie
+
+
+# def test_alias(w):
+#   '''The inotify system maps watch requests to aliases (e.g. symlinks) to the
+#   same watch descriptor, so we need to be sure that a watch is only really
+#   removed if all paths it is watching are dismissed.'''
+
+#   os.symlink('testfile', 'testlink')
+#   w1 = w.add('testfile', inotify.IN_OPEN)
+#   w2 = w.add('testlink', inotify.IN_OPEN)
+#   assert w1 == w2
+#   assert set(w.paths()) == {'testfile', 'testlink'}
+#   assert w.get_watch('testfile') == w.get_watch('testlink')
+#   assert len(w.watches()) == 1
+#   open('testlink').close()
+#   ev = w.read(0)
+#   assert len(ev) == 1
+#   w.remove_path('testfile')
+#   open('testlink').close()
+#   ev = w.read(0)
+#   assert len(ev) == 1
+
+
+# def test_delete(w):
+#     w.add('testfile', inotify.IN_DELETE_SELF)
+#     os.remove('testfile')
+#     ev1, ev2 = w.read(0)
+#     assert ev1.delete_self
+#     assert ev2.ignored
+#     assert w.num_watches() == 0
+
+# def test_wrongpath(w):
+#     with pytest.raises(OSError) as excinfo:
+#         w.add('nonexistant', inotify.IN_OPEN)
+#     assert excinfo.value.errno == os.errno.ENOENT
+#     with pytest.raises(OSError) as excinfo:
+#         w.add_all('nonexistant', inotify.IN_OPEN)
+#     assert excinfo.value.errno == os.errno.ENOENT