1. jython
  2. jython

Commits

Frank Wierzbicki  committed 6f0976c

Update test_mailbox and test_support for latest 2.7.

  • Participants
  • Parent commits a438b8f
  • Branches default

Comments (0)

Files changed (2)

File Lib/test/test_mailbox.py

View file
 import re
 import shutil
 import StringIO
+import tempfile
 from test import test_support
 import unittest
 import mailbox
 # Silence Py3k warning
 rfc822 = test_support.import_module('rfc822', deprecated=True)
 
-class TestBase(unittest.TestCase):
+class TestBase:
 
     def _check_sample(self, msg):
         # Inspect a mailbox.Message representation of the sample message
     def _delete_recursively(self, target):
         # Delete a file or delete a directory recursively
         if os.path.isdir(target):
-            shutil.rmtree(target)
+            test_support.rmtree(target)
         elif os.path.exists(target):
-            os.remove(target)
+            test_support.unlink(target)
 
 
 class TestMailbox(TestBase):
 
     _factory = None     # Overridden by subclasses to reuse tests
-    _template = 'From: foo\n\n%s'
+    _template = 'From: foo\n\n%s\n'
 
     def setUp(self):
         self._path = test_support.TESTFN
         for i in (1, 2, 3, 4):
             self._check_sample(self._box[keys[i]])
 
+    def test_add_file(self):
+        with tempfile.TemporaryFile('w+') as f:
+            f.write(_sample_message)
+            f.seek(0)
+            key = self._box.add(f)
+        self.assertEqual(self._box.get_string(key).split('\n'),
+            _sample_message.split('\n'))
+
+    def test_add_StringIO(self):
+        key = self._box.add(StringIO.StringIO(self._template % "0"))
+        self.assertEqual(self._box.get_string(key), self._template % "0")
+
     def test_remove(self):
         # Remove messages using remove()
         self._test_remove_or_delitem(self._box.remove)
         key0 = self._box.add(self._template % 0)
         msg = self._box.get(key0)
         self.assertEqual(msg['from'], 'foo')
-        self.assertEqual(msg.get_payload(), '0')
+        self.assertEqual(msg.get_payload(), '0\n')
         self.assertIs(self._box.get('foo'), None)
         self.assertFalse(self._box.get('foo', False))
         self._box.close()
         key1 = self._box.add(self._template % 1)
         msg = self._box.get(key1)
         self.assertEqual(msg['from'], 'foo')
-        self.assertEqual(msg.fp.read(), '1')
+        self.assertEqual(msg.fp.read(), '1' + os.linesep)
+        msg.fp.close()
 
     def test_getitem(self):
         # Retrieve message using __getitem__()
         key0 = self._box.add(self._template % 0)
         msg = self._box[key0]
         self.assertEqual(msg['from'], 'foo')
-        self.assertEqual(msg.get_payload(), '0')
+        self.assertEqual(msg.get_payload(), '0\n')
         self.assertRaises(KeyError, lambda: self._box['foo'])
         self._box.discard(key0)
         self.assertRaises(KeyError, lambda: self._box[key0])
         msg0 = self._box.get_message(key0)
         self.assertIsInstance(msg0, mailbox.Message)
         self.assertEqual(msg0['from'], 'foo')
-        self.assertEqual(msg0.get_payload(), '0')
+        self.assertEqual(msg0.get_payload(), '0\n')
         self._check_sample(self._box.get_message(key1))
 
     def test_get_string(self):
         # Get file representations of messages
         key0 = self._box.add(self._template % 0)
         key1 = self._box.add(_sample_message)
-        self.assertEqual(self._box.get_file(key0).read().replace(os.linesep, '\n'),
+        msg0 = self._box.get_file(key0)
+        self.assertEqual(msg0.read().replace(os.linesep, '\n'),
                          self._template % 0)
-        self.assertEqual(self._box.get_file(key1).read().replace(os.linesep, '\n'),
+        msg1 = self._box.get_file(key1)
+        self.assertEqual(msg1.read().replace(os.linesep, '\n'),
                          _sample_message)
+        msg0.close()
+        msg1.close()
 
     def test_get_file_can_be_closed_twice(self):
         # Issue 11700
         self.assertIn(key0, self._box)
         key1 = self._box.add(self._template % 1)
         self.assertIn(key1, self._box)
-        self.assertEqual(self._box.pop(key0).get_payload(), '0')
+        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
         self.assertNotIn(key0, self._box)
         self.assertIn(key1, self._box)
         key2 = self._box.add(self._template % 2)
         self.assertIn(key2, self._box)
-        self.assertEqual(self._box.pop(key2).get_payload(), '2')
+        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
         self.assertNotIn(key2, self._box)
         self.assertIn(key1, self._box)
-        self.assertEqual(self._box.pop(key1).get_payload(), '1')
+        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
         self.assertNotIn(key1, self._box)
         self.assertEqual(len(self._box), 0)
 
         # Write changes to disk
         self._test_flush_or_close(self._box.flush, True)
 
+    def test_popitem_and_flush_twice(self):
+        # See #15036.
+        self._box.add(self._template % 0)
+        self._box.add(self._template % 1)
+        self._box.flush()
+
+        self._box.popitem()
+        self._box.flush()
+        self._box.popitem()
+        self._box.flush()
+
     def test_lock_unlock(self):
         # Lock and unlock the mailbox
         self.assertFalse(os.path.exists(self._get_lock_path()))
         self._box.add(contents[0])
         self._box.add(contents[1])
         self._box.add(contents[2])
+        oldbox = self._box
         method()
         if should_call_close:
             self._box.close()
         self.assertEqual(len(keys), 3)
         for key in keys:
             self.assertIn(self._box.get_string(key), contents)
+        oldbox.close()
 
     def test_dump_message(self):
         # Write message representations to disk
         return self._path + '.lock'
 
 
-class TestMailboxSuperclass(TestBase):
+class TestMailboxSuperclass(TestBase, unittest.TestCase):
 
     def test_notimplemented(self):
         # Test that all Mailbox methods raise NotImplementedException.
         self.assertRaises(NotImplementedError, lambda: box.close())
 
 
-class TestMaildir(TestMailbox):
+class TestMaildir(TestMailbox, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
 
         msg_returned = self._box.get_message(key)
         self.assertEqual(msg_returned.get_subdir(), 'new')
         self.assertEqual(msg_returned.get_flags(), '')
-        self.assertEqual(msg_returned.get_payload(), '1')
+        self.assertEqual(msg_returned.get_payload(), '1\n')
         msg2 = mailbox.MaildirMessage(self._template % 2)
         msg2.set_info('2,S')
         self._box[key] = msg2
         msg_returned = self._box.get_message(key)
         self.assertEqual(msg_returned.get_subdir(), 'new')
         self.assertEqual(msg_returned.get_flags(), 'S')
-        self.assertEqual(msg_returned.get_payload(), '3')
+        self.assertEqual(msg_returned.get_payload(), '3\n')
 
     def test_consistent_factory(self):
         # Add a message.
             self.assertTrue(match is not None, "Invalid file name: '%s'" % tail)
             groups = match.groups()
             if previous_groups is not None:
-                self.assertTrue(int(groups[0] >= previous_groups[0]),
+                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
                              "Non-monotonic seconds: '%s' before '%s'" %
                              (previous_groups[0], groups[0]))
-                self.assertTrue(int(groups[1] >= previous_groups[1]) or
-                             groups[0] != groups[1],
-                             "Non-monotonic milliseconds: '%s' before '%s'" %
-                             (previous_groups[1], groups[1]))
+                if int(groups[0]) == int(previous_groups[0]):
+                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
+                                "Non-monotonic milliseconds: '%s' before '%s'" %
+                                (previous_groups[1], groups[1]))
                 self.assertTrue(int(groups[2]) == pid,
                              "Process ID mismatch: '%s' should be '%s'" %
                              (groups[2], pid))
         self._box._refresh()
         self.assertTrue(refreshed())
 
-class _TestMboxMMDF(TestMailbox):
+
+class _TestSingleFile(TestMailbox):
+    '''Common tests for single-file mailboxes'''
+
+    def test_add_doesnt_rewrite(self):
+        # When only adding messages, flush() should not rewrite the
+        # mailbox file. See issue #9559.
+
+        # Inode number changes if the contents are written to another
+        # file which is then renamed over the original file. So we
+        # must check that the inode number doesn't change.
+        inode_before = os.stat(self._path).st_ino
+
+        self._box.add(self._template % 0)
+        self._box.flush()
+
+        inode_after = os.stat(self._path).st_ino
+        self.assertEqual(inode_before, inode_after)
+
+        # Make sure the message was really added
+        self._box.close()
+        self._box = self._factory(self._path)
+        self.assertEqual(len(self._box), 1)
+
+    def test_permissions_after_flush(self):
+        # See issue #5346
+
+        # Make the mailbox world writable. It's unlikely that the new
+        # mailbox file would have these permissions after flush(),
+        # because umask usually prevents it.
+        mode = os.stat(self._path).st_mode | 0o666
+        os.chmod(self._path, mode)
+
+        self._box.add(self._template % 0)
+        i = self._box.add(self._template % 1)
+        # Need to remove one message to make flush() create a new file
+        self._box.remove(i)
+        self._box.flush()
+
+        self.assertEqual(os.stat(self._path).st_mode, mode)
+
+
+class _TestMboxMMDF(_TestSingleFile):
 
     def tearDown(self):
         self._box.close()
 
     def test_add_from_string(self):
         # Add a string starting with 'From ' to the mailbox
-        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
+        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
         self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
-        self.assertEqual(self._box[key].get_payload(), '0')
+        self.assertEqual(self._box[key].get_payload(), '0\n')
 
     def test_add_mbox_or_mmdf_message(self):
         # Add an mboxMessage or MMDFMessage
         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
-            msg = class_('From foo@bar blah\nFrom: foo\n\n0')
+            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
             key = self._box.add(msg)
 
     def test_open_close_open(self):
         self._box.close()
 
 
-class TestMbox(_TestMboxMMDF):
+class TestMbox(_TestMboxMMDF, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
 
             perms = st.st_mode
             self.assertFalse((perms & 0111)) # Execute bits should all be off.
 
-class TestMMDF(_TestMboxMMDF):
+    def test_terminating_newline(self):
+        message = email.message.Message()
+        message['From'] = 'john@example.com'
+        message.set_payload('No newline at the end')
+        i = self._box.add(message)
+
+        # A newline should have been appended to the payload
+        message = self._box.get(i)
+        self.assertEqual(message.get_payload(), 'No newline at the end\n')
+
+    def test_message_separator(self):
+        # Check there's always a single blank line after each message
+        self._box.add('From: foo\n\n0')  # No newline at the end
+        with open(self._path) as f:
+            data = f.read()
+            self.assertEqual(data[-3:], '0\n\n')
+
+        self._box.add('From: foo\n\n0\n')  # Newline at the end
+        with open(self._path) as f:
+            data = f.read()
+            self.assertEqual(data[-3:], '0\n\n')
+
+
+class TestMMDF(_TestMboxMMDF, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
 
 
-class TestMH(TestMailbox):
+class TestMH(TestMailbox, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
 
         return os.path.join(self._path, '.mh_sequences.lock')
 
 
-class TestBabyl(TestMailbox):
+class TestBabyl(_TestSingleFile, unittest.TestCase):
 
     _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
 
         self.assertEqual(set(self._box.get_labels()), set(['blah']))
 
 
-class TestMessage(TestBase):
+class TestMessage(TestBase, unittest.TestCase):
 
     _factory = mailbox.Message      # Overridden by subclasses to reuse tests
 
         pass
 
 
-class TestMaildirMessage(TestMessage):
+class TestMaildirMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.MaildirMessage
 
         self._check_sample(msg)
 
 
-class _TestMboxMMDFMessage(TestMessage):
+class _TestMboxMMDFMessage:
 
     _factory = mailbox._mboxMMDFMessage
 
                                  r"\d{2} \d{4}", msg.get_from()))
 
 
-class TestMboxMessage(_TestMboxMMDFMessage):
+class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
 
     _factory = mailbox.mboxMessage
 
 
-class TestMHMessage(TestMessage):
+class TestMHMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.MHMessage
 
         self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
 
 
-class TestBabylMessage(TestMessage):
+class TestBabylMessage(TestMessage, unittest.TestCase):
 
     _factory = mailbox.BabylMessage
 
             self.assertEqual(visible[header], msg[header])
 
 
-class TestMMDFMessage(_TestMboxMMDFMessage):
+class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
 
     _factory = mailbox.MMDFMessage
 
 
-class TestMessageConversion(TestBase):
+class TestMessageConversion(TestBase, unittest.TestCase):
 
     def test_plain_to_x(self):
         # Convert Message to all formats
         proxy.close()
 
 
-class TestProxyFile(TestProxyFileBase):
+class TestProxyFile(TestProxyFileBase, unittest.TestCase):
 
     def setUp(self):
         self._path = test_support.TESTFN
         self._test_close(mailbox._ProxyFile(self._file))
 
 
-class TestPartialFile(TestProxyFileBase):
+class TestPartialFile(TestProxyFileBase, unittest.TestCase):
 
     def setUp(self):
         self._path = test_support.TESTFN
     def setUp(self):
         # create a new maildir mailbox to work with:
         self._dir = test_support.TESTFN
+        if os.path.isdir(self._dir):
+            test_support.rmtree(self._dir)
+        if os.path.isfile(self._dir):
+            test_support.unlink(self._dir)
         os.mkdir(self._dir)
         os.mkdir(os.path.join(self._dir, "cur"))
         os.mkdir(os.path.join(self._dir, "tmp"))
 
     def tearDown(self):
         map(os.unlink, self._msgfiles)
-        os.rmdir(os.path.join(self._dir, "cur"))
-        os.rmdir(os.path.join(self._dir, "tmp"))
-        os.rmdir(os.path.join(self._dir, "new"))
-        os.rmdir(self._dir)
+        test_support.rmdir(os.path.join(self._dir, "cur"))
+        test_support.rmdir(os.path.join(self._dir, "tmp"))
+        test_support.rmdir(os.path.join(self._dir, "new"))
+        test_support.rmdir(self._dir)
 
     def createMessage(self, dir, mbox=False):
         t = int(time.time() % 1000000)
         self.createMessage("cur")
         self.mbox = mailbox.Maildir(test_support.TESTFN)
         #self.assertTrue(len(self.mbox.boxes) == 1)
-        self.assertIsNot(self.mbox.next(), None)
+        msg = self.mbox.next()
+        self.assertIsNot(msg, None)
+        msg.fp.close()
         self.assertIs(self.mbox.next(), None)
         self.assertIs(self.mbox.next(), None)
 
         self.createMessage("new")
         self.mbox = mailbox.Maildir(test_support.TESTFN)
         #self.assertTrue(len(self.mbox.boxes) == 1)
-        self.assertIsNot(self.mbox.next(), None)
+        msg = self.mbox.next()
+        self.assertIsNot(msg, None)
+        msg.fp.close()
         self.assertIs(self.mbox.next(), None)
         self.assertIs(self.mbox.next(), None)
 
         self.createMessage("new")
         self.mbox = mailbox.Maildir(test_support.TESTFN)
         #self.assertTrue(len(self.mbox.boxes) == 2)
-        self.assertIsNot(self.mbox.next(), None)
-        self.assertIsNot(self.mbox.next(), None)
+        msg = self.mbox.next()
+        self.assertIsNot(msg, None)
+        msg.fp.close()
+        msg = self.mbox.next()
+        self.assertIsNot(msg, None)
+        msg.fp.close()
         self.assertIs(self.mbox.next(), None)
         self.assertIs(self.mbox.next(), None)
 
         import email.parser
         fname = self.createMessage("cur", True)
         n = 0
-        for msg in mailbox.PortableUnixMailbox(open(fname),
+        fid = open(fname)
+        for msg in mailbox.PortableUnixMailbox(fid,
                                                email.parser.Parser().parse):
             n += 1
             self.assertEqual(msg["subject"], "Simple Test")
             self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
+        fid.close()
         self.assertEqual(n, 1)
 
 ## End: classes from the original module (for backward compatibility).

File Lib/test/test_support.py

View file
 import UserDict
 import re
 import time
+import struct
+import sysconfig
+
+try:
+    import _testcapi
+except ImportError:
+    _testcapi = None
+
 try:
     import thread
 except ImportError:
     except KeyError:
         pass
 
+if sys.platform.startswith("win"):
+    def _waitfor(func, pathname, waitall=False):
+        # Peform the operation
+        func(pathname)
+        # Now setup the wait loop
+        if waitall:
+            dirname = pathname
+        else:
+            dirname, name = os.path.split(pathname)
+            dirname = dirname or '.'
+        # Check for `pathname` to be removed from the filesystem.
+        # The exponential backoff of the timeout amounts to a total
+        # of ~1 second after which the deletion is probably an error
+        # anyway.
+        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
+        # required when contention occurs.
+        timeout = 0.001
+        while timeout < 1.0:
+            # Note we are only testing for the existance of the file(s) in
+            # the contents of the directory regardless of any security or
+            # access rights.  If we have made it this far, we have sufficient
+            # permissions to do that much using Python's equivalent of the
+            # Windows API FindFirstFile.
+            # Other Windows APIs can fail or give incorrect results when
+            # dealing with files that are pending deletion.
+            L = os.listdir(dirname)
+            if not (L if waitall else name in L):
+                return
+            # Increase the timeout and try again
+            time.sleep(timeout)
+            timeout *= 2
+        warnings.warn('tests may fail, delete still pending for ' + pathname,
+                      RuntimeWarning, stacklevel=4)
+
+    def _unlink(filename):
+        _waitfor(os.unlink, filename)
+
+    def _rmdir(dirname):
+        _waitfor(os.rmdir, dirname)
+
+    def _rmtree(path):
+        def _rmtree_inner(path):
+            for name in os.listdir(path):
+                fullname = os.path.join(path, name)
+                if os.path.isdir(fullname):
+                    _waitfor(_rmtree_inner, fullname, waitall=True)
+                    os.rmdir(fullname)
+                else:
+                    os.unlink(fullname)
+        _waitfor(_rmtree_inner, path, waitall=True)
+        _waitfor(os.rmdir, path)
+else:
+    _unlink = os.unlink
+    _rmdir = os.rmdir
+    _rmtree = shutil.rmtree
+
 def unlink(filename):
     try:
-        os.unlink(filename)
+        _unlink(filename)
     except OSError:
         pass
 
+def rmdir(dirname):
+    try:
+        _rmdir(dirname)
+    except OSError as error:
+        # The directory need not exist.
+        if error.errno != errno.ENOENT:
+            raise
+
 def rmtree(path):
     try:
-        shutil.rmtree(path)
+        _rmtree(path)
     except OSError, e:
         # Unix returns ENOENT, Windows returns ESRCH.
         if e.errno not in (errno.ENOENT, errno.ESRCH):
     the CWD, an error is raised.  If it's True, only a warning is raised
     and the original CWD is used.
     """
-    if isinstance(name, unicode):
+    if have_unicode and isinstance(name, unicode):
         try:
             name = name.encode(sys.getfilesystemencoding() or 'ascii')
         except UnicodeEncodeError:
         ('EAI_FAIL', -4),
         ('EAI_NONAME', -2),
         ('EAI_NODATA', -5),
+        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
+        # implementation actually returns WSANO_DATA i.e. 11004.
+        ('WSANO_DATA', 11004),
     ]
 
     denied = ResourceDenied("Resource '%s' is not available" % resource_name)
     return captured_output("stdin")
 
 
+_header = '2P'
+if hasattr(sys, "gettotalrefcount"):
+    _header = '2P' + _header
+_vheader = _header + 'P'
+
+def calcobjsize(fmt):
+    return struct.calcsize(_header + fmt + '0P')
+
+def calcvobjsize(fmt):
+    return struct.calcsize(_vheader + fmt + '0P')
+
+
+_TPFLAGS_HAVE_GC = 1<<14
+_TPFLAGS_HEAPTYPE = 1<<9
+
+def check_sizeof(test, o, size):
+    result = sys.getsizeof(o)
+    # add GC header size
+    if (_testcapi and\
+        (type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
+        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
+        size += _testcapi.SIZEOF_PYGC_HEAD
+    msg = 'wrong size for %s: got %d, expected %d' \
+            % (type(o), result, size)
+    test.assertEqual(result, size, msg)
+
+
 #=======================================================================
 # Decorator for running a function in a different locale, correctly resetting
 # it afterwards.
         return wrapper
     return decorator
 
-def precisionbigmemtest(size, memuse, overhead=5*_1M):
+def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
     def decorator(f):
         def wrapper(self):
             if not real_max_memuse:
             else:
                 maxsize = size
 
-                if real_max_memuse and real_max_memuse < maxsize * memuse:
-                    if verbose:
-                        sys.stderr.write("Skipping %s because of memory "
-                                         "constraint\n" % (f.__name__,))
-                    return
+            if ((real_max_memuse or not dry_run)
+                and real_max_memuse < maxsize * memuse):
+                if verbose:
+                    sys.stderr.write("Skipping %s because of memory "
+                                     "constraint\n" % (f.__name__,))
+                return
 
             return f(self, maxsize)
         wrapper.size = size
             suite.addTest(unittest.makeSuite(cls))
     _run_suite(suite)
 
+#=======================================================================
+# Check for the presence of docstrings.
+
+HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
+                   sys.platform == 'win32' or
+                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
+
+requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
+                                          "test requires docstrings")
+
 
 #=======================================================================
 # doctest driver.
             except:
                 break
 
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+    """Temporary swap out an attribute with a new object.
+
+    Usage:
+        with swap_attr(obj, "attr", 5):
+            ...
+
+        This will set obj.attr to 5 for the duration of the with: block,
+        restoring the old value at the end of the block. If `attr` doesn't
+        exist on `obj`, it will be created and then deleted at the end of the
+        block.
+    """
+    if hasattr(obj, attr):
+        real_val = getattr(obj, attr)
+        setattr(obj, attr, new_val)
+        try:
+            yield
+        finally:
+            setattr(obj, attr, real_val)
+    else:
+        setattr(obj, attr, new_val)
+        try:
+            yield
+        finally:
+            delattr(obj, attr)
+
 def py3k_bytes(b):
     """Emulate the py3k bytes() constructor.