Commits

Matt Caldwell  committed 7d6f37b

allow tests to pass for python 3

  • Participants
  • Parent commits 7b75212

Comments (0)

Files changed (4)

File storages/backends/hashpath.py

 import os, hashlib, errno
 
 from django.core.files.storage import FileSystemStorage
-from django.utils.encoding import force_unicode
 
 class HashPathStorage(FileSystemStorage):
     """
         # Get the SHA1 hash of the uploaded file
         sha1 = hashlib.sha1()
         for chunk in content.chunks():
-            sha1.update(chunk)
+            sha1.update(chunk.encode('utf-8'))
         sha1sum = sha1.hexdigest()
 
         # Build the new path and split it into directory and filename
         # Try to create the directory relative to location specified in __init__
         try:
             os.makedirs(os.path.join(self.location, dir_name))
-        except OSError, e:
+        except OSError as e:
             if e.errno is not errno.EEXIST:
                 raise e
 
         name = self._save(name, content)
 
         # Store filenames with forward slashes, even on Windows
-        return force_unicode(name.replace('\\', '/'))
+        return name.replace('\\', '/')

File storages/backends/s3boto.py

 from gzip import GzipFile
 import datetime
 
-try:
-    from cStringIO import StringIO
-except ImportError:
-    from StringIO import StringIO  # noqa
+from io import StringIO, BytesIO
 
 from django.core.files.base import File
 from django.core.files.storage import Storage
 from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
-from django.utils.encoding import force_unicode, smart_str
+from django.utils.encoding import smart_str
 
 try:
     from boto import __version__ as boto_version
     Paths outside the base path indicate a possible security
     sensitive operation.
     """
-    from urlparse import urljoin
-    base_path = force_unicode(base)
+    from urllib.parse import urljoin
+    base_path = base
     base_path = base_path.rstrip('/')
-    paths = [force_unicode(p) for p in paths]
+    paths = [p for p in paths]
 
     final_path = base_path
     for path in paths:
     def __init__(self, acl=None, bucket=None, **settings):
         # check if some of the settings we've provided as class attributes
         # need to be overwritten with values passed in here
-        for name, value in settings.items():
+        for name, value in list(settings.items()):
             if hasattr(self, name):
                 setattr(self, name, value)
 
         return smart_str(name, encoding=self.file_name_charset)
 
     def _decode_name(self, name):
-        return force_unicode(name, encoding=self.file_name_charset)
+        return name.encode(self.file_name_charset)
 
     def _compress_content(self, content):
         """Gzip a given string content."""
-        zbuf = StringIO()
+        zbuf = BytesIO()
         zfile = GzipFile(mode='wb', compresslevel=6, fileobj=zbuf)
         try:
-            zfile.write(content.read())
+            zfile.write(bytes(content.read(), 'utf-8'))
         finally:
             zfile.close()
         zbuf.seek(0)

File storages/tests/s3boto.py

-import os
-import mock
-from uuid import uuid4
-from urllib2 import urlopen
-import datetime
-
-from django.test import TestCase
-from django.core.files.base import ContentFile
-from django.conf import settings
-from django.core.files.storage import FileSystemStorage
-
-from boto.s3.key import Key
-
-from storages.backends import s3boto
-
-__all__ = (
-    'ParseTsExtendedCase',
-    'SafeJoinTest',
-    'S3BotoStorageTests',
-    #'S3BotoStorageFileTests',
-)
-
-class ParseTsExtendedCase(TestCase):
-    def test_normal(self):
-        value = s3boto.parse_ts_extended("Wed, 13 Mar 2013 12:45:49 GMT")
-        self.assertEquals(value, datetime.datetime(2013, 3, 13, 12, 45, 49))
-
-class S3BotoTestCase(TestCase):
-    @mock.patch('storages.backends.s3boto.S3Connection')
-    def setUp(self, S3Connection):
-        self.storage = s3boto.S3BotoStorage()
-        self.storage._bucket = mock.MagicMock()
-
-class SafeJoinTest(TestCase):
-    def test_normal(self):
-        path = s3boto.safe_join("", "path/to/somewhere", "other", "path/to/somewhere")
-        self.assertEquals(path, "path/to/somewhere/other/path/to/somewhere")
-
-    def test_with_dot(self):
-        path = s3boto.safe_join("", "path/./somewhere/../other", "..",
-                                ".", "to/./somewhere")
-        self.assertEquals(path, "path/to/somewhere")
-
-    def test_base_url(self):
-        path = s3boto.safe_join("base_url", "path/to/somewhere")
-        self.assertEquals(path, "base_url/path/to/somewhere")
-
-    def test_base_url_with_slash(self):
-        path = s3boto.safe_join("base_url/", "path/to/somewhere")
-        self.assertEquals(path, "base_url/path/to/somewhere")
-
-    def test_suspicious_operation(self):
-        self.assertRaises(ValueError,
-            s3boto.safe_join, "base", "../../../../../../../etc/passwd")
-
-class S3BotoStorageTests(S3BotoTestCase):
-
-    def test_storage_save(self):
-        """
-        Test saving a file
-        """
-        name = 'test_storage_save.txt'
-        content = ContentFile('new content')
-        self.storage.save(name, content)
-        self.storage.bucket.get_key.assert_called_once_with(name)
-
-        key = self.storage.bucket.get_key.return_value
-        key.set_metadata.assert_called_with('Content-Type', 'text/plain')
-        key.set_contents_from_file.assert_called_with(
-            content,
-            headers={'Content-Type': 'text/plain'},
-            policy=self.storage.default_acl,
-            reduced_redundancy=self.storage.reduced_redundancy,
-            rewind=True
-        )
-
-    def test_storage_save_gzip(self):
-        """
-        Test saving a file with gzip enabled.
-        """
-        if not s3boto.S3BotoStorage.gzip:  # Gzip not available.
-            return
-        name = 'test_storage_save.css'
-        content = ContentFile("I should be gzip'd")
-        self.storage.save(name, content)
-        key = self.storage.bucket.get_key.return_value
-        key.set_metadata.assert_called_with('Content-Type', 'text/css')
-        key.set_contents_from_file.assert_called_with(
-            content,
-            headers={'Content-Type': 'text/css', 'Content-Encoding': 'gzip'},
-            policy=self.storage.default_acl,
-            reduced_redundancy=self.storage.reduced_redundancy,
-            rewind=True,
-        )
-
-    def test_compress_content_len(self):
-        """
-        Test that file returned by _compress_content() is readable.
-        """
-        if not s3boto.S3BotoStorage.gzip:  # Gzip not available.
-            return
-        content = ContentFile("I should be gzip'd")
-        content = self.storage._compress_content(content)
-        self.assertTrue(len(content.read()) > 0)
-
-    def test_storage_open_write(self):
-        """
-        Test opening a file in write mode
-        """
-        name = 'test_open_for_writing.txt'
-        content = 'new content'
-
-        # Set the ACL header used when creating/writing data.
-        self.storage.bucket.connection.provider.acl_header = 'x-amz-acl'
-        # Set the mocked key's bucket
-        self.storage.bucket.get_key.return_value.bucket = self.storage.bucket
-        # Set the name of the mock object
-        self.storage.bucket.get_key.return_value.name = name
-
-        file = self.storage.open(name, 'w')
-        self.storage.bucket.get_key.assert_called_with(name)
-
-        file.write(content)
-        self.storage.bucket.initiate_multipart_upload.assert_called_with(
-            name,
-            headers={'x-amz-acl': 'public-read'},
-            reduced_redundancy=self.storage.reduced_redundancy,
-        )
-
-        # Save the internal file before closing
-        _file = file.file
-        file.close()
-        file._multipart.upload_part_from_file.assert_called_with(
-            _file, 1, headers=self.storage.headers,
-        )
-        file._multipart.complete_upload.assert_called_once()
-
-    #def test_storage_exists_and_delete(self):
-    #    # show file does not exist
-    #    name = self.prefix_path('test_exists.txt')
-    #    self.assertFalse(self.storage.exists(name))
-    #
-    #    # create the file
-    #    content = 'new content'
-    #    file = self.storage.open(name, 'w')
-    #    file.write(content)
-    #    file.close()
-    #
-    #    # show file exists
-    #    self.assertTrue(self.storage.exists(name))
-    #
-    #    # delete the file
-    #    self.storage.delete(name)
-    #
-    #    # show file does not exist
-    #    self.assertFalse(self.storage.exists(name))
-
-    def test_storage_listdir_base(self):
-        file_names = ["some/path/1.txt", "2.txt", "other/path/3.txt", "4.txt"]
-
-        self.storage.bucket.list.return_value = []
-        for p in file_names:
-            key = mock.MagicMock(spec=Key)
-            key.name = p
-            self.storage.bucket.list.return_value.append(key)
-
-        dirs, files = self.storage.listdir("")
-
-        self.assertEqual(len(dirs), 2)
-        for directory in ["some", "other"]:
-            self.assertTrue(directory in dirs,
-                            """ "%s" not in directory list "%s".""" % (
-                                directory, dirs))
-
-        self.assertEqual(len(files), 2)
-        for filename in ["2.txt", "4.txt"]:
-            self.assertTrue(filename in files,
-                            """ "%s" not in file list "%s".""" % (
-                                filename, files))
-
-    def test_storage_listdir_subdir(self):
-        file_names = ["some/path/1.txt", "some/2.txt"]
-
-        self.storage.bucket.list.return_value = []
-        for p in file_names:
-            key = mock.MagicMock(spec=Key)
-            key.name = p
-            self.storage.bucket.list.return_value.append(key)
-
-        dirs, files = self.storage.listdir("some/")
-        self.assertEqual(len(dirs), 1)
-        self.assertTrue('path' in dirs,
-                        """ "path" not in directory list "%s".""" % (dirs,))
-
-        self.assertEqual(len(files), 1)
-        self.assertTrue('2.txt' in files,
-                        """ "2.txt" not in files list "%s".""" % (files,))
-
-    #def test_storage_size(self):
-    #    name = self.prefix_path('test_storage_size.txt')
-    #    content = 'new content'
-    #    f = ContentFile(content)
-    #    self.storage.save(name, f)
-    #    self.assertEqual(self.storage.size(name), f.size)
-    #
-    #def test_storage_url(self):
-    #    name = self.prefix_path('test_storage_size.txt')
-    #    content = 'new content'
-    #    f = ContentFile(content)
-    #    self.storage.save(name, f)
-    #    self.assertEqual(content, urlopen(self.storage.url(name)).read())
-
-#class S3BotoStorageFileTests(S3BotoTestCase):
-#    def test_multipart_upload(self):
-#        nparts = 2
-#        name = self.prefix_path("test_multipart_upload.txt")
-#        mode = 'w'
-#        f = s3boto.S3BotoStorageFile(name, mode, self.storage)
-#        content_length = 1024 * 1024# 1 MB
-#        content = 'a' * content_length
-#
-#        bytes = 0
-#        target = f._write_buffer_size * nparts
-#        while bytes < target:
-#            f.write(content)
-#            bytes += content_length
-#
-#        # make the buffer roll over so f._write_counter
-#        # is incremented
-#        f.write("finished")
-#
-#        # verify upload was multipart and correctly partitioned
-#        self.assertEqual(f._write_counter, nparts)
-#
-#        # complete the upload
-#        f.close()
-#
-#        # verify that the remaining buffered bytes were
-#        # uploaded when the file was closed.
-#        self.assertEqual(f._write_counter, nparts+1)
+import os
+import mock
+from uuid import uuid4
+from urllib.request import urlopen
+import datetime
+
+from django.test import TestCase
+from django.core.files.base import ContentFile
+from django.conf import settings
+from django.core.files.storage import FileSystemStorage
+
+from boto.s3.key import Key
+
+from storages.backends import s3boto
+
+__all__ = (
+    'ParseTsExtendedCase',
+    'SafeJoinTest',
+    'S3BotoStorageTests',
+    #'S3BotoStorageFileTests',
+)
+
+class ParseTsExtendedCase(TestCase):
+    def test_normal(self):
+        value = s3boto.parse_ts_extended("Wed, 13 Mar 2013 12:45:49 GMT")
+        self.assertEquals(value, datetime.datetime(2013, 3, 13, 12, 45, 49))
+
+class S3BotoTestCase(TestCase):
+    @mock.patch('storages.backends.s3boto.S3Connection')
+    def setUp(self, S3Connection):
+        self.storage = s3boto.S3BotoStorage()
+        self.storage._bucket = mock.MagicMock()
+
+class SafeJoinTest(TestCase):
+    def test_normal(self):
+        path = s3boto.safe_join("", "path/to/somewhere", "other", "path/to/somewhere")
+        self.assertEquals(path, "path/to/somewhere/other/path/to/somewhere")
+
+    def test_with_dot(self):
+        path = s3boto.safe_join("", "path/./somewhere/../other", "..",
+                                ".", "to/./somewhere")
+        self.assertEquals(path, "path/to/somewhere")
+
+    def test_base_url(self):
+        path = s3boto.safe_join("base_url", "path/to/somewhere")
+        self.assertEquals(path, "base_url/path/to/somewhere")
+
+    def test_base_url_with_slash(self):
+        path = s3boto.safe_join("base_url/", "path/to/somewhere")
+        self.assertEquals(path, "base_url/path/to/somewhere")
+
+    def test_suspicious_operation(self):
+        self.assertRaises(ValueError,
+            s3boto.safe_join, "base", "../../../../../../../etc/passwd")
+
+class S3BotoStorageTests(S3BotoTestCase):
+
+    def test_storage_save(self):
+        """
+        Test saving a file
+        """
+        name = 'test_storage_save.txt'
+        content = ContentFile('new content')
+        self.storage.save(name, content)
+        self.storage.bucket.get_key.assert_called_once_with(name)
+
+        key = self.storage.bucket.get_key.return_value
+        key.set_metadata.assert_called_with('Content-Type', 'text/plain')
+        key.set_contents_from_file.assert_called_with(
+            content,
+            headers={'Content-Type': 'text/plain'},
+            policy=self.storage.default_acl,
+            reduced_redundancy=self.storage.reduced_redundancy,
+            rewind=True
+        )
+
+    def test_storage_save_gzip(self):
+        """
+        Test saving a file with gzip enabled.
+        """
+        if not s3boto.S3BotoStorage.gzip:  # Gzip not available.
+            return
+        name = 'test_storage_save.css'
+        content = ContentFile("I should be gzip'd")
+        self.storage.save(name, content)
+        key = self.storage.bucket.get_key.return_value
+        key.set_metadata.assert_called_with('Content-Type', 'text/css')
+        key.set_contents_from_file.assert_called_with(
+            content,
+            headers={'Content-Type': 'text/css', 'Content-Encoding': 'gzip'},
+            policy=self.storage.default_acl,
+            reduced_redundancy=self.storage.reduced_redundancy,
+            rewind=True,
+        )
+
+    def test_compress_content_len(self):
+        """
+        Test that file returned by _compress_content() is readable.
+        """
+        if not s3boto.S3BotoStorage.gzip:  # Gzip not available.
+            return
+        content = ContentFile("I should be gzip'd")
+        content = self.storage._compress_content(content)
+        self.assertTrue(len(content.read()) > 0)
+
+    def test_storage_open_write(self):
+        """
+        Test opening a file in write mode
+        """
+        name = 'test_open_for_writing.txt'
+        content = 'new content'
+
+        # Set the ACL header used when creating/writing data.
+        self.storage.bucket.connection.provider.acl_header = 'x-amz-acl'
+        # Set the mocked key's bucket
+        self.storage.bucket.get_key.return_value.bucket = self.storage.bucket
+        # Set the name of the mock object
+        self.storage.bucket.get_key.return_value.name = name
+
+        file = self.storage.open(name, 'w')
+        self.storage.bucket.get_key.assert_called_with(name)
+
+        file.write(content)
+        self.storage.bucket.initiate_multipart_upload.assert_called_with(
+            name,
+            headers={'x-amz-acl': 'public-read'},
+            reduced_redundancy=self.storage.reduced_redundancy,
+        )
+
+        # Save the internal file before closing
+        _file = file.file
+        file.close()
+        file._multipart.upload_part_from_file.assert_called_with(
+            _file, 1, headers=self.storage.headers,
+        )
+        file._multipart.complete_upload.assert_called_once()
+
+    #def test_storage_exists_and_delete(self):
+    #    # show file does not exist
+    #    name = self.prefix_path('test_exists.txt')
+    #    self.assertFalse(self.storage.exists(name))
+    #
+    #    # create the file
+    #    content = 'new content'
+    #    file = self.storage.open(name, 'w')
+    #    file.write(content)
+    #    file.close()
+    #
+    #    # show file exists
+    #    self.assertTrue(self.storage.exists(name))
+    #
+    #    # delete the file
+    #    self.storage.delete(name)
+    #
+    #    # show file does not exist
+    #    self.assertFalse(self.storage.exists(name))
+
+    def test_storage_listdir_base(self):
+        file_names = ["some/path/1.txt", "2.txt", "other/path/3.txt", "4.txt"]
+
+        self.storage.bucket.list.return_value = []
+        for p in file_names:
+            key = mock.MagicMock(spec=Key)
+            key.name = p
+            self.storage.bucket.list.return_value.append(key)
+
+        dirs, files = self.storage.listdir("")
+
+        self.assertEqual(len(dirs), 2)
+        for directory in ["some", "other"]:
+            self.assertTrue(directory in dirs,
+                            """ "%s" not in directory list "%s".""" % (
+                                directory, dirs))
+
+        self.assertEqual(len(files), 2)
+        for filename in ["2.txt", "4.txt"]:
+            self.assertTrue(filename in files,
+                            """ "%s" not in file list "%s".""" % (
+                                filename, files))
+
+    def test_storage_listdir_subdir(self):
+        file_names = ["some/path/1.txt", "some/2.txt"]
+
+        self.storage.bucket.list.return_value = []
+        for p in file_names:
+            key = mock.MagicMock(spec=Key)
+            key.name = p
+            self.storage.bucket.list.return_value.append(key)
+
+        dirs, files = self.storage.listdir("some/")
+        self.assertEqual(len(dirs), 1)
+        self.assertTrue('path' in dirs,
+                        """ "path" not in directory list "%s".""" % (dirs,))
+
+        self.assertEqual(len(files), 1)
+        self.assertTrue('2.txt' in files,
+                        """ "2.txt" not in files list "%s".""" % (files,))
+
+    #def test_storage_size(self):
+    #    name = self.prefix_path('test_storage_size.txt')
+    #    content = 'new content'
+    #    f = ContentFile(content)
+    #    self.storage.save(name, f)
+    #    self.assertEqual(self.storage.size(name), f.size)
+    #
+    #def test_storage_url(self):
+    #    name = self.prefix_path('test_storage_size.txt')
+    #    content = 'new content'
+    #    f = ContentFile(content)
+    #    self.storage.save(name, f)
+    #    self.assertEqual(content, urlopen(self.storage.url(name)).read())
+
+#class S3BotoStorageFileTests(S3BotoTestCase):
+#    def test_multipart_upload(self):
+#        nparts = 2
+#        name = self.prefix_path("test_multipart_upload.txt")
+#        mode = 'w'
+#        f = s3boto.S3BotoStorageFile(name, mode, self.storage)
+#        content_length = 1024 * 1024# 1 MB
+#        content = 'a' * content_length
+#
+#        bytes = 0
+#        target = f._write_buffer_size * nparts
+#        while bytes < target:
+#            f.write(content)
+#            bytes += content_length
+#
+#        # make the buffer roll over so f._write_counter
+#        # is incremented
+#        f.write("finished")
+#
+#        # verify upload was multipart and correctly partitioned
+#        self.assertEqual(f._write_counter, nparts)
+#
+#        # complete the upload
+#        f.close()
+#
+#        # verify that the remaining buffered bytes were
+#        # uploaded when the file was closed.
+#        self.assertEqual(f._write_counter, nparts+1)
     You can play with a django model without a complete django app installation.
     http://www.djangosnippets.org/snippets/1044/
     """
-    sys.exc_clear()
-
     os.environ["DJANGO_SETTINGS_MODULE"] = "django.conf.global_settings"
     from django.conf import global_settings