Commits

holger krekel committed 4999b00

introduce a stage.get_doczip method so we can get back docfiles

Comments (0)

Files changed (2)

server/devpi_server/db.py

             return entry
 
     def store_doczip(self, name, content):
+        """ unzip doc content for the specified "name" project. """
         assert content
         key = self.keyfs.STAGEDOCS(user=self.user, index=self.index, name=name)
 
-        # XXX collission checking
-        #
-        unzipfile = py.std.zipfile.ZipFile(py.io.BytesIO(content))
-
-        # XXX locking?
+        # XXX locking (unzipping could happen concurrently in theory)
         tempdir = self.keyfs.mkdtemp(name)
-        members = unzipfile.namelist()
-        for name in members:
-            fpath = tempdir.join(name, abs=True)
-            if not fpath.relto(tempdir):
-                raise ValueError("invalid path name:" + name)
-            if name.endswith(os.sep):
-                fpath.ensure(dir=1)
-            else:
-                fpath.dirpath().ensure(dir=1)
-                with fpath.open("wb") as f:
-                    f.write(unzipfile.read(name))
+        unzip_to_dir(content, tempdir)
         keypath = key.filepath
         if keypath.check():
             old = keypath.new(basename="old-" + keypath.basename)
             tempdir.move(keypath)
         return keypath
 
+    def get_doczip(self, name):
+        """ get zip file content of the current docs or None. """
+        key = self.keyfs.STAGEDOCS(user=self.user, index=self.index, name=name)
+        basedir = key.filepath
+        if not basedir.check():
+            return
+        content = create_zipfile(basedir)
+        return content
+
+
+def create_zipfile(basedir):
+    f = py.io.BytesIO()
+    zip = py.std.zipfile.ZipFile(f, "w")
+    _writezip(zip, basedir)
+    zip.close()
+    return f.getvalue()
+
+def _writezip(zip, basedir):
+    for p in basedir.visit():
+        if p.check(dir=1):
+            if not p.listdir():
+                path = p.relto(basedir) + "/"
+                zipinfo = py.std.zipfile.ZipInfo(path)
+                zip.writestr(zipinfo, "")
+        else:
+            path = p.relto(basedir)
+            zip.writestr(path, p.read("rb"))
+
+def unzip_to_dir(content, basedir):
+    unzipfile = py.std.zipfile.ZipFile(py.io.BytesIO(content))
+    members = unzipfile.namelist()
+    for name in members:
+        fpath = basedir.join(name, abs=True)
+        if not fpath.relto(basedir):
+            raise ValueError("invalid path name:" + name)
+        if name.endswith(os.sep):
+            fpath.ensure(dir=1)
+        else:
+            fpath.dirpath().ensure(dir=1)
+            with fpath.open("wb") as f:
+                f.write(unzipfile.read(name))

server/test_devpi_server/test_db.py

 import os
 import pytest
 
+from devpi_server.db import unzip_to_dir
+
 
 @pytest.fixture(params=[(), ("root/pypi",)])
 def bases(request):
         assert filepath.join("_static").check(dir=1)
         assert filepath.join("_templ", "x.css").check(file=1)
 
+    def test_getdoczip(self, stage, bases, tmpdir):
+        assert not stage.get_doczip("pkg1")
+        content = create_zipfile({"index.html": "<html/>",
+            "_static": {}, "_templ": {"x.css": ""}})
+        filepath = stage.store_doczip("pkg1", content)
+        doczip_content = stage.get_doczip("pkg1")
+        assert doczip_content
+        unzip_to_dir(doczip_content, tmpdir)
+        assert tmpdir.join("index.html").read() == "<html/>"
+        assert tmpdir.join("_static").check(dir=1)
+        assert tmpdir.join("_templ", "x.css").check(file=1)
+
     def test_storedoczipfile(self, stage, bases):
         content = create_zipfile({"index.html": "<html/>",
             "_static": {}, "_templ": {"x.css": ""}})