Commits

James Lan committed aa2a7df

Add background io thread; revise close, connect and listen methods

  • Participants
  • Parent commits de74e84
  • Branches dev

Comments (0)

Files changed (2)

pypesocketTest.py

 import re
 import socket
 import cStringIO
-from threading import Thread
+from threading import Thread, current_thread
 import unittest
 import errno
 import time
 def server(response=None):
     def decorator(func):
         def wrapper(self):
+            print "start test @%s" % current_thread().ident
             self.server.start(response)
             try:
                 func(self)
             finally:
                 self.server.stop()
+            print "finish test"
             #print "main thread finished"
         return wrapper
     return decorator
     def start(self, response=None):
         def run():
             try:
+                print "start server @%s" % current_thread().ident
                 while True:
                     sock, addr = self.socket.accept()
-                    if self.closing:
-                        break
-                    if response:
+                    if not self.closing and response:
                         sock.sendall(response(recv_all(sock)))
                         sock.shutdown(socket.SHUT_WR)
                     sock.close()
+                    if self.closing:
+                        break
             except socket.error, err:
                 if err.args[0] != socket.EBADF:
                     raise
         #print "server thread started"
 
     def stop(self):
+        print "stoppin server"
         self.closing = True
         self.socket.close()
         try:
             # interrupt socket.accept if it has been already invoked
             sock = socket.socket(socket.AF_UNIX)
+            print "last connect"
             sock.connect(_PipePath)
             sock.close()
         except socket.error:
     @server()
     def test_connect(self):
         for i in range(10):
+            print "retry %s @%s" % (i, current_thread().ident)
             sock = socket.socket(socket.AF_UNIX)
             sock.connect(_PipePath)
             sock.close()
+            print
 
     @server()
     def test_connect_ex(self):
         err = sock.connect_ex(_PipePath)
         self.assertEqual(errno.ENOENT, err)
 
-    @server(echo)
-    def test_echo(self):
-        sock = socket.socket(socket.AF_UNIX)
-        sock.connect(_PipePath)
-        sock.sendall('hello')
-        sock.shutdown(socket.SHUT_WR)
-        self.assertEqual('hello', recv_all(sock))
-        sock.close()
+    #@server(echo)
+    #def test_echo(self):
+    #    sock = socket.socket(socket.AF_UNIX)
+    #    sock.connect(_PipePath)
+    #    sock.sendall('hello')
+    #    sock.shutdown(socket.SHUT_WR)
+    #    self.assertEqual('hello', recv_all(sock))
+    #    sock.close()
 
 
 class GetPeerNameTest(PipesocketTest):
     def test_getsockname_on_broken(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.connect(_PipePath)
-        time.sleep(0.1)
+        time.sleep(0.1) # wait server to close
         self.assertEqual('', sock.getsockname())
         sock.close()
 
 import ctypes
 import ctypes.wintypes
 import Queue
-from threading import Thread
+from threading import Thread, current_thread
 import errno
 
 _kernel32 = ctypes.windll.kernel32
 _kernel32.SetNamedPipeHandleState.restype = _BOOL
 _kernel32.CreateEventA.argtypes = [_LPVOID, _BOOL, _BOOL, _LPCSTR]
 _kernel32.CreateEventA.restype = _HANDLE
+_kernel32.SetEvent.argtypes = [_HANDLE]
+_kernel32.SetEvent.restype = _BOOL
 _kernel32.ResetEvent.argtypes = [_HANDLE]
 _kernel32.ResetEvent.restype = _BOOL
+_kernel32.WaitForSingleObject.argtypes = [_HANDLE, _DWORD]
+_kernel32.WaitForSingleObject.restype = _DWORD
 _kernel32.WaitForMultipleObjects.argtypes = [_DWORD, _LPHANDLE, _BOOL, _DWORD]
 _kernel32.WaitForMultipleObjects.restype = _DWORD
 
 class _pipesocket(object):
     """Use windows named pipe to simulate unix domain socket"""
 
+    def _startio(self, pipe, event):
+        def bg_io():
+            print "start io thread @%s" % current_thread().ident
+            events = (_HANDLE * 2)()
+            events[0] = self._close_event
+            while True:
+                events[1] = self._ovlp.hEvent
+                ret = _kernel32.WaitForMultipleObjects(2, events,
+                    _FALSE, _INFINITE)
+                if ret == 0: # self._close_event, cleanup and quit
+                    print "quit from @%s" % current_thread().ident
+                    break
+                elif ret == 1: # second event, overlapped io 
+                    pass
+        
+        self._pipe = pipe
+        self._init_ovlp()
+        self._ovlp.hEvent = event
+        self._close_event = _kernel32.CreateEventA(None, _TRUE, _FALSE, None)
+        print "creating cevent %s @%s in startio" % (self._close_event, current_thread().ident)
+        self._bgworker = Thread(target=bg_io)
+        self._bgworker.start()
+
     def accept(self):
-        def run():
-            pass
         pipe, event = self._conn.get()
+        print "accept %s, %s @%s" % (pipe, event, current_thread().ident)
         if not pipe:
             raise socket.error(socket.EBADF, 'Bad file descriptor')
         sock = _pipesocket()
-        sock._handle = pipe
+        sock._startio(pipe, event)
         return sock, ''
 
     def bind(self, address):
         self._path = '\\\\.\\pipe\\' + os.path.abspath(address).replace(':', '')
 
     def close(self):
-        if hasattr(self, '_handle'):
-            _kernel32.CloseHandle(self._handle)
-        elif hasattr(self, '_conn'):
+        if hasattr(self, '_bgworker'):
             _kernel32.SetEvent(self._close_event)
             self._bgworker.join()
+            print "closing pipe %s @%s" % (self._pipe, current_thread().ident)
+            _kernel32.CloseHandle(self._pipe)
+            print "closing cevent %s @%s" % (self._close_event, current_thread().ident)
+            _kernel32.CloseHandle(self._close_event)
+            print "closing event %s @%s" % (self._ovlp.hEvent, current_thread().ident)
+            _kernel32.CloseHandle(self._ovlp.hEvent)
+        else:
+            self._closing = True
 
     def connect(self, address):
         err = self.connect_ex(address)
         self._calc_pipename(address)
         timeout = 0.01
         for retry in xrange(10):
-            self._handle = _kernel32.CreateFileA(
+            handle = _kernel32.CreateFileA(
                 self._path,
                 _GENERIC_READ | _GENERIC_WRITE,
-                0, None,
+                0,
+                None,
                 _OPEN_EXISTING,
                 0,
                 None)
-            if self._handle != _INVALID_HANDLE_VALUE:
+            print "connecting pipe %s @%s" % (handle, current_thread().ident)
+            if handle != _INVALID_HANDLE_VALUE:
                 # connected, set message mode
                 mode = _DWORD(_PIPE_READMODE_MESSAGE)
                 if not _kernel32.SetNamedPipeHandleState(
-                    self._handle,
-                    _byref(mode),
-                    None,
-                    None):
-                    return str(_kernel32.GetLastError()) # TODO: should be meaningful text
-                return 0
+                        handle,
+                        _byref(mode),
+                        None,
+                        None):
+                    err = _kernel32.GetLastError()
+                    _kernel32.CloseHandle(handle)
+                    return str(err) # TODO: should be meaningful text
+                break
             # failed to connect, see if server isn't ready
             err = _kernel32.GetLastError()
+            print "err %s" % err
             if err == _ERROR_FILE_NOT_FOUND:
-                # no instance of this pipe exists, error
+                # no instance of this pipe exists, server isn't started yet
                 return errno.ENOENT
             if err != _ERROR_PIPE_BUSY:
                 return str(err) # todo: should be meaningful text
             timeout *= 2
         else:
             return str(_ERROR_PIPE_BUSY)
+        
+        event = _kernel32.CreateEventA(None, _TRUE, _FALSE, None)
+        print "creating event %s @%s in connect" % (event, current_thread().ident)
+        self._startio(handle, event)
+        return 0
 
     def fileno(self):
-        return self.event
+        return self._event
 
     def getpeername(self):
         if not hasattr(self, '_handle'):
             512,
             _NMPWAIT_USE_DEFAULT_WAIT,
             None)
+        print "creating pipe %s @%s in createserver" % (pipe, current_thread().ident)
         if pipe == _INVALID_HANDLE_VALUE:
             raise socket.error(str(_kernel32.GetLastError()))
 
         self._ovlp.hEvent = _kernel32.CreateEventA(None, _TRUE, _FALSE, None)
+        print "creating event %s @%s in createserver" % (self._ovlp.hEvent, current_thread().ident)
 
-        conn = _kernel32.ConnectNamedPipe(pipe, _byref(self._ovlp))
-        if not conn:
-            err = _kernel32.GetLastError()
-            if err != _ERROR_PIPE_CONNECTED and err != _ERROR_IO_PENDING:
-                raise socket.error(str(err))
+        if _kernel32.ConnectNamedPipe(pipe, _byref(self._ovlp)):
+            # ConnectNamedPipe should return 0, otherwise error
+            raise socket.error(str(_kernel32.GetLastError()))
+        
+        err = _kernel32.GetLastError()
+        if err == _ERROR_PIPE_CONNECTED:
+            _kernel32.SetEvent(self._ovlp.hEvent)
+        elif err != _ERROR_IO_PENDING:
+            # neither connected nor pending, error
+            raise socket.error(str(err))
 
         return pipe
 
-    def listen(self, backlog):
-        def run():
-            events = (_HANDLE * 2)()
-            events[0] = self._close_event
-            while True:
-                events[1] = self._ovlp.hEvent
-                ret = _kernel32.WaitForMultipleObjects(2, events,
-                    _FALSE, _INFINITE)
-                if ret == 0: # first event, quit
-                    _kernel32.CloseHandle(self._ovlp.hEvent)
-                    _kernel32.CloseHandle(self._pipe)
-                    _kernel32.CloseHandle(self._close_event)
-                    break
-                elif ret == 1: # second event, connected
-                    _kernel32.ResetEvent(self._ovlp.hEvent)
-                    self._conn.put((self._pipe, self._ovlp.hEvent))
-
-                self._pipe = self._createserver()
-            #print "listening thread finished"
-
-        self._close_event = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
-        
+    def _init_ovlp(self):
         self._ovlp = _OVERLAPPED()
         self._ovlp.Offset = 0
         self._ovlp.OffsetHigh = 0
         self._ovlp.Pointer = None
- 
+
+    def listen(self, backlog):
+        def listener():
+            print "start listen thread @%s" % current_thread().ident
+            self._closing = False
+            while not self._closing:
+                ret = _kernel32.WaitForSingleObject(self._ovlp.hEvent, _INFINITE)
+                if ret == 0: # successfully connected
+                    _kernel32.ResetEvent(self._ovlp.hEvent)
+                    self._conn.put((self._pipe, self._ovlp.hEvent))
+                    if not self._closing:
+                        self._pipe = self._createserver()
+            print "closing pipe %s @%s in listener" % (self._pipe, current_thread().ident)
+            _kernel32.CloseHandle(self._pipe)
+            print "closing event %s @%s in listener" % (self._ovlp.hEvent, current_thread().ident)
+            _kernel32.CloseHandle(self._ovlp.hEvent)
+            print "listening thread finished"
+
+        self._init_ovlp()
         self._pipe = self._createserver()
 
         self._conn = Queue.Queue(backlog)
-        self._bgworker = Thread(target = run)
-        self._bgworker.start()
+        self._listener = Thread(target=listener)
+        self._listener.start()
 
     def recv(self, bufsize, flags=0):
         pass
 
     def sendall(self, string, flag=0):
         written = _DWORD(0)
-        _kernel32.WriteFile(self._handle, string, len(string), _byref(written), None)
+        _kernel32.WriteFile(self._pipe, string, len(string), _byref(written), None)
 
     def sendto(self, string, flag=0, address=None):
         pass
     def shutdown(self, how):
         pass
 
+
 # hook into socket package
 socket.AF_UNIX = 1
 _socket = socket.socket