Commits

Vinay Sajip  committed f965995

Added update functionality to wheels.

  • Participants
  • Parent commits 71d6024

Comments (0)

Files changed (2)

File distlib/wheel.py

 from .database import InstalledDistribution
 from .metadata import Metadata, METADATA_FILENAME
 from .util import (FileOperator, convert_path, CSVReader, CSVWriter, Cache,
-                   cached_property, get_cache_base, read_exports)
-
+                   cached_property, get_cache_base, read_exports, tempdir)
+from .version import NormalizedVersion, UnsupportedVersionError
 
 logger = logging.getLogger(__name__)
 
                         raise DistlibException('digest mismatch for '
                                                '%s' % arcname)
 
+    def update(self, modifier, dest_dir=None, **kwargs):
+        """
+        Update the contents of a wheel in a generic way. The modifier should
+        be a callable which expects a dictionary argument: its keys are
+        archive-entry paths, and its values are absolute filesystem paths
+        where the contents the corresponding archive entries can be found. The
+        modifier is free to change the contents of the files pointed to, add
+        new entries and remove entries, before returning. This method will
+        extract the entire contents of the wheel to a temporary location, call
+        the modifier, and then use the passed (and possibly updated)
+        dictionary to write a new wheel. If ``dest_dir`` is specified, the new
+        wheel is written there -- otherwise, the original wheel is overwritten.
+
+        The modifier should return True if it updated the wheel, else False.
+        This method returns the same value the modifier returns.
+        """
+
+        def get_version(path_map, info_dir):
+            version = path = None
+            key = '%s/%s' % (info_dir, METADATA_FILENAME)
+            if key not in path_map:
+                key = '%s/PKG-INFO' % info_dir
+            if key in path_map:
+                path = path_map[key]
+                version = Metadata(path=path).version
+            return version, path
+
+        def update_version(version, path):
+            updated = None
+            try:
+                v = NormalizedVersion(version)
+                i = version.find('-')
+                if i < 0:
+                    updated = '%s-1' % version
+                else:
+                    parts = [int(s) for s in version[i + 1:].split('.')]
+                    parts[-1] += 1
+                    updated = '%s-%s' % (version[:i],
+                                         '.'.join(str(i) for i in parts))
+            except UnsupportedVersionError:
+                logger.debug('Cannot update non-compliant (PEP-440) '
+                             'version %r', version)
+            if updated:
+                md = Metadata(path=path)
+                md.version = updated
+                legacy = not path.endswith(METADATA_FILENAME)
+                md.write(path=path, legacy=legacy)
+                logger.debug('Version updated from %r to %r', version,
+                             updated)
+
+        pathname = os.path.join(self.dirname, self.filename)
+        name_ver = '%s-%s' % (self.name, self.version)
+        info_dir = '%s.dist-info' % name_ver
+        record_name = posixpath.join(info_dir, 'RECORD')
+        with tempdir() as workdir:
+            with ZipFile(pathname, 'r') as zf:
+                path_map = {}
+                for zinfo in zf.infolist():
+                    arcname = zinfo.filename
+                    if isinstance(arcname, text_type):
+                        u_arcname = arcname
+                    else:
+                        u_arcname = arcname.decode('utf-8')
+                    if u_arcname == record_name:
+                        continue
+                    if '..' in u_arcname:
+                        raise DistlibException('invalid entry in '
+                                               'wheel: %r' % u_arcname)
+                    zf.extract(zinfo, workdir)
+                    path = os.path.join(workdir, convert_path(u_arcname))
+                    path_map[u_arcname] = path
+
+            # Remember the version.
+            original_version, _ = get_version(path_map, info_dir)
+            # Files extracted. Call the modifier.
+            modified = modifier(path_map, **kwargs)
+            if modified:
+                # Something changed - need to build a new wheel.
+                current_version, path = get_version(path_map, info_dir)
+                if current_version and (current_version == original_version):
+                    # Add or update local version to signify changes.
+                    update_version(current_version, path)
+                # Decide where the new wheel goes.
+                if dest_dir is None:
+                    fd, newpath = tempfile.mkstemp(suffix='.whl',
+                                                   prefix='wheel-update-',
+                                                   dir=workdir)
+                    os.close(fd)
+                else:
+                    if not os.path.isdir(dest_dir):
+                        raise DistlibException('Not a directory: %r' % dest_dir)
+                    newpath = os.path.join(dest_dir, self.filename)
+                archive_paths = list(path_map.items())
+                distinfo = os.path.join(workdir, info_dir)
+                info = distinfo, info_dir
+                self.write_records(info, workdir, archive_paths)
+                self.build_zip(newpath, archive_paths)
+                if dest_dir is None:
+                    shutil.copyfile(newpath, pathname)
+        return modified
+
 def compatible_tags():
     """
     Return (pyver, abi, arch) tuples compatible with this Python.

File tests/test_wheel.py

         md.write(path=mdpath)
         return True
 
+    def wheel_modifier_ver(self, path_map):
+        mdpath = path_map['dummy-0.1.dist-info/pydist.json']
+        md = Metadata(path=mdpath)
+        md.version = '0.1-123'
+        md.write(path=mdpath)
+        return True
+
+    def test_update(self):
+        workdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, workdir)
+        fn = 'dummy-0.1-py27-none-any.whl'
+        sfn = os.path.join(HERE, fn)
+        dfn = os.path.join(workdir, fn)
+        shutil.copyfile(sfn, dfn)
+        mtime = os.stat(dfn).st_mtime
+        w = Wheel(dfn)
+        modified = w.update(self.wheel_modifier_nop)
+        self.assertFalse(modified)
+        self.assertEqual(mtime, os.stat(dfn).st_mtime)
+        modified = w.update(self.wheel_modifier)
+        self.assertTrue(modified)
+        self.assertLess(mtime, os.stat(dfn).st_mtime)
+        w = Wheel(dfn)
+        w.verify()
+        md = w.metadata
+        self.assertEqual(md.run_requires, [{'requires': ['numpy']}])
+        self.assertEquals(md.version, '0.1-1')
+
+        modified = w.update(self.wheel_modifier_ver)
+        self.assertTrue(modified)
+        self.assertLess(mtime, os.stat(dfn).st_mtime)
+        w = Wheel(dfn)
+        w.verify()
+        md = w.metadata
+        self.assertEqual(md.run_requires, [{'requires': ['numpy']}])
+        self.assertEquals(md.version, '0.1-123')
+
     def test_info(self):
         fn = os.path.join(HERE, 'dummy-0.1-py27-none-any.whl')
         w = Wheel(fn)