Commits

mattip committed eac9695

merge mailbox.py and its tests with cpython 2.7.3

Comments (0)

Files changed (2)

lib-python/2.7/mailbox.py

 import email.message
 import email.generator
 import StringIO
-import contextlib
 try:
     if sys.platform == 'os2emx':
         # OS/2 EMX fcntl() not adequate
         if not self._factory:
             return self.get_message(key)
         else:
-	        return self._factory(self.get_file(key))
+            return self._factory(self.get_file(key))
 
     def get_message(self, key):
         """Return a Message representation or raise a KeyError."""
             else:
                 raise NoSuchMailboxError(self._path)
         self._toc = {}
-        self._toc_mtimes = {}
-        for subdir in ('cur', 'new'):
-            self._toc_mtimes[subdir] = os.path.getmtime(self._paths[subdir])
-        self._last_read = time.time()  # Records last time we read cur/new
-        self._skewfactor = 0.1         # Adjust if os/fs clocks are skewing
+        self._toc_mtimes = {'cur': 0, 'new': 0}
+        self._last_read = 0         # Records last time we read cur/new
+        self._skewfactor = 0.1      # Adjust if os/fs clocks are skewing
 
     def add(self, message):
         """Add message and return assigned key."""
 
     def close(self):
         """Close the file."""
-        self._file.close()
-        del self._file
+        if hasattr(self, '_file'):
+            if hasattr(self._file, 'close'):
+                self._file.close()
+            del self._file
 
     def _read(self, size, read_method):
         """Read size bytes using read_method."""
         self._pos = self._file.tell()
         return result
 
-    def __enter__(self):
-        """Context manager protocol support."""
-        return self
-
-    def __exit__(self, *exc):
-        self.close()
-
-    def readable(self):
-        return self._file.readable()
-
-    def writable(self):
-        return self._file.writable()
-
-    def seekable(self):
-        return self._file.seekable()
-
-    def flush(self):
-        return self._file.flush()
-
-    @property
-    def closed(self):
-        if not hasattr(self, '_file'):
-            return True
-        if not hasattr(self._file, 'closed'):
-            return False
-        return self._file.closed
-
 
 class _PartialFile(_ProxyFile):
     """A read-only wrapper of part of a file."""
             size = remaining
         return _ProxyFile._read(self, size, read_method)
 
+    def close(self):
+        # do *not* close the underlying file object for partial files,
+        # since it's global to the mailbox object
+        if hasattr(self, '_file'):
+            del self._file
+
 
 def _lock_file(f, dotlock=True):
     """Lock file f using lockf and dot locking."""

lib-python/2.7/test/test_mailbox.py

 import email
 import email.message
 import re
+import shutil
 import StringIO
 from test import test_support
 import unittest
     def _delete_recursively(self, target):
         # Delete a file or delete a directory recursively
         if os.path.isdir(target):
-            for path, dirs, files in os.walk(target, topdown=False):
-                for name in files:
-                    os.remove(os.path.join(path, name))
-                for name in dirs:
-                    os.rmdir(os.path.join(path, name))
-            os.rmdir(target)
+            shutil.rmtree(target)
         elif os.path.exists(target):
             os.remove(target)
 
         key1 = self._box.add(self._template % 1)
         self.assertEqual(len(self._box), 2)
         method(key0)
-        self.assertEqual(len(self._box), 1)
+        l = len(self._box)
+        self.assertEqual(l, 1)
         self.assertRaises(KeyError, lambda: self._box[key0])
         self.assertRaises(KeyError, lambda: method(key0))
         self.assertEqual(self._box.get_string(key1), self._template % 1)
         key2 = self._box.add(self._template % 2)
         self.assertEqual(len(self._box), 2)
         method(key2)
-        self.assertEqual(len(self._box), 1)
+        l = len(self._box)
+        self.assertEqual(l, 1)
         self.assertRaises(KeyError, lambda: self._box[key2])
         self.assertRaises(KeyError, lambda: method(key2))
         self.assertEqual(self._box.get_string(key1), self._template % 1)
         key0 = self._box.add(self._template % 0)
         key1 = self._box.add(_sample_message)
         msg0 = self._box.get_file(key0)
-        self.assertEqual(msg0.read().replace(os.linesep, '\n'), self._template % 0)
+        self.assertEqual(msg0.read().replace(os.linesep, '\n'), 
+                         self._template % 0)
         msg1 = self._box.get_file(key1)
-        self.assertEqual(msg1.read().replace(os.linesep, '\n'), _sample_message)
+        self.assertEqual(msg1.read().replace(os.linesep, '\n'), 
+                         _sample_message)
         msg0.close()
         msg1.close()
 
+    def test_get_file_can_be_closed_twice(self):
+        # Issue 11700
+        key = self._box.add(_sample_message)
+        f = self._box.get_file(key)
+        f.close()
+        f.close()
+
     def test_iterkeys(self):
         # Get keys using iterkeys()
         self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
         return self._path + '.lock'
 
 
-class TestMailboxSuperclass(TestBase):
+class TestMailboxSuperclass(TestBase, unittest.TestCase):
 
     def test_notimplemented(self):
         # Test that all Mailbox methods raise NotImplementedException.
         self.assertRaises(NotImplementedError, lambda: box.close())
 
 
-class TestMaildir(TestMailbox):
+class TestMaildir(TestMailbox, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
 
                                           key1: os.path.join('new', key1),
                                           key2: os.path.join('new', key2)})
 
+    def test_refresh_after_safety_period(self):
+        # Issue #13254: Call _refresh after the "file system safety
+        # period" of 2 seconds has passed; _toc should still be
+        # updated because this is the first call to _refresh.
+        key0 = self._box.add(self._template % 0)
+        key1 = self._box.add(self._template % 1)
+
+        self._box = self._factory(self._path)
+        self.assertEqual(self._box._toc, {})
+
+        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
+        # skew factor to make _refresh think that the filesystem
+        # safety period has passed and re-reading the _toc is only
+        # required if mtimes differ.
+        self._box._skewfactor = -3
+
+        self._box._refresh()
+        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
+
     def test_lookup(self):
         # Look up message subpaths in the TOC
         self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
         self.assertFalse((perms & 0111)) # Execute bits should all be off.
 
     def test_reread(self):
+        # Do an initial unconditional refresh
+        self._box._refresh()
 
         # Put the last modified times more than two seconds into the past
         # (because mtime may have only a two second granularity).
         # refresh is done unconditionally if called for within
         # two-second-plus-a-bit of the last one, just in case the mbox has
         # changed; so now we have to wait for that interval to expire.
-        time.sleep(2.01 + self._box._skewfactor)
+        #
+        # Because this is a test, emulate sleeping. Instead of
+        # sleeping for 2 seconds, use the skew factor to make _refresh
+        # think that 2 seconds have passed and re-reading the _toc is
+        # only required if mtimes differ.
+        self._box._skewfactor = -3
 
         # Re-reading causes the ._toc attribute to be assigned a new dictionary
         # object, so we'll check that the ._toc attribute isn't a different
         self.assertFalse(refreshed())
 
         # Now, write something into cur and remove it.  This changes
-        # the mtime and should cause a re-read.
+        # the mtime and should cause a re-read. Note that "sleep
+        # emulation" is still in effect, as skewfactor is -3.
         filename = os.path.join(self._path, 'cur', 'stray-file')
         f = open(filename, 'w')
         f.close()
             self.assertEqual(contents, f.read())
         self._box = self._factory(self._path)
 
+    @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
+    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
     def test_lock_conflict(self):
-        # Fork off a subprocess that will lock the file for 2 seconds,
-        # unlock it, and then exit.
-        if not hasattr(os, 'fork'):
-            return
+        # Fork off a child process that will lock the mailbox temporarily,
+        # unlock it and exit.
+        c, p = socket.socketpair()
+        self.addCleanup(c.close)
+        self.addCleanup(p.close)
+
         pid = os.fork()
         if pid == 0:
-            # In the child, lock the mailbox.
-            self._box.lock()
-            time.sleep(2)
-            self._box.unlock()
-            os._exit(0)
- 
-        # In the parent, sleep a bit to give the child time to acquire
-        # the lock.
-        time.sleep(0.5)
+            # child
+            try:
+                # lock the mailbox, and signal the parent it can proceed
+                self._box.lock()
+                c.send(b'c')
+
+                # wait until the parent is done, and unlock the mailbox
+                c.recv(1)
+                self._box.unlock()
+            finally:
+                os._exit(0)
+
+        # In the parent, wait until the child signals it locked the mailbox.
+        p.recv(1)
         try:
             self.assertRaises(mailbox.ExternalClashError,
                               self._box.lock)
         finally:
+            # Signal the child it can now release the lock and exit.
+            p.send(b'p')
             # Wait for child to exit.  Locking should now succeed.
             exited_pid, status = os.waitpid(pid, 0)
 
         self._box.close()
 
 
-class TestMbox(_TestMboxMMDF):
+class TestMbox(_TestMboxMMDF, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
 
             perms = st.st_mode
             self.assertFalse((perms & 0111)) # Execute bits should all be off.
 
-class TestMMDF(_TestMboxMMDF):
+class TestMMDF(_TestMboxMMDF, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
 
 
-class TestMH(TestMailbox):
+class TestMH(TestMailbox, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
 
         return os.path.join(self._path, '.mh_sequences.lock')
 
 
-class TestBabyl(TestMailbox):
+class TestBabyl(TestMailbox, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
 
         self.assertEqual(set(self._box.get_labels()), set(['blah']))
 
 
-class TestMessage(TestBase):
+class TestMessage(TestBase, unittest.TestCase):
 
     _factory = mailbox.Message      # Overridden by subclasses to reuse tests
 
         pass
 
 
-class TestMaildirMessage(TestMessage):
+class TestMaildirMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.MaildirMessage
 
         self._check_sample(msg)
 
 
-class _TestMboxMMDFMessage(TestMessage):
+class _TestMboxMMDFMessage:
 
     _factory = mailbox._mboxMMDFMessage
 
                                  r"\d{2} \d{4}", msg.get_from()))
 
 
-class TestMboxMessage(_TestMboxMMDFMessage):
+class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
 
     _factory = mailbox.mboxMessage
 
 
-class TestMHMessage(TestMessage):
+class TestMHMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.MHMessage
 
         self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
 
 
-class TestBabylMessage(TestMessage):
+class TestBabylMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.BabylMessage
 
             self.assertEqual(visible[header], msg[header])
 
 
-class TestMMDFMessage(_TestMboxMMDFMessage):
+class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
 
     _factory = mailbox.MMDFMessage
 
 
-class TestMessageConversion(TestBase):
+class TestMessageConversion(TestBase, unittest.TestCase):
 
     def test_plain_to_x(self):
         # Convert Message to all formats
     def _test_close(self, proxy):
         # Close a file
         proxy.close()
-        self.assertRaises(AttributeError, lambda: proxy.close())
+        # Issue 11700 subsequent closes should be a no-op, not an error.
+        proxy.close()
 
 
-class TestProxyFile(TestProxyFileBase):
+class TestProxyFile(TestProxyFileBase, unittest.TestCase):
 
     def setUp(self):
         self._path = test_support.TESTFN
         self._test_close(mailbox._ProxyFile(self._file))
 
 
-class TestPartialFile(TestProxyFileBase):
+class TestPartialFile(TestProxyFileBase, unittest.TestCase):
 
     def setUp(self):
         self._path = test_support.TESTFN