Christopher Nilsson avatar Christopher Nilsson committed d257d7b

Actually check how big the FILE_NOTIFY_INFORMATION result is.

Comments (0)

Files changed (2)

dirwatch/nt_dirwatch.py

         self.dir_handles = {}
         self._listener_threads = []
         self._state_lock = threading.RLock()
+        self._recover_lock = threading.RLock()
+        
         self._iocp = None
         
         # Add some persistent NTEventLog logging for warnings & errors.
                 return file_handle
         return None    
     
+    def _recover_file_notify_error(self, path):
+        self.log.debug("_recover_file_notify_error: Trying to recover from FILE_NOTIFY_INFORMATION error with new snapshot")
+        with self._recover_lock:
+            self.log.debug("_recover_file_notify_error: Got recover lock")
+            new_snapshot = self.get_dir_snapshot(path)
+            old_snapshot = self.snapshot.get(path, set([]))
+            self.generate_dir_snapshot_diff_events(old_snapshot, new_snapshot)
+            self.snapshot[path] = new_snapshot
+            self.log.debug("_recover_file_notify_error: recovered.")
+    
     def _listener_thread(self):
         
         while self._iocp:
                 dirinfo = self.dir_handles.get(key, None)
             if not dirinfo:
                 continue # back to sleep...
-            
-            path = dirinfo[self.DIRINFO_PATH_IDX]
-            self.log.debug("_listener_thread: event received for '%s'", path)
-            
-            if num_of_bytes:
-                info = win32file.FILE_NOTIFY_INFORMATION(dirinfo[self.DIRINFO_BUF_IDX], num_of_bytes)
-                self.log.debug("_listener_thread: notify info: %s" % str(info))
+            try:
+                path = dirinfo[self.DIRINFO_PATH_IDX]
+                self.log.debug("_listener_thread: event received for '%s'", path)
                 
-                # Need to translate the windows-specific action codes to dirwatch events.               
-                event_id = None 
-                for action, filename in info:
-                    fullpath = os.path.join(path, filename)
+                if num_of_bytes:
+                    if num_of_bytes > self.READ_BUFFER_SIZE:
+                        self.log.debug("_listener_thread: Received %d bytes - falling back to snapshots", num_of_bytes)
+                        self._recover_file_notify_error(path)
+                        continue
                     
-                    # FIXME: file rename/move ops aren't too easy with the given OS events.
-                    #   We *could* check hashes on similarly timed delete-create event pairs, 
-                    #   and see that the new file is the same as the old (hence, a move), but
-                    #   I think speed will suffer greatly, for not much benefit.
+                    try:
+                        info = win32file.FILE_NOTIFY_INFORMATION(dirinfo[self.DIRINFO_BUF_IDX], num_of_bytes)
+                        self.log.debug("_listener_thread: notify info: %s" % str(info))
+                    except RuntimeError, err:
+                        self.log.error("_listener_thread: failed to parse incoming FILE_NOTIFY_INFORMATION: %s", err)
+                        self._recover_file_notify_error(path)              
+                        info = None
+                        
+                    if not info:
+                        continue
                     
-                    if action == FILE_NOTIFY_CHANGE_FILE_NAME:
-                        if os.path.exists(fullpath):
-                            event_id = eventtype.EV_FILE_CREATED
-                        else:
-                            event_id = eventtype.EV_FILE_DELETED
-                            
-                    elif action == FILE_NOTIFY_CHANGE_LAST_WRITE:
-                        event_id = eventtype.EV_FILE_DATA_CHANGED
-                    
-                    if event_id:              
-                        # send to our central queue....
-                        # Sending the time, the fully qualified path, and the event type.
-                        event = (time.time(), fullpath, event_id)      
-                        self.monitor_events.put(event)
+                    # Need to translate the windows-specific action codes to dirwatch events.               
+                    event_id = None 
+                    for action, filename in info:
+                        fullpath = os.path.join(path, filename)
                         
-                        self.log.debug("_listener_thread: enqueuing(%s)", str(event))
-                        self._send_dir_watch_request(dirinfo[self.DIRINFO_HANDLE_IDX])
+                        # FIXME: file rename/move ops aren't too easy with the given OS events.
+                        #   We *could* check hashes on similarly timed delete-create event pairs, 
+                        #   and see that the new file is the same as the old (hence, a move), but
+                        #   I think speed will suffer greatly, for not much benefit.
+                        
+                        if action == FILE_NOTIFY_CHANGE_FILE_NAME:
+                            if os.path.exists(fullpath):
+                                event_id = eventtype.EV_FILE_CREATED
+                            else:
+                                event_id = eventtype.EV_FILE_DELETED
+                                
+                        elif action == FILE_NOTIFY_CHANGE_LAST_WRITE:
+                            event_id = eventtype.EV_FILE_DATA_CHANGED
+                        
+                        if event_id:              
+                            # send to our central queue....
+                            # Sending the time, the fully qualified path, and the event type.
+                            event = (time.time(), fullpath, event_id)  
+                            self.log.debug("_listener_thread: enqueuing(%s)", str(event))    
+                            self.monitor_events.put(event)
+            finally:                    
+                self._send_dir_watch_request(dirinfo[self.DIRINFO_HANDLE_IDX])
                     
     
     def stop_monitor(self):

dirwatch/test/test_dirwatch.py

             if path not in self.path_events:
                 self.path_events[path] = []
             self.path_events[path].append(event) 
-            #print "%s [%s] %s" % (event.timestamp, event.event_type, event.path)           
+            print "%s [%s] %s" % (time.ctime(event.timestamp), event.event_type, event.path)           
         
     def test_on_create(self):
         """ Check we see some events on creating a test file. """                
         self.dw.add_watch(self.testdir)
         self.dw.start_monitor()
         try:
-            num_of_paths = 10
+            num_of_paths = 1000            
             for idx in range(num_of_paths):
                 with open(self.testfilename + "_" + str(idx), "w+") as testfile:
                     testfile.write("test")
                     testfile.close()
                     self.cleanup.append(self.testfilename + "_" + str(idx))
-            time.sleep(0.04)
+                time.sleep(0.02)
         finally:
             self.dw.stop_monitor()
         self.assertEquals(num_of_paths, len(self.path_events.keys()))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.