Commits

Denis Bilenko  committed d7fc49b

test__examples.py: use gevent.subprocess instead of mysubprocess

better start up procedure: check that we started by connecting to an app; also use this check to make sure no other app is bound to this socket

  • Participants
  • Parent commits 632f519

Comments (0)

Files changed (1)

File greentest/test__examples.py

     from urllib import request as urllib2
 else:
     import urllib2
-import time
-import signal
-import re
+from urlparse import urlparse
+from time import time
 import gevent
 from gevent import socket
-import mysubprocess as subprocess
+from gevent import subprocess
 from gevent.server import DatagramServer, StreamServer
-import gevent.hub
-
-
-gevent.hub.Hub.backend = (getattr(gevent.hub.Hub, 'backend') or '') + ',nochild'
 
 # Ignore tracebacks: KeyboardInterrupt
 
 def run_script(path, *args):
     cmd = [sys.executable, join(examples_directory, path)] + list(args)
     popen = subprocess.Popen(cmd)
-    result = popen.gevent_wait()
-    if result != 0:
-        raise AssertionError('%r failed with code %s' % (cmd, result))
+    if popen.wait() != 0:
+        raise AssertionError('%r failed with code %s' % (cmd, popen.wait()))
+
+
+def kill(popen, signum=9):
+    while True:
+        try:
+            popen.send_signal(signum)
+        except OSError, ex:
+            if ex.errno == 3:  # No such process
+                return
+            raise
+        else:
+            gevent.sleep(0.01)
 
 
 class BaseTestServer(unittest.TestCase):
     args = []
 
+    stderr = None
+
+    def get_address(self):
+        parsed = urlparse(self.URL)
+        return parsed.hostname, parsed.port
+
+    def get_sock_type(self):
+        parsed = urlparse(self.URL)
+        if parsed.scheme.lower() == 'udp':
+            return socket.SOCK_DGRAM
+        return socket.SOCK_STREAM
+
+    def assert_refused(self):
+        if self.get_sock_type() == socket.SOCK_DGRAM:
+            return
+        try:
+            self.connect()
+        except socket.error, ex:
+            if 'refused' not in str(ex):
+                raise
+        else:
+            raise AssertionError('Connection to %s succeeds before starting the server' % self.URL)
+
+    def connect(self):
+        sock = socket.socket(type=self.get_sock_type())
+        sock.connect(self.get_address())
+        return sock
+
+    def wait_start(self, timeout):
+        end = time() + timeout
+        while time() <= end:
+            if self.process.poll() is not None:
+                raise AssertionError('Process died')
+            try:
+                self.connect()
+            except socket.error:
+                gevent.sleep(0.01)
+            else:
+                return
+        else:
+            raise AssertionError('Failed to start')
+
     def setUp(self):
-        self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)] + self.args, cwd=examples_directory)
-        time.sleep(1)
+        self.assert_refused()
+        self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)] + self.args,
+                                        cwd=examples_directory, stderr=self.stderr)
+        try:
+            self.wait_start(1)
+        except:
+            kill(self.process)
+            raise
+        assert self.process.poll() is None, self.process
 
     def tearDown(self):
-        self.assertEqual(self.process.poll(), None)
-        self.process.interrupt()
-        time.sleep(0.05)
+        kill(self.process)
 
 
 class Test_httpserver(BaseTestServer):
     path = 'wsgiserver.py'
 
 
-if hasattr(socket, 'ssl'):
+class Test_wsgiserver_ssl(Test_httpserver):
+    path = 'wsgiserver_ssl.py'
+    URL = 'https://localhost:8443'
 
-    class Test_wsgiserver_ssl(Test_httpserver):
-        path = 'wsgiserver_ssl.py'
-        URL = 'https://localhost:8443'
-
-else:
-
-    class Test_wsgiserver_ssl(unittest.TestCase):
-        path = 'wsgiserver_ssl.py'
-
-        def setUp(self):
-            self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)],
-                                            cwd=examples_directory, stderr=subprocess.PIPE)
-            time.sleep(1)
-
-        def test(self):
-            self.assertEqual(self.process.poll(), 1)
-            stderr = self.process.stderr.read().strip()
-            m = re.match('Traceback \(most recent call last\):.*?ImportError: .*?ssl.*', stderr, re.DOTALL)
-            assert m is not None, repr(stderr)
-
-        def tearDown(self):
-            if self.process.poll() is None:
-                try:
-                    SIGINT = getattr(signal, 'SIGINT', None)
-                    if SIGINT is not None:
-                        os.kill(self.process.pid, SIGINT)
-                        time.sleep(0.1)
-                    self.assertEqual(self.process.poll(), 1)
-                finally:
-                    if self.process.poll() is None:
-                        self.process.kill()
+    def connect(self):
+        sock = super(Test_wsgiserver_ssl, self).connect()
+        from gevent import ssl
+        return ssl.wrap_socket(sock)
 
 
 class Test_webpy(Test_httpserver):
         assert "Hello, world" in data, repr(data)
 
     def _test_long(self):
-        start = time.time()
+        start = time()
         status, data = self.read('/long')
-        delay = time.time() - start
+        delay = time() - start
         assert 10 - 0.1 < delay < 10 + 0.1, delay
         self.assertEqual(status, '200 OK')
         self.assertEqual(data, 'Hello, 10 seconds later')
 
 
 class Test_echoserver(BaseTestServer):
+    URL = 'tcp://127.0.0.1:6000'
     path = 'echoserver.py'
 
     def test(self):
         def test_client(message):
-            conn = socket.create_connection(('127.0.0.1', 6000)).makefile(bufsize=1)
+            conn = self.connect().makefile(bufsize=1)
             welcome = conn.readline()
             assert 'Welcome' in welcome, repr(welcome)
             conn.write(message)
 
 class Test_udp_server(BaseTestServer):
     path = 'udp_server.py'
+    URL = 'udp://localhost:9000'
 
     def test(self):
-        address = ('localhost', 9000)
-        sock = socket.socket(type=socket.SOCK_DGRAM)
-        sock.connect(address)
+        gevent.sleep(0.3)
+        sock = self.connect()
         sock.send('Test_udp_server')
         data, address = sock.recvfrom(8192)
         self.assertEqual(data, 'Received 15 bytes')
 class Test_portforwarder(BaseTestServer):
     path = 'portforwarder.py'
     args = ['127.0.0.5:9999', '127.0.0.6:9999']
+    URL = 'tcp://' + args[0]
 
     def test(self):
         log = []
         def handle(socket, address):
             while True:
                 data = socket.recv(1024)
-                log.append(data)
                 if not data:
                     break
+                log.append(data)
 
-        server = StreamServer('127.0.0.6:9999', handle)
+        server = StreamServer(self.args[1], handle)
         server.start()
         try:
-            conn = socket.create_connection(('127.0.0.5', 9999))
+            conn = socket.create_connection(self.get_address())
+            # make sure the connection is accepted at app level rather than at OS level
+            # before sending a signal
             conn.sendall('msg1')
             gevent.sleep(0.1)
-            self.assertEqual(log, ['msg1'])
+            self.process.send_signal(15)
+            # now let's make sure the signal was received
+            gevent.sleep(0.1)
             conn.sendall('msg2')
             conn.close()
         finally:
             server.close()
 
+        with gevent.Timeout(0.1):
+            self.process.wait()
+
+        self.assertEqual(['msg1', 'msg2'], log)
+
 
 tests = set()
 for klass in globals().keys():