Anonymous avatar Anonymous committed 703f7ff

Added testcases for GreenPipe.seek and truncate, fixed problem with GreenPipe(file_name).

Comments (0)

Files changed (2)

eventlet/greenio.py

             raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)
 
         if isinstance(f, basestring):
-            f = open(f, mode, bufsize=0)
+            f = open(f, mode, 0)
  
         if isinstance(f, int):
             fileno = f

tests/greenio_test.py

 import os
 import sys
 import array
+import tempfile, shutil
 
 def bufsized(sock, size=1):
     """ Resize both send and receive buffers on a socket.
     test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
     return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
 
-class TestGreenIo(LimitedTestCase):
+class TestGreenSocket(LimitedTestCase):
     def assertWriteToClosedFileRaises(self, fd):
         if sys.version_info[0]<3:
             # 2.x socket._fileobjects are odd: writes don't check
         server.close()
         client.close()
 
+    @skip_with_pyevent
+    def test_raised_multiple_readers(self):
+        debug.hub_prevent_multiple_readers(True)
+
+        def handle(sock, addr):
+            sock.recv(1)
+            sock.sendall("a")
+            raise eventlet.StopServe()
+        listener = eventlet.listen(('127.0.0.1', 0))
+        server = eventlet.spawn(eventlet.serve, 
+                                listener,
+                                handle)
+        def reader(s):
+            s.recv(1)
+
+        s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
+        a = eventlet.spawn(reader, s)
+        eventlet.sleep(0)
+        self.assertRaises(RuntimeError, s.recv, 1)
+        s.sendall('b')
+        a.wait()
+
+        
+class TestGreenPipe(LimitedTestCase):
+    def setUp(self):
+        super(self.__class__, self).setUp()
+        self.tempdir = tempfile.mkdtemp('_green_pipe_test')
+
+    def tearDown(self):
+        shutil.rmtree(self.tempdir)
+        super(self.__class__, self).tearDown()
+
+    def test_pipe(self):
+        r,w = os.pipe()
+        rf = greenio.GreenPipe(r, 'r');
+        wf = greenio.GreenPipe(w, 'w', 0);
+        def sender(f, content):
+            for ch in content:
+                eventlet.sleep(0.0001)
+                f.write(ch)
+            f.close()
+
+        one_line = "12345\n";
+        eventlet.spawn(sender, wf, one_line*5)
+        for i in xrange(5):
+            line = rf.readline()
+            eventlet.sleep(0.01)
+            self.assertEquals(line, one_line)
+        self.assertEquals(rf.readline(), '')
+
     def test_pipe_read(self):
         # ensure that 'readline' works properly on GreenPipes when data is not
         # immediately available (fd is nonblocking, was raising EAGAIN)
                 % (expected[:4], expected[-4:], buf[:4], buf[-4:], i))
         gt.wait()
 
+    def test_seek_on_buffered_pipe(self):
+        f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
+        self.assertEquals(f.tell(),0)
+        f.seek(0,2)
+        self.assertEquals(f.tell(),0)
+        f.write('1234567890')
+        f.seek(0,2)
+        self.assertEquals(f.tell(),10)
+        f.seek(0)
+        value = f.read(1)
+        self.assertEqual(value, '1')
+        self.assertEquals(f.tell(),1)
+        value = f.read(1)
+        self.assertEqual(value, '2')
+        self.assertEquals(f.tell(),2)
+        f.seek(0, 1)
+        self.assertEqual(f.readline(), '34567890')
+        f.seek(0)
+        self.assertEqual(f.readline(), '1234567890')
+        f.seek(0, 2)
+        self.assertEqual(f.readline(), '')
 
-    @skip_with_pyevent
-    def test_raised_multiple_readers(self):
-        debug.hub_prevent_multiple_readers(True)
+    def test_truncate(self):
+        f = greenio.GreenPipe(self.tempdir+"/TestFile", 'w+', 1024)
+        f.write('1234567890')
+        f.truncate(9)
+        self.assertEquals(f.tell(), 9)
 
-        def handle(sock, addr):
-            sock.recv(1)
-            sock.sendall("a")
-            raise eventlet.StopServe()
-        listener = eventlet.listen(('127.0.0.1', 0))
-        server = eventlet.spawn(eventlet.serve, 
-                                listener,
-                                handle)
-        def reader(s):
-            s.recv(1)
-
-        s = eventlet.connect(('127.0.0.1', listener.getsockname()[1]))
-        a = eventlet.spawn(reader, s)
-        eventlet.sleep(0)
-        self.assertRaises(RuntimeError, s.recv, 1)
-        s.sendall('b')
-        a.wait()
-        
 
 class TestGreenIoLong(LimitedTestCase):
     TEST_TIMEOUT=10  # the test here might take a while depending on the OS
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.