Commits

James Lan committed 9daf8cb

Add new test cases for getpeername and getsockname; fix some bug to pass some test case….

  • Participants
  • Parent commits f4cf409

Comments (0)

Files changed (2)

pypesocketTest.py

                 func(self, *args, **kwargs)
             except socket.error, e:
                 err = e.args
-            self.assertEqual(err[0], errcode)
+            self.assertEqual(errcode, err[0])
             if errtext:
                 self.assertTrue(errtext.match(err[1]))
         return test_decorated
     return test_decorator
 
 
-def recvall(sock):
+def recv_all(sock):
     cs = cStringIO.StringIO()
     s = True
     try:
                     while True:
                         sock, addr = self.socket.accept()
                         if response and not self.closing:
-                            sock.sendall(response(recvall(sock)))
+                            sock.sendall(response(recv_all(sock)))
                             sock.shutdown(socket.SHUT_WR)
                         sock.close()
                 except socket.error, err:
     def test_connect_ex_fail(self):
         sock = socket.socket(socket.AF_UNIX)
         err = sock.connect_ex(self.PipePath)
-        self.assertEqual(err, errno.ENOENT)
+        self.assertEqual(errno.ENOENT, err)
 
     @server(echo)
     def test_getpeername(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.connect(self.PipePath)
-        self.assertEqual(sock.getpeername(), self.PipePath)
+        self.assertEqual(self.PipePath, sock.getpeername())
         sock.shutdown(socket.SHUT_WR)
         time.sleep(0.1)
         sock.close()
 
     @expect_socketerror(errno.ENOTCONN)
+    def test_getpeername_server_wo_listen(self):
+        sock = socket.socket(socket.AF_UNIX)
+        sock.bind(PipesocketTest.PipePath)
+        sock.getpeername()
+
+    @expect_socketerror(errno.ENOTCONN)
+    def test_getpeername_server_wo_connect(self):
+        sock = socket.socket(socket.AF_UNIX)
+        sock.bind(PipesocketTest.PipePath)
+        sock.listen(5)
+        try:
+            sock.getpeername()
+        finally:
+            sock.close()
+
+    def test_getpeername_server(self):
+        svr = socket.socket(socket.AF_UNIX)
+        svr.bind(PipesocketTest.PipePath)
+        svr.listen(5)
+
+        cli = socket.socket(socket.AF_UNIX)
+        cli.connect(PipesocketTest.PipePath)
+
+        sock, addr = svr.accept()
+        self.assertEqual('', sock.getpeername())
+        self.assertEqual('', addr)
+
+        sock.close()
+        cli.close()
+        svr.close()
+
+    @expect_socketerror(errno.ENOTCONN)
     def test_getpeername_wo_connect(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.getpeername()
     def test_getpeername_on_broken(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.connect(self.PipePath)
-        time.sleep(0.1)
+        time.sleep(0.1)   # wait server to close
         sock.getpeername()
         sock.close()
 
     @server(echo)
     def test_getsockname(self):
         sock = socket.socket(socket.AF_UNIX)
-        self.assertEqual(sock.getsockname(), '')
+        self.assertEqual('', sock.getsockname())
         sock.connect(self.PipePath)
-        self.assertEqual(sock.getsockname(), '')
+        self.assertEqual('', sock.getsockname())
         sock.shutdown(socket.SHUT_WR)
         time.sleep(0.1)
         sock.close()
 
+    def test_getsockname_server(self):
+        sock = socket.socket(socket.AF_UNIX)
+        sock.bind(PipesocketTest.PipePath)
+        self.assertEqual(PipesocketTest.PipePath, sock.getsockname())
+
     @server()
     def test_getsockname_on_broken(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.connect(self.PipePath)
         time.sleep(0.1)
-        self.assertEqual(sock.getsockname(), '')
+        self.assertEqual('', sock.getsockname())
         sock.close()
 
     @server(echo)
     def test_echo(self):
         sock = socket.socket(socket.AF_UNIX)
         sock.connect(self.PipePath)
-        sock.sendall("hello")
+        sock.sendall('hello')
         sock.shutdown(socket.SHUT_WR)
-        self.assertEqual(recvall(sock), "hello")
+        self.assertEqual('hello', recv_all(sock))
         sock.close()
 
 if __name__ == '__main__':
 import ctypes.wintypes
 import Queue
 from threading import Thread
+import errno
 
 _kernel32 = ctypes.windll.kernel32
 
         if not pipe:
             raise socket.error(socket.EBADF, 'Bad file descriptor')
         sock = _pipesocket()
-        sock.handle = pipe
-        sock.server = False
-        return sock, None
+        sock._handle = pipe
+        sock._server = False
+        return sock, ''
 
     def bind(self, address):
-        self.path = '\\\\.\\pipe\\' + os.path.abspath(address)
+        self._addr = address
+        self.calc_pipename(address)
+
+    def calc_pipename(self, address):
+        self._path = '\\\\.\\pipe\\' + os.path.abspath(address)
 
     def close(self):
-        if self.server:
+        if self._server:
             timeout = 0.01
             for retry in xrange(10):
                 self.closing = True
-                _kernel32.DeleteFileA(self.path)
+                _kernel32.DeleteFileA(self._path)
                 time.sleep(timeout)
                 if self.closed:
                     break
                 timeout *= 2
             else:
-                raise socket.error("failed to close socket " + self.path)
+                raise socket.error("failed to close socket " + self._path)
             self.listener.join()
         else:
-            _kernel32.CloseHandle(self.handle)
+            _kernel32.CloseHandle(self._handle)
 
     def connect(self, address):
-        self.server = False
-        self.bind(address)
+        self._server = False
+        self.calc_pipename(address)
         timeout = 0.01
         for retry in xrange(10):
-            self.handle = _kernel32.CreateFileA(
-                self.path,
+            self._handle = _kernel32.CreateFileA(
+                self._path,
                 _GENERIC_READ | _GENERIC_WRITE,
                 0, None,
                 _OPEN_EXISTING,
                 0,
                 None)
-            if self.handle != _INVALID_HANDLE_VALUE:
+            if self._handle != _INVALID_HANDLE_VALUE:
                 # connected, set message mode
                 mode = _DWORD(_PIPE_READMODE_MESSAGE)
                 if not _kernel32.SetNamedPipeHandleState(
-                        self.handle,
+                        self._handle,
                         _byref(mode),
                         None,
                         None):
         return self.event
 
     def getpeername(self):
-        pass
+        if not hasattr(self, '_handle'):
+            raise socket.error(errno.ENOTCONN)
+        return getattr(self, '_addr', '')
 
     def getsockname(self):
-        return ''
+        return getattr(self, '_addr', '')
 
     def getsockopt(self, level, optname, buflen):
         pass
     def listen(self, backlog):
         def run():
             while True:
-                pipe = _kernel32.CreateNamedPipeA(self.path,
+                pipe = _kernel32.CreateNamedPipeA(self._path,
                     _PIPE_ACCESS_DUPLEX,
                     _PIPE_TYPE_MESSAGE | _PIPE_READMODE_MESSAGE |
                     _PIPE_WAIT,
                     _kernel32.SetEvent(self.event)
             print "listening thread finished"
 
-        self.server = True
+        self._server = True
         self.closing = False
         self.closed = False
         self.event = _kernel32.CreateEventA(None, _FALSE, _FALSE, None)
         pass
 
     def sendall(self, string, flag=0):
-        _kernel32.WriteFileA(self.handle, string, len(string), None, None)
+        _kernel32.WriteFile(self._handle, string, len(string), None, None)
 
     def sendto(self, string, flag=0, address=None):
         pass