Commits

Faheem Mitha committed 5dceb29

Delete ZipFileUpload object at the end of register_zip and update tests accordingly.

  • Participants
  • Parent commits fa43103

Comments (0)

Files changed (3)

 from zipfile import ZipFile
 from utils import unique
 
-def zipnamelist(filepath):
+def zipnamelist(zipfile):
     """Return list of files and directories in zip files as a list of
     tuples of the form (f,'f') and (d,'d')"""
-    f = open(filepath, 'r')
-    z = ZipFile(file = f, mode = 'r')
+    z = ZipFile(zipfile)
     namelist = z.namelist()
     z.close()
-    f.close
     # zipfile's namelist returns directories with a trailing /.
     newnamelist = []
     for name in namelist:
 def dirlist(namelist):
     """Return sublist of directories from directories in path name list. """
     assert type(namelist) == type([]), "type of namelist is not list"
+    from utils import clean
     dlist = []
     namelist = expand_pathlist(namelist)
     for name in namelist:
         if name[1] == 'd':
             dlist.append(name[0])
-    return dlist
+    return clean(dlist)
 
 def register_files(instance, folderdir):
     """Register files to the media area"""
             description = "from zip file %s"%zipfilename
         folderdir = register_folders(dlist, description, instance.folder)
         filedir = register_files(instance, folderdir)
+        instance.delete()
     except:
         import sys
         #See http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/
         exc_info = sys.exc_info()
         print >> sys.stderr, "register_zip post_save signal failed for %(i)s - deleting %(i)s"%{'i':instance}
-        instance.delete()
+        try:
+            instance.delete()
+        except:
+            pass
         try:
             filedir
         except NameError:
         data = {'name':filename, 'description':'test file %s'%filename}
         file_data = {'upload': SimpleUploadedFile(filename, zf.read())}
         ff = ZipFileUploadForm(data, file_data)
-        zu = ff.save(commit=False)
-        zu.save()
-        return zf, zu
+        ff.save()
+        return zf
     def addTestZipFileToFolder(self, makezipfn, folderobj):
         zf = makezipfn()
-        #f = open(zfpath)
-        #filename = os.path.basename(zfpath)
         filename = zf.name
         data = {'name':filename, 'description':'test file %s'%filename, 'folder':str(folderobj.id)}
         file_data = {'upload': SimpleUploadedFile(filename, zf.read())}
-        #f.close()
         ff = ZipFileUploadForm(data, file_data)
-        zu = ff.save(commit=False)
-        zu.save()
-        return zf, zu
+        zu = ff.save()
+        return zf, folderobj
 
     #http://docs.djangoproject.com/en/dev/ref/forms/api/
     #See also "Streaming Uploads Discussion" in django-devel
 
     def addTestZipFile(self, makezipfn=makezip1):
         f = Fixture()
-        zfpath, zu = f.addTestZipFile(makezipfn)
-        return zfpath, zu
+        zf = f.addTestZipFile(makezipfn)
+        return zf
 
     def addTestZipFileToFolder(self, makezipfn=makezip1):
         f = Fixture()
         self.assertEqual(response['Location'].split('testserver')[1], reverse('top_folders'))
         self.assertEqual(clean(walk()), [])
 
-    def checkZipFileUpload(self, zipfile, zipfileupload=None):
+    def checkZipFileUpload(self, zf, folderobj=None):
         """
         This assumes only one zip file has been uploaded.  Check
         consistency between the ZipFileUpload object (zipfileupload)
         and the files uploaded and directories created on the system.
         """
-        if len(ZipFileUpload.objects.all()) != 1:
-            import sys
-            sys.exit("checkZipFileUpload expects exactly one ZipUpload object, not %s"%ZipFileUpload.objects.all())
-        # If ZipFileUpload object not given, try to get it from the database.
-        if zipfileupload==None:
-            zu = ZipFileUpload.objects.all()[0]
-        else:
-            zu = zipfileupload
-        from signals import dirlist, zipnamelist
-        if zu.folder == None:
+        self.assertEqual(ZipFileUpload.objects.all().count(), 0)
+        from signals import dirlist, zipnamelist, expand_pathlist
+        if folderobj == None:
             folderpath = settings.BIXFILE_UPLOAD
         else:
-            folderpath = zu.folder.path
-        zipfilepath = os.path.join(settings.ZIP_UPLOAD, zu.upload.name)
-        namelist = zipnamelist(zipfilepath)
-        # Add folder data member from ZipFileUpload object
-        dlist1 = list(set([os.path.join(folderpath, name) for name in dirlist(namelist)]))
-        from signals import expand_pathlist
-        if zu.folder != None:
-        # Add folder data member from ZipFileUpload object
-            e = expand_pathlist([(zu.folder.path,'d')])
-            e.remove((settings.BIXFILE_UPLOAD, 'd'))
-            dlist1 = dlist1 + [d[0] for d in e]
+            folderpath = folderobj.path
+        namelist = zipnamelist(zf)
+        dlist1 = [os.path.join(folderpath, d) for d in dirlist(namelist)]
+        if folderobj:
+            dlist1 = [folderobj.path]  + dlist1
         # Check folder upload paths in the zip file archive with that
         # of the FolderUpload objects.
         dlist2 = [obj.path for obj in FolderUpload.objects.all()]
         for path1, path2 in zip(pathlist1, pathlist2):
             self.assertEqual(path1, path2)
         import zipfile
-        z = zipfile.ZipFile(zipfilepath)
+        z = zipfile.ZipFile(zf)
         # Check the file contents in the zip file archive with those
         # files actually uploaded.
         for name, path in zip(namelist, pathlist2):
-           self.assertEqual(z.read(name[0]), open(path).read())
+            self.assertEqual(z.read(name[0]), open(path).read())
         # Check the directories in the FolderUpload objects have
         # actually been created
-
         # TODO
 
     def testZipFileUploadForm(self):
-        zf, zu = self.addTestZipFile()
-        self.checkZipFileUpload(zf, zu)
+        zf = self.addTestZipFile()
+        self.checkZipFileUpload(zf)
 
     def testZipFileUploadtoFolderForm(self):
-        zf, zu = self.addTestZipFileToFolder()
-        self.checkZipFileUpload(zf, zu)
+        zf, fu = self.addTestZipFileToFolder()
+        self.checkZipFileUpload(zf, fu)
 
-    def CheckZipFileUploadFormTwice(self):
+    def CheckZipFileUploadFormTwice(self, zf1, zf2):
         from utils import unique
         from signals import zipnamelist, dirlist
-        from models import ZipFileUpload
-        zfa = ZipFileUpload.objects.all()
-        zu1, zu2 = zfa[0], zfa[1]
         # File and directory lists from zip files.
-        namelist1 = zipnamelist(os.path.join(settings.ZIP_UPLOAD, zu1.upload.name))
-        namelist2 = zipnamelist(os.path.join(settings.ZIP_UPLOAD, zu2.upload.name))
+        namelist1 = zipnamelist(zf1)
+        namelist2 = zipnamelist(zf2)
         # Select sublist of directories, adding intermediate
         # directories as necessary.
         dirlist1 = dirlist(namelist1)
         filelist2 = [n[0] for n in list2 if n[1]=='f']
         # Intersection of elements in list1 and list2.
         list_int = list(set([n[0] for n in list1]).intersection(set([n[0] for n in list2])))
-
         # Check if paths in list2 collide with paths in list1.
         if list_int:
             dirlist_int = []
         self.assertEqual(clean(dirlist), clean(dirlist_from_folderupload))
 
     def testZipFileUploadFormTwice1(self):
-        zf1, zu1 = self.addTestZipFile(makezip1)
-        zf2, zu2 = self.addTestZipFile(makezip2)
-        self.CheckZipFileUploadFormTwice()
+        zf1 = self.addTestZipFile(makezip1)
+        zf2 = self.addTestZipFile(makezip2)
+        self.CheckZipFileUploadFormTwice(zf1, zf2)
 
     def testZipFileUploadFormTwice2(self):
-        zf1, zu1 = self.addTestZipFile(makezip2)
-        zf2, zu2 = self.addTestZipFile(makezip1)
-        self.CheckZipFileUploadFormTwice()
+        zf1 = self.addTestZipFile(makezip2)
+        zf2 = self.addTestZipFile(makezip1)
+        self.CheckZipFileUploadFormTwice(zf1, zf2)
 
     def testZipFileUploadFormSameTwice(self):
         zf = makezip1()
         uf = SimpleUploadedFile(filename, zf.read())
         file_data = {'upload': uf}
         ff1 = ZipFileUploadForm(data, file_data)
-        zu1 = ff1.save()
+        ff1.save()
         ff2 = ZipFileUploadForm(data, file_data)
-        zu2 = ff2.save()
-        self.CheckZipFileUploadFormTwice()
+        ff2.save()
+        self.CheckZipFileUploadFormTwice(zf, zf)
 
     def ZipFileUploadView(self, makezipname, filename):
         import commands
         zipfilename = zf.name
         zipfiledescription = request.POST['description']
         self.assertEqual("from zip file %s (%s)"%(zipfilename, zipfiledescription) in redirect_response._container[0], True)
-        zu = ZipFileUpload.objects.all().filter(id=1)[0]
-        zipfilename = os.path.basename(zu.upload.name)
-        self.assertEqual(zipfilename in redirect_response._container[0], True)
-        self.checkZipFileUpload(filename, zu)
+        self.checkZipFileUpload(zf)
 
     def testZipFileUploadView(self):
         self.ZipFileUploadView(makezip1, 'testZipFileUploadView.zip')
         redirect_response = self.c.get(reverse('top_folders'))
         self.assertEqual(redirect_response.status_code, 200)
         self.assertEqual("The zip file upload was created successfully" in redirect_response._container[0], True)
-        self.checkZipFileUpload(zf)
+        self.checkZipFileUpload(zf, pf)
 
     def testZipFileUploadInsideFolderUrl(self):
         self.ZipFileUploadInsideFolderUrl(makezip2, "foo2.zip")
     def testZipFileValidate(self):
         try:
             dummydir = os.path.join(settings.MEDIA_ROOT, settings.BIXFILE_UPLOAD, "dummydir")
-            zf, zu = self.addTestZipFile()
+            zf = self.addTestZipFile()
             if not os.path.isdir(dummydir):
                 os.mkdir(dummydir)
             response = self.c.post(reverse('validate'))
 
     def testMd5sum(self):
         from utils import  md5_for_file_object, md5_for_file_object_simple, md5_for_django_file_object, md5_for_django_file_object_simple
-        zf, zu = self.addTestZipFile(makezip5)
+        zf = self.addTestZipFile(makezip5)
         fu = FileUpload.objects.all()[0]
         filepath = os.path.join(settings.MEDIA_ROOT, fu.upload.name)
         self.assertEqual(md5_for_file_object(filepath, chunk_size=10), md5_for_file_object_simple(filepath))
         self.assertFalse(os.path.exists(os.path.join(settings.TMP_JUPLOAD, name)))
         self.assertEqual(len(FileJupload.objects.all()), 0)
 
-    def testZipFileUploadDelete(self):
-        zf, zu = self.addTestZipFile()
-        dirname = os.path.dirname(zu.upload.name)
-        self.assertTrue(os.path.exists(os.path.join(settings.ZIP_UPLOAD, dirname)))
-        self.assertEqual(len(ZipFileUpload.objects.all()), 1)
-        zu.delete()
-        self.assertFalse(os.path.exists(os.path.join(settings.ZIP_UPLOAD, dirname)))
-        self.assertEqual(len(ZipFileUpload.objects.all()), 0)
-        zf, zu = self.addTestZipFile()
-        dirname = os.path.dirname(zu.upload.name)
-        self.assertTrue(os.path.exists(os.path.join(settings.ZIP_UPLOAD, dirname)))
-        self.assertEqual(len(ZipFileUpload.objects.all()), 1)
-        ZipFileUpload.objects.all().delete()
-        self.assertFalse(os.path.exists(os.path.join(settings.ZIP_UPLOAD, dirname)))
-        self.assertEqual(len(ZipFileUpload.objects.all()), 0)
-
 class DBTest(unittest.TestCase):
     def setUp(self):
         self.c = Client()
     return hashlib.md5(content).hexdigest()
 
 def clean(pathlist):
-    return sorted([os.path.normpath(path) for path in pathlist])
+    return unique(sorted([os.path.normpath(path) for path in pathlist]))
 
 def walk(dirpath=None):
     "Files and directories under dirpath"