- # Add some persistent NTEventLog logging for warnings & errors.
- # nteventlog_handler = logging.handlers.NTEventLogHandler("dirwatch")
- # nteventlog_handler.setFormatter(logging.Formatter(LOG_STANDARD_FMT))
- # nteventlog_handler.setLevel(logging.WARNING)
- # self.log.addHandler(nteventlog_handler)
- #except Exception, err:
- # self.log.error("Failed to add NTEventLog handler: %s", err)
+ def start_monitor(self):
+ """ Begin watching the directories, given in self.paths, for events.
- def start_monitor(self):
+ Subsequent calls, if the montior is already active, will be ignored, and
+ True, if the monitor starts successfully.
+ def stop_monitor(self):
+ """ Stops event monitoring.
+ Waits for worker threads to shutdown, and flushes event queue.
+ # Shutdown the thread pool.
+ self.log.debug("stop_monitor: Shutting down listeners")
+ for n in range(len(self._listener_threads)):
+ for thread in self._listener_threads:
+ # flush any stragglers...
+ # We don't need to process -- we're quitting.
+ # Just don't want resource leaks at this point.
+ while err == ERROR_SUCCESS:
+ err, numOfBytes, key, overlapped = win32file.GetQueuedCompletionStatus(self._iocp, 0)
+ for dirinfo in self.dir_handles.itervalues():
+ pass # already closed, so ignore.
+ super(DirWatcherNT, self).stop_monitor()
def _open_dir(self, path):
+ """ Opens a handle to the given directory.
+ Opens with shared read/write/delete, and access flags allowing use by
if os.path.exists(path) and os.path.isdir(path):
file_handle = win32file.CreateFile(path, # pointer to the file name
FILE_LIST_DIRECTORY, # access (read/write) mode
def _recover_file_notify_error(self, path):
+ """ Rebuilds internal view of watched directories, in the event of ReadDirectoryChangesW() failure.
+ This normally happens when too many events have occurred before ReadDirectoryChangesW
+ manages to notify its listener. ReadDirectoryChangesW ends up causing a buffer overflow, and
+ we may miss many events as these, and the events that didn't fit into the result buffer, are lost.
+ Short of reading the NTFS change journal, or building our own file system filter driver,
+ our main option of recovery is to take another snapshot of the directory contents as they are now.
+ Our base class' :generate_dir_snapshot_diff_events: will take care of sending "fake"
+ events to simulate the creation of new files, that we hadn't seen previously, and the
+ deletion of files that we see are now missing.
self.log.debug("_recover_file_notify_error: Trying to recover from FILE_NOTIFY_INFORMATION error with new snapshot")
self.log.debug("_recover_file_notify_error: Got recover lock")
self.snapshot[path] = new_snapshot
+ """ Worker thread main routine.
+ Waits on IOCP for incoming events. When the event(s) arrive, these
+ are parsed, and dirwatch event notifications are sent to subscribed
err, num_of_bytes, key, overlapped = win32file.GetQueuedCompletionStatus(self._iocp, -1)
self.log.debug("_listener_thread: enqueuing(%s)", str(event))
- def stop_monitor(self):
- # Shutdown the thread pool.
- self.log.debug("stop_monitor: Shutting down listeners")
- for n in range(len(self._listener_threads)):
- for thread in self._listener_threads:
- # flush any stragglers...
- # We don't need to process -- we're quitting.
- # Just don't want resource leaks at this point.
- while err == ERROR_SUCCESS:
- err, numOfBytes, key, overlapped = win32file.GetQueuedCompletionStatus(self._iocp, 0)
- for dirinfo in self.dir_handles.itervalues():
- pass # already closed, so ignore.
- super(DirWatcherNT, self).stop_monitor()
+ """ Spawns worker threads to listen on our IOCP for incoming events.
+ Requires: Valid IOCP already exists. Otherwise, a RuntimeError will be raised.
listener_count = get_cpu_count() + 1 # Keep one ready when a busy thread sleeps...
for lidx in range(listener_count):
raise RuntimeError, "IOCP not ready"
def _assoc_with_iocp(self, handle, path):
+ """ Start tracking this (handle,path) pair against incoming events from our IOCP.
+ Associates the handle with the IO Completion Port, so future events against this handle
+ will also be sent to our IOCP.
+ Also adds an entry to our handle dictionary, so when an event arrives from the IOCP,
+ we can find the associated state (handle, path, result buffer...).
assert existing is self._iocp, "Associating a new handle with existing IOCP should not change the IOCP."
- def _send_dir_watch_request(self, hdir):
+ def _send_dir_watch_request(self, hdir):
+ """ Submits the given directory handle to the OS event notification system.
+ Non-blocking. Sends the given handle to ``ReadDirectoryChangesW()``.
+ Events will be broadcast to us via our IOCP.
+ Note that this is a 'one-shot' notification. To continue monitoring this
+ directory, the handle will need to be resubmitted on receipt of the events.
win32file.ReadDirectoryChangesW(hdir, buffer, False, flags, overlapped)
+ """ Sends exit notifications to the IOCP.
+ One for each listening worker thread. Once the workers see these, they should
if self._iocp and len(self._listener_threads):
win32file.PostQueuedCompletionStatus(self._iocp, 0, self.EXIT_NOTIFICATION, None)
+ """ Public factory for DirWatcherNT instances.
+ Imported in the dirwatch module."""