Commits

Jannis Leidel committed e8b661b Merge

Files changed (4)

storages/backends/s3boto.py

 import os
-import re
-import time
 import mimetypes
-import calendar
-from datetime import datetime
 
 try:
     from cStringIO import StringIO
 
     return final_path.lstrip('/')
 
-# Dates returned from S3's API look something like this:
-# "Sun, 11 Mar 2012 17:01:41 GMT"
-MONTH_NAMES = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
-               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
-DATESTR_RE = re.compile(r"^.+, (?P<day>\d{1,2}) (?P<month_name>%s) (?P<year>\d{4}) (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2}) (GMT|UTC)$" % ("|".join(MONTH_NAMES)))
-def _parse_datestring(dstr):
-    """
-    Parse a simple datestring returned by the S3 API and returns
-    a datetime object in the local timezone.
-    """
-    # This regular expression and thus this function
-    # assumes the date is GMT/UTC
-    m = DATESTR_RE.match(dstr)
-    if m:
-        # This code could raise a ValueError if there is some
-        # bad data or the date is invalid.
-        datedict = m.groupdict()
-        utc_datetime = datetime(
-            int(datedict['year']),
-            int(MONTH_NAMES.index(datedict['month_name'])) + 1,
-            int(datedict['day']),
-            int(datedict['hour']),
-            int(datedict['minute']),
-            int(datedict['second']),
-        )
-
-        # Convert the UTC datetime object to local time.
-        return datetime(*time.localtime(calendar.timegm(utc_datetime.timetuple()))[:6])
-    else:
-        raise ValueError("Could not parse date string: " + dstr)
 
 class S3BotoStorage(Storage):
     """
     Amazon Simple Storage Service using Boto
-    
+
     This storage backend supports opening files in read or write
     mode and supports streaming(buffering) data in chunks to S3
     when writing.
     def _get_access_keys(self):
         """
         Gets the access keys to use when accessing S3. If none
-        are provided to the class in the constructor or in the 
+        are provided to the class in the constructor or in the
         settings then get them from the environment variables.
         """
         access_key = ACCESS_KEY_NAME
         """Gzip a given string content."""
         zbuf = StringIO()
         zfile = GzipFile(mode='wb', compresslevel=6, fileobj=zbuf)
-        zfile.write(content.read())
-        zfile.close()
+        try:
+            zfile.write(content.read())
+        finally:
+            zfile.close()
         content.file = zbuf
+        content.seek(0)
         return content
 
     def _open(self, name, mode='rb'):
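
A standalone round-trip sketch (not part of the commit) of why the rewritten _compress_content wraps the write in try/finally and rewinds: GzipFile only flushes its trailer on close(), and without a seek(0) a subsequent read() of the buffer would return an empty string. The literal string is just illustrative.

    from cStringIO import StringIO
    from gzip import GzipFile

    zbuf = StringIO()
    zfile = GzipFile(mode='wb', compresslevel=6, fileobj=zbuf)
    try:
        zfile.write("I should be gzip'd")
    finally:
        zfile.close()          # flushes the gzip trailer into zbuf
    zbuf.seek(0)               # rewind; otherwise read() returns ''
    data = GzipFile(mode='rb', fileobj=zbuf).read()
    assert data == "I should be gzip'd"
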
         return self.bucket.get_key(self._encode_name(name)).size
 
     def modified_time(self, name):
+        try:
+            from dateutil import parser, tz
+        except ImportError:
+            raise NotImplementedError()
         name = self._normalize_name(self._clean_name(name))
         entry = self.entries.get(name)
+        # only call self.bucket.get_key() if the key is not found
+        # in the preloaded metadata.
         if entry is None:
             entry = self.bucket.get_key(self._encode_name(name))
-
-        # Parse the last_modified string to a local datetime object.
-        return _parse_datestring(entry.last_modified)
+        # convert the last_modified string to a datetime
+        last_modified_date = parser.parse(entry.last_modified)
+        # if the date has no timezone, assume UTC
+        if last_modified_date.tzinfo is None:
+            last_modified_date = last_modified_date.replace(tzinfo=tz.tzutc())
+        # convert date to local time w/o timezone
+        timezone = tz.gettz(settings.TIME_ZONE)
+        return last_modified_date.astimezone(timezone).replace(tzinfo=None)
 
     def url(self, name):
         name = self._normalize_name(self._clean_name(name))
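
For reference, a minimal sketch (not from the commit) of what the dateutil-based parsing in modified_time does with an S3-style timestamp; 'America/Chicago' stands in for settings.TIME_ZONE.

    from dateutil import parser, tz

    dt = parser.parse("Sun, 11 Mar 2012 17:01:41 GMT")     # tz-aware, UTC
    local_zone = tz.gettz("America/Chicago")               # stand-in for settings.TIME_ZONE
    naive_local = dt.astimezone(local_zone).replace(tzinfo=None)
    print naive_local                                      # 2012-03-11 12:01:41 (CDT is UTC-5)
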
 class S3BotoStorageFile(File):
     """
     The default file object used by the S3BotoStorage backend.
-    
+
     This file implements file streaming using boto's multipart
     uploading functionality. The file can be opened in read or
     write mode.
     in your application.
     """
     # TODO: Read/Write (rw) mode may be a bit undefined at the moment. Needs testing.
-    # TODO: When Django drops support for Python 2.5, rewrite to use the 
+    # TODO: When Django drops support for Python 2.5, rewrite to use the
     #       BufferedIO streams in the Python 2.6 io module.
 
     def __init__(self, name, mode, storage, buffer_size=FILE_BUFFER_SIZE):
         self._multipart = None
         # 5 MB is the minimum part size (if there is more than one part).
         # Amazon allows up to 10,000 parts.  The default supports uploads
-        # up to roughly 50 GB.  Increase the part size to accommodate 
+        # up to roughly 50 GB.  Increase the part size to accommodate
         # for files larger than this.
         self._write_buffer_size = buffer_size
         self._write_counter = 0
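
As a sanity check on the comment above (illustrative arithmetic only): with S3's 5 MB minimum part size and 10,000-part limit, the default buffer tops out just under 50 GB, and raising the buffer size raises that ceiling proportionally.

    part_size = 5 * 1024 * 1024                   # default FILE_BUFFER_SIZE: 5 MB per part
    max_parts = 10000                             # S3's per-upload part limit
    print part_size * max_parts / (1024.0 ** 3)   # ~48.8 GB
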
     def size(self):
         return self.key.size
 
-    @property
-    def file(self):
+    def _get_file(self):
         if self._file is None:
             self._file = StringIO()
             if 'r' in self._mode:
                 self._is_dirty = False
                 self.key.get_contents_to_file(self._file)
                 self._file.seek(0)
+            content_type = mimetypes.guess_type(self.name)[0] or Key.DefaultContentType
+            if (self._storage.gzip and
+                    content_type in self._storage.gzip_content_types):
+                self._file = GzipFile(mode=self._mode, fileobj=self._file)
         return self._file
 
+    def _set_file(self, value):
+        self._file = value
+
+    file = property(_get_file, _set_file)
+
     def read(self, *args, **kwargs):
         if 'r' not in self._mode:
             raise AttributeError("File was not opened in read mode.")
             upload_headers.update(self._storage.headers)
             self._multipart = self._storage.bucket.initiate_multipart_upload(
                 self.key.name,
-                headers = upload_headers,
-                reduced_redundancy = self._storage.reduced_redundancy
+                headers=upload_headers,
+                reduced_redundancy=self._storage.reduced_redundancy
             )
         if self._write_buffer_size <= self._buffer_file_size:
             self._flush_write_buffer()
     @property
     def _buffer_file_size(self):
         pos = self.file.tell()
-        self.file.seek(0,os.SEEK_END)
+        self.file.seek(0, os.SEEK_END)
         length = self.file.tell()
         self.file.seek(pos)
         return length

storages/tests/s3boto.py

     def test_suspicious_operation(self):
         self.assertRaises(ValueError,
             s3boto.safe_join, "base", "../../../../../../../etc/passwd")
-    
+
 class S3BotoStorageTests(S3BotoTestCase):
 
     def test_storage_save(self):
         content = ContentFile('new content')
         self.storage.save(name, content)
         self.storage.bucket.get_key.assert_called_once_with(name)
-        
+
         key = self.storage.bucket.get_key.return_value
         key.set_metadata.assert_called_with('Content-Type', 'text/plain')
         key.set_contents_from_file.assert_called_with(
             policy=self.storage.acl,
             reduced_redundancy=self.storage.reduced_redundancy,
         )
-    
+
+    def test_storage_save_gzip(self):
+        """
+        Test saving a file with gzip enabled.
+        """
+        if not s3boto.IS_GZIPPED:  # Gzip not enabled.
+            return
+        name = 'test_storage_save.css'
+        content = ContentFile("I should be gzip'd")
+        self.storage.save(name, content)
+        key = self.storage.bucket.get_key.return_value
+        key.set_metadata.assert_called_with('Content-Type', 'text/css')
+        key.set_contents_from_file.assert_called_with(
+            content,
+            headers={'Content-Encoding': 'gzip'},
+            policy=self.storage.acl,
+            reduced_redundancy=self.storage.reduced_redundancy,
+        )
+
+    def test_compress_content_len(self):
+        """
+        Test that file returned by _compress_content() is readable.
+        """
+        if not s3boto.IS_GZIPPED:  # Gzip not enabled.
+            return
+        content = ContentFile("I should be gzip'd")
+        content = self.storage._compress_content(content)
+        self.assertGreater(len(content.read()), 0)
+
     def test_storage_open_write(self):
         """
         Test opening a file in write mode
         # Set the mocked key's bucket
         self.storage.bucket.get_key.return_value.bucket = self.storage.bucket
         # Set the name of the mock object
-        self.storage.bucket.get_key.return_value.name = name 
+        self.storage.bucket.get_key.return_value.name = name
 
         file = self.storage.open(name, 'w')
         self.storage.bucket.get_key.assert_called_with(name)
             _file, 1, headers=self.storage.headers,
         )
         file._multipart.complete_upload.assert_called_once()
-    
+
     #def test_storage_exists_and_delete(self):
     #    # show file does not exist
     #    name = self.prefix_path('test_exists.txt')
     #    self.assertFalse(self.storage.exists(name))
-    #    
+    #
     #    # create the file
     #    content = 'new content'
     #    file = self.storage.open(name, 'w')
     #    file.write(content)
     #    file.close()
-    #    
+    #
     #    # show file exists
     #    self.assertTrue(self.storage.exists(name))
-    #    
+    #
     #    # delete the file
     #    self.storage.delete(name)
-    #    
+    #
     #    # show file does not exist
     #    self.assertFalse(self.storage.exists(name))
 
 
         self.assertEqual(len(dirs), 2)
         for directory in ["some", "other"]:
-            self.assertTrue(directory in dirs, 
+            self.assertTrue(directory in dirs,
                             """ "%s" not in directory list "%s".""" % (
                                 directory, dirs))
-            
+
         self.assertEqual(len(files), 2)
         for filename in ["2.txt", "4.txt"]:
-            self.assertTrue(filename in files, 
+            self.assertTrue(filename in files,
                             """ "%s" not in file list "%s".""" % (
                                 filename, files))
 
 
         dirs, files = self.storage.listdir("some/")
         self.assertEqual(len(dirs), 1)
-        self.assertTrue('path' in dirs, 
+        self.assertTrue('path' in dirs,
                         """ "path" not in directory list "%s".""" % (dirs,))
-            
+
         self.assertEqual(len(files), 1)
-        self.assertTrue('2.txt' in files, 
+        self.assertTrue('2.txt' in files,
                         """ "2.txt" not in files list "%s".""" % (files,))
 
     #def test_storage_size(self):
     #    f = ContentFile(content)
     #    self.storage.save(name, f)
     #    self.assertEqual(self.storage.size(name), f.size)
-    #    
+    #
     #def test_storage_url(self):
     #    name = self.prefix_path('test_storage_size.txt')
     #    content = 'new content'
     #    f = ContentFile(content)
     #    self.storage.save(name, f)
     #    self.assertEqual(content, urlopen(self.storage.url(name)).read())
-        
+
 #class S3BotoStorageFileTests(S3BotoTestCase):
 #    def test_multipart_upload(self):
 #        nparts = 2
 #        f = s3boto.S3BotoStorageFile(name, mode, self.storage)
 #        content_length = 1024 * 1024# 1 MB
 #        content = 'a' * content_length
-#        
+#
 #        bytes = 0
 #        target = f._write_buffer_size * nparts
 #        while bytes < target:
 #            f.write(content)
 #            bytes += content_length
-#            
+#
 #        # make the buffer roll over so f._write_counter
 #        # is incremented
 #        f.write("finished")
-#        
+#
 #        # verify upload was multipart and correctly partitioned
 #        self.assertEqual(f._write_counter, nparts)
-#        
+#
 #        # complete the upload
 #        f.close()
-#        
+#
 #        # verify that the remaining buffered bytes were
 #        # uploaded when the file was closed.
 #        self.assertEqual(f._write_counter, nparts+1)
         'beproud.django.authutils.middleware.AuthMiddleware',
     )
     global_settings.DEFAULT_FILE_STORAGE = 'backends.s3boto.S3BotoStorage'
+    global_settings.AWS_IS_GZIPPED = True
+    global_settings.SECRET_KEY = "tralala"
 
     from django.test.utils import get_runner
     test_runner = get_runner(global_settings)
 # content of: tox.ini , put in same dir as setup.py
 [tox]
-envlist = django11,django12,django13
+envlist = django11,django12,django13,django14
 
 [testenv]
 deps=boto
     django==1.3.1
     boto==2.2.2
 commands=python setup.py test
+
+[testenv:django14]
+deps=
+    mock==0.8.0
+    django==1.4
+    boto==2.2.2
+commands=python setup.py test