Commits

James Lan committed 5f964a4

Fix running exceptions, however it still does not work - will hang

Comments (0)

Files changed (1)

 _LPCSTR = ctypes.wintypes.LPCSTR
 _LPVOID = ctypes.wintypes.LPVOID
 _LPDWORD = _POINTER(ctypes.wintypes.DWORD)
+_LPHANDLE = _POINTER(_HANDLE)
 
 class _OVERLAPPED_INNERS(ctypes.Structure):
     _fields_ = [
 _ERROR_FILE_NOT_FOUND = 2
 _ERROR_PIPE_CONNECTED = 535
 _ERROR_PIPE_BUSY = 231
+_ERROR_IO_PENDING = 997
 _GENERIC_READ = 0x80000000
 _GENERIC_WRITE = 0x40000000
 
     """Use windows named pipe to simulate unix domain socket"""
 
     def accept(self):
-        pipe = self._conn.get()
+        def run():
+            pass
+        pipe, event = self._conn.get()
         if not pipe:
             raise socket.error(socket.EBADF, 'Bad file descriptor')
         sock = _pipesocket()
 
     def bind(self, address):
         self._addr = address
-        self.calc_pipename(address)
+        self._calc_pipename(address)
 
-    def calc_pipename(self, address):
-        self._path = '\\\\.\\pipe\\' + os.path.abspath(address)
+    def _calc_pipename(self, address):
+        self._path = '\\\\.\\pipe\\' + os.path.abspath(address).replace(':', '')
 
     def close(self):
         if hasattr(self, '_handle'):
             _kernel32.CloseHandle(self._handle)
         elif hasattr(self, '_conn'):
-            timeout = 0.01
-            for retry in xrange(10):
-                self.closing = True
-                _kernel32.DeleteFileA(self._path)
-                time.sleep(timeout)
-                if self.closed:
-                    break
-                timeout *= 2
-            else:
-                raise socket.error("failed to close socket " + self._path)
-            self.listener.join()
+            _kernel32.SetEvent(self._close_event)
+            self._bgworker.join()
 
     def connect(self, address):
         err = self.connect_ex(address)
 
     def connect_ex(self, address):
         self._peer = address
-        self.calc_pipename(address)
+        self._calc_pipename(address)
         timeout = 0.01
         for retry in xrange(10):
             self._handle = _kernel32.CreateFileA(
         if pipe == _INVALID_HANDLE_VALUE:
             raise socket.error(str(_kernel32.GetLastError()))
 
-        event = _kernel32.CreateEventA(None, _TRUE, _FALSE, None)
-        self.ovlp = _OVERLAPPED()
-        self.ovlp.Offset = 0
-        self.ovlp.OffsetHigh = 0
-        self.ovlp.Pointer = None
-        self.ovlp.hEvent = event
+        self._ovlp.hEvent = _kernel32.CreateEventA(None, _TRUE, _FALSE, None)
 
-        conn = _kernel32.ConnectNamedPipe(pipe, _byref(self.ovlp))
+        conn = _kernel32.ConnectNamedPipe(pipe, _byref(self._ovlp))
         if not conn:
             err = _kernel32.GetLastError()
-            if err != _ERROR_PIPE_CONNECTED:
+            if err != _ERROR_PIPE_CONNECTED and err != _ERROR_IO_PENDING:
                 raise socket.error(str(err))
 
         return pipe
 
     def listen(self, backlog):
         def run():
+            events = (_HANDLE * 2)()
+            events[0] = self._close_event
             while True:
-                _kernel32.WaitForMultipleObjects(2, self.events,
+                events[1] = self._ovlp.hEvent
+                ret = _kernel32.WaitForMultipleObjects(2, events,
                     _FALSE, _INFINITE)
-                if self.closing:
-                    self.closed = True
-                    _kernel32.CloseHandle(self.event)
-                    _kernel32.CloseHandle(pipe)
-                    self._conn.put(None)
+                if ret == 0: # first event, quit
+                    _kernel32.CloseHandle(self._ovlp.hEvent)
+                    _kernel32.CloseHandle(self._pipe)
+                    _kernel32.CloseHandle(self._close_event)
                     break
-                else:
-                    self._conn.put(pipe)
-                    _kernel32.SetEvent(self.event)
+                elif ret == 1: # second event, connected
+                    self._conn.put((self._pipe, self._ovlp.hEvent))
+                    
+                self._pipe = self._createserver()
             #print "listening thread finished"
-        self.pipe = self._createserver()
-        closing = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
-        self.events = [closing, self.]
 
-        self.closed = False
-        self.event = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
+        self._close_event = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
+        
+        self._ovlp = _OVERLAPPED()
+        self._ovlp.Offset = 0
+        self._ovlp.OffsetHigh = 0
+        self._ovlp.Pointer = None
+        
+        self._pipe = self._createserver()
+
         self._conn = Queue.Queue(backlog)
-        self.listener = Thread(target = run)
-        self.listener.start()
+        self._bgworker = Thread(target = run)
+        self._bgworker.start()
 
     def recv(self, bufsize, flags=0):
         pass