Commits

Paul Moore  committed 4442cc7

Added a new implementation, and a unittest-based test suite

  • Participants
  • Parent commits 1b6162b

Comments (0)

Files changed (2)

+# -*- coding: utf8 -*-
+""" PEP 376
+"""
+from __future__ import with_statement
+import os
+from os.path import join, splitext, isdir
+from os import listdir
+from string import maketrans
+import csv
+import sys
+import re
+import threading
+from zipfile import is_zipfile, ZipFile
+
+from distutils.dist  import  DistributionMetadata
+from distutils.errors import DistutilsError
+
+# Single-character translation tables built with string.maketrans; each
+# maps one character to its replacement and leaves all others untouched.
+# '/' -> the local path separator
+SEP_TRANS = maketrans('/', os.path.sep)
+# ' ' -> '.'
+SPACE_TRANS = maketrans(' ', '.')
+# '-' -> '_'
+DASH_TRANS = maketrans('-', '_')
+
+
+#
+# Utilities
+#
+
+def metadata_dirname(name, version):
+    """Returns the metadata directory name of a project.
+
+    ``name`` is converted to a standard distribution name: any run of
+    characters other than ASCII letters, digits and '.' is replaced with a
+    single '_'. In ``version`` spaces first become dots, then the same
+    replacement is applied. The result is ``<name>-<version>.egg-info``, so
+    the only '-' in it is the one separating the two parts.
+
+    Both byte strings and unicode strings are accepted.
+    """
+    unsafe_run = '[^A-Za-z0-9.]+'
+    name = re.sub(unsafe_run, '_', name)
+    # str.replace is used instead of a maketrans() table because
+    # unicode.translate() rejects 256-character string tables.
+    version = re.sub(unsafe_run, '_', version.replace(' ', '.'))
+    # No extra '-' -> '_' pass is needed: '-' is outside the allowed set,
+    # so the substitutions above have already replaced every one of them.
+    return '%s-%s.egg-info' % (name, version)
+
+#
+# distutils.dist.DistributionMetadata new version
+#
+class _DistributionMetadata(DistributionMetadata):
+    """distutils.dist.DistributionMetadata new version
+
+    That can load an existing PKG-INFO file.
+
+    Every metadata attribute always exists on the instance; it stays None
+    until a PKG-INFO string has been read.
+    """
+    def __init__(self, pkg_info=None):
+        """Creates the metadata, optionally filled from ``pkg_info``.
+
+        ``pkg_info`` is the content of a PKG-INFO file as a string, or
+        None to get empty metadata.
+        """
+        # Start from a complete set of attributes so that fields PKG-INFO
+        # does not carry (e.g. maintainer) never raise AttributeError.
+        self.name = None
+        self.version = None
+        self.author = None
+        self.author_email = None
+        self.maintainer = None
+        self.maintainer_email = None
+        self.url = None
+        self.license = None
+        self.description = None
+        self.long_description = None
+        self.keywords = None
+        self.platforms = None
+        self.classifiers = None
+        self.download_url = None
+        # PEP 314
+        self.provides = None
+        self.requires = None
+        self.obsoletes = None
+        # Names historically set by read_pkg_info(), kept for callers
+        # that already rely on them.
+        self.summary = None
+        self.platform = None
+        if pkg_info is not None:
+            self.read_pkg_info(pkg_info)
+
+    def read_pkg_info(self, pkg_info):
+        """Reads from a PKG-INFO string and initialize the instance.
+
+        Single-valued fields missing from ``pkg_info`` are set to
+        'UNKNOWN'; multi-valued fields are set to a (possibly empty) list.
+        The PEP 314 fields are only read when Metadata-Version is '1.1'
+        and are None otherwise.
+        """
+        re_options = re.I|re.DOTALL|re.M
+
+        def _findall(fieldname):
+            # Every value found for a "Field: value" line.
+            pattern = r'^%s: (.*?)$' % fieldname
+            return re.findall(pattern, pkg_info, re_options)
+
+        def _extract(fieldname):
+            if fieldname == 'Description':
+                # crappy, need to be reworked
+                pattern = r'^Description: (.*)'
+                res = re.findall(pattern, pkg_info , re_options)
+                if len(res) == 0:
+                    return 'UNKNOWN'
+                else:
+                    # Continuation lines are indented by 8 spaces.
+                    res = res[0].split('\n' + 8*' ')
+                    res = [r for r in res if not r.startswith('\n')]
+                    return '\n'.join(res) + '\n'
+
+            res = _findall(fieldname)
+            if fieldname in ('Classifier', 'Requires', 'Provides',
+                             'Obsoletes'):
+                return res
+            if len(res) == 0:
+                return 'UNKNOWN'
+            return res[0]
+
+        version = _extract('Metadata-Version')
+        self.name = _extract('Name')
+        self.version = _extract('Version')
+        # distutils calls the one-line summary ``description``; ``summary``
+        # is kept as an alias for existing callers.
+        self.summary = self.description = _extract('Summary')
+        self.url = _extract('Home-page')
+        self.author = _extract('Author')
+        self.author_email = _extract('Author-email')
+        self.license = _extract('License')
+        self.download_url = _extract('Download-URL')
+        self.long_description = _extract('Description')
+        self.keywords = _extract('Keywords').split(',')
+        self.classifiers = _extract('Classifier')
+        # ``platform`` keeps the first value (or 'UNKNOWN') as before;
+        # ``platforms`` is the list attribute the distutils base class uses.
+        self.platform = _extract('Platform')
+        self.platforms = _findall('Platform')
+
+        # PEP 314
+        if version == '1.1':
+            self.requires = _extract('Requires')
+            self.provides = _extract('Provides')
+            self.obsoletes = _extract('Obsoletes')
+        else:
+            self.requires = None
+            self.provides = None
+            self.obsoletes = None
+
+
+#
+# function used to detect a PEP 376 metadata directory
+#
+def is_metadata_dir(path):
+    """Tells whether `path` is a metadata directory.
+
+    A metadata directory is an existing directory whose extension is
+    `.egg-info` (compared case-insensitively) and which holds both a
+    `PKG-INFO` and a `RECORD` entry. Requiring both files keeps older
+    versions of the layout from being picked up.
+    """
+    extension = splitext(path)[-1]
+    if extension.lower() != '.egg-info':
+        return False
+    if not isdir(path):
+        return False
+    entries = os.listdir(path)
+    for required in ('PKG-INFO', 'RECORD'):
+        if required not in entries:
+            return False
+    return True
+
+#
+# Distribution class (with DistributionMetadata in it)
+#
+class Distribution(object):
+    """Base class describing one installed distribution.
+
+    Subclasses must provide ``self.distname`` (the `.egg-info` directory
+    name), ``self._local_path(path)`` and ``self.get_metadata_file(path)``,
+    the latter returning an open file-like object for a file stored in the
+    metadata directory.
+    """
+
+    def __init__(self):
+        # Both are filled lazily, on first access of the matching property.
+        self._metadata = None
+        self._files = None
+
+    @property
+    def name(self):
+        """Name of the distribution, as stored in its metadata."""
+        return self.metadata.name
+
+    @property
+    def metadata(self):
+        """Metadata parsed from PKG-INFO (read once, then cached)."""
+        if self._metadata is None:
+            pkginfo = self.get_metadata_file('PKG-INFO')
+            try:
+                self._metadata = _DistributionMetadata(pkginfo.read())
+            finally:
+                # Release the handle even when reading or parsing fails.
+                pkginfo.close()
+        return self._metadata
+
+    def __str__(self):
+        return "%s('%s')" % (self.__class__.__name__, self.name)
+
+    @property
+    def files(self):
+        """List of ``(location, md5, size)`` tuples read from RECORD.
+
+        Blank rows are skipped; a missing or empty md5/size column is
+        reported as None. The list is read once, then cached.
+        """
+        if self._files is None:
+            files = []
+            record = self.get_metadata_file('RECORD')
+            try:
+                for row in csv.reader(record):
+                    if not row:
+                        continue
+                    location = row[0]
+                    # "and/or" on purpose: an empty column must become
+                    # None exactly like a missing one.
+                    md5 = (len(row) > 1 and row[1]) or None
+                    size = (len(row) > 2 and row[2]) or None
+                    files.append((location, md5, size))
+            finally:
+                # RECORD used to stay open until garbage collection.
+                record.close()
+            self._files = files
+        return self._files
+
+    def get_metadata_files(self):
+        """Iterates over the RECORD entries located in the `.egg-info`
+        directory.
+
+        Yields each name relative to that directory, i.e. the part of the
+        '/'-separated RECORD location that follows ``self.distname``.
+        """
+        for location, md5, size in self.files:
+            prefix, sep, name = location.partition('/')
+            if prefix == self.distname and sep == '/':
+                yield name
+
+    def get_installed_files(self):
+        """Iterates over the RECORD entries.
+
+        Yields a (location, md5, size) tuple for each entry, with
+        ``location`` converted by ``self._local_path``.
+        """
+        for location, md5, size in self.files:
+            location = self._local_path(location)
+            yield location, md5, size
+
+    def uses(self, path):
+        """Returns True if the path is listed in the RECORD file.
+
+        e.g. if the project uses this file. ``path`` is compared, case
+        sensitively, with the locations yielded by get_installed_files().
+        """
+        for location, md5, size in self.get_installed_files():
+            # Case sensitivity?
+            if location == path:
+                return True
+        return False
+
+
+class FilesystemDistribution(Distribution):
+    """Distribution whose metadata directory lives on the filesystem."""
+
+    def __init__(self, container, name):
+        # ``container`` is the sys.path entry holding the distribution;
+        # ``name`` is the full egg-info directory name.
+        super(FilesystemDistribution, self).__init__()
+        self.container = container
+        self.distname = name
+
+    def _local_path(self, path):
+        """Joins a '/'-separated path onto the container, using the
+        local separator."""
+        # Absolute paths are not handled!
+        segments = path.split('/')
+        return join(self.container, *segments)
+
+    def get_metadata_file(self, path, binary=False):
+        """Opens a file of the metadata directory and returns it.
+
+        ``path`` is '/'-separated and relative to the metadata directory.
+        The file is opened in binary mode when ``binary`` is true.
+        """
+        if binary:
+            mode = 'rb'
+        else:
+            mode = 'r'
+        segments = [self.container, self.distname] + path.split('/')
+        return open(join(*segments), mode)
+
+class ZippedDistribution(Distribution):
+    """Distribution whose metadata directory is stored in a zip file."""
+
+    def __init__(self, zipfile, name):
+        # ``zipfile`` is an open ZipFile; ``name`` is the full egg-info
+        # directory name inside it.
+        super(ZippedDistribution, self).__init__()
+        self.zipfile = zipfile
+        self.distname = name
+        self.container = zipfile.filename
+
+    def _local_path(self, path):
+        """Joins a '/'-separated path onto the zip file name, using the
+        local separator."""
+        # Absolute paths are not handled!
+        segments = path.split('/')
+        return os.path.join(self.container, *segments)
+
+    def get_metadata_file(self, path, binary=False):
+        """Opens a member of the metadata directory and returns it.
+
+        The member is opened with mode 'r' when ``binary`` is true and
+        with mode 'rU' otherwise.
+        """
+        if binary:
+            mode = 'r'
+        else:
+            mode = 'rU'
+        member = self.distname + '/' + path
+        return self.zipfile.open(member, mode)
+
+# PEP 302 support routines
+
+class FSFinder(object):
+    """A dummy finder for filesystem paths.
+
+    This finder only implements the metadata search APIs,
+        list_distributions()
+        get_metadata()
+
+    ``distributions`` maps each distribution key (the part of the
+    metadata directory name before the first '-') to that directory name.
+    """
+    def __init__(self, path):
+        """Scans ``path`` for metadata directories.
+
+        A ``path`` that does not exist or cannot be listed simply yields
+        no distributions; an empty ``path`` stands for the current
+        directory, as it does on sys.path.
+        """
+        self.path = path
+        self.distributions = {}
+        try:
+            names = os.listdir(self.path or os.curdir)
+        except OSError:
+            # sys.path routinely contains entries that are missing or are
+            # not directories; they must not abort the whole search.
+            return
+        for name in names:
+            if is_metadata_dir(os.path.join(self.path, name)):
+                dist = name.split('-', 1)[0]
+                self.distributions[dist] = name
+
+    def list_distributions(self):
+        """Iterates over the keys of the distributions found."""
+        for dist in self.distributions:
+            yield dist
+
+    def get_metadata(self, dist):
+        """Returns a FilesystemDistribution for ``dist``, or None when
+        ``dist`` is unknown to this finder."""
+        if dist not in self.distributions:
+            return None
+        return FilesystemDistribution(self.path, self.distributions[dist])
+
+import zipimport
+class ZipFinder(zipimport.zipimporter):
+    """A finder for zipfile paths.
+
+    This finder extends zipimporter with the metadata search APIs,
+        list_distributions()
+        get_metadata()
+
+    ``distributions`` maps each distribution key (the part of the
+    metadata directory name before the first '-') to that directory name.
+    """
+    def __init__(self, path):
+        """Opens the zip file at ``path`` and records its metadata
+        directories.
+
+        As for filesystem paths, a top-level directory only counts when
+        its extension is `.egg-info` (case-insensitively) and it holds
+        both a `PKG-INFO` and a `RECORD` member.
+        """
+        super(ZipFinder, self).__init__(path)
+        self.path = path
+        self.distributions = {}
+        self.zip = ZipFile(self.path)
+        # Direct members of every top-level *.egg-info directory, with the
+        # directories kept in the order the archive lists them.
+        members = {}
+        heads = []
+        for name in self.zip.namelist():
+            head, sep, tail = name.partition('/')
+            if not sep or splitext(head)[-1].lower() != '.egg-info':
+                continue
+            if head not in members:
+                members[head] = set()
+                heads.append(head)
+            members[head].add(tail)
+        for head in heads:
+            found = members[head]
+            if 'PKG-INFO' in found and 'RECORD' in found:
+                dist = head.partition('-')[0]
+                self.distributions[dist] = head
+
+    def list_distributions(self):
+        """Iterates over the keys of the distributions found."""
+        for dist in self.distributions:
+            yield dist
+
+    def get_metadata(self, dist):
+        """Returns a ZippedDistribution for ``dist``, or None when
+        ``dist`` is unknown to this finder."""
+        if dist not in self.distributions:
+            return None
+        return ZippedDistribution(self.zip, self.distributions[dist])
+
+def all_finders():
+    """Iterates over every finder reachable through the import system.
+
+    The entries of sys.meta_path come first. Then, for each sys.path
+    entry, comes the finder cached in sys.path_importer_cache or, failing
+    that, the result of the first sys.path_hooks hook that accepts the
+    entry (which is then cached). An entry no hook accepts gets an
+    FSFinder.
+    """
+    for meta_finder in sys.meta_path:
+        yield meta_finder
+    for entry in sys.path:
+        known = sys.path_importer_cache.get(entry, None)
+        if known:
+            yield known
+            continue
+        accepted = False
+        for factory in sys.path_hooks:
+            try:
+                candidate = factory(entry)
+            except ImportError:
+                # This hook does not handle the entry; try the next one.
+                continue
+            sys.path_importer_cache[entry] = candidate
+            accepted = True
+            break
+        if accepted:
+            yield candidate
+        else:
+            yield FSFinder(entry)
+
+#
+# high level APIs
+#
+
+def get_distributions():
+    """Iterates over the distributions known to all finders.
+
+    Every finder returned by all_finders() that offers both
+    `list_distributions` and `get_metadata` is asked for its
+    distributions; finders lacking either method are ignored.
+    """
+    for finder in all_finders():
+        supported = (hasattr(finder, 'list_distributions') and
+                     hasattr(finder, 'get_metadata'))
+        if supported:
+            for key in finder.list_distributions():
+                yield finder.get_metadata(key)
+
+def get_distribution(name):
+    """Looks up the ``Distribution`` instance called ``name``.
+
+    The distributions produced by get_distributions() are examined in
+    order and the first one whose ``name`` metadata equals ``name`` is
+    returned. The comparison is case sensitive.
+
+    There should be at most one match; when there is none, ``None`` is
+    returned.
+    """
+    match = None
+    for candidate in get_distributions():
+        # Case sensitivity?
+        if candidate.name == name:
+            match = candidate
+            break
+    return match
+
+def get_file_users(path):
+    """Yields each distribution that uses ``path``.
+
+    All distributions from get_distributions() are checked with their
+    ``uses`` method; ``path`` can be a local absolute path or a relative
+    '/'-separated path.
+    """
+    for dist in get_distributions():
+        if not dist.uses(path):
+            continue
+        yield dist
+

File test_pep376.py

+import unittest
+import pkgutil2
+from pkgutil2 import *
+import sys
+import os
+
+# Absolute path of the 'site-packages' fixture directory that sits next to
+# this test module; TestZipfile uses the same path with '.zip' appended.
+SITE_PACKAGES = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'site-packages')
+
+class Helpers:
+    """Assertion and path helpers mixed into the test cases.
+
+    Expects the host class to provide ``assertTrue`` and a ``container``
+    attribute.
+    """
+    def assertIn(self, container, elem):
+        # Note the argument order: the container comes first.
+        present = container.__contains__(elem)
+        message = '%r not in %r' % (elem, container)
+        self.assertTrue(present, message)
+    def abspath(self, rel):
+        # ``rel`` is '/'-separated and relative to ``self.container``.
+        joined = os.path.join(self.container, *rel.split('/'))
+        return os.path.abspath(joined)
+
+class Distributions:
+    """Tests shared by every container type.
+
+    Mixed into a TestCase together with Helpers; the concrete class
+    supplies ``container`` and points sys.path at it. The expected values
+    below describe the 'mercurial' and 'processing' fixtures.
+    """
+    def test_list_distributions(self):
+        # Exactly the two fixture distributions must be found.
+        projects = list(get_distributions())
+        self.assertEqual(len(projects), 2)
+        names = [p.name for p in projects]
+        self.assertIn(names, 'mercurial')
+        self.assertIn(names, 'processing')
+    def test_get_distribution(self):
+        # An unknown name gives None, a known name gives its distribution.
+        self.assertEqual(get_distribution('unknown'), None)
+        self.assertEqual(get_distribution('mercurial').name, 'mercurial')
+        self.assertEqual(get_distribution('processing').name, 'processing')
+    def test_get_file_users(self):
+        # A file listed in a single RECORD has a single user.
+        path = self.abspath('processing/__init__.py')
+        users = list(get_file_users(path))
+        self.assertEqual(len(users), 1)
+        self.assertEqual(users[0].name, 'processing')
+        # Note: mercurial/filelog.py is in both RECORD files!
+        path = self.abspath('mercurial/filelog.py')
+        users = list(get_file_users(path))
+        self.assertEqual(len(users), 2)
+        self.assertEqual(sorted(u.name for u in users),
+                ['mercurial', 'processing'])
+    def test_distribution_metadata_10(self):
+        # Fields read from the 'mercurial' PKG-INFO fixture.
+        mercurial = get_distribution('mercurial')
+        self.assertEqual(mercurial.name, 'mercurial')
+        self.assertEqual(mercurial.metadata.version, '1.0.1')
+        self.assertEqual(mercurial.metadata.license, 'GNU GPL')
+    def test_distribution_metadata_11(self):
+        # Fields read from the 'processing' PKG-INFO fixture, including
+        # the PEP 314 requires/provides/obsoletes lists.
+        processing = get_distribution('processing')
+        self.assertEqual(processing.name, 'processing')
+        self.assertEqual(processing.metadata.version, '0.52')
+        self.assertEqual(processing.metadata.author, 'R Oudkerk')
+        self.assertEqual(len(processing.metadata.provides), 2)
+        self.assertEqual(len(processing.metadata.requires), 2)
+        self.assertEqual(len(processing.metadata.obsoletes), 3)
+    def test_distribution_metadata_files(self):
+        mercurial = get_distribution('mercurial')
+        # Note - deliberate discrepancy - the file is called PKG-INFO, but the
+        # RECORD file holds the name PKG_INFO - we should see what is in the
+        # RECORD file, not the actual file name!!!
+        self.assertEqual(sorted(mercurial.get_metadata_files()),
+                ['PKG_INFO', 'RECORD'])
+
+class TestFilesystem(Distributions, Helpers, unittest.TestCase):
+    # Runs the shared tests against the plain directory fixture.
+    container = SITE_PACKAGES
+    def setUp(self):
+        # Remember the real sys.path and search only the fixture.
+        self.path, sys.path = sys.path, [ self.container ]
+    def tearDown(self):
+        # Put the real sys.path back.
+        sys.path = self.path
+
+class TestZipfile(Distributions, Helpers, unittest.TestCase):
+    # Runs the shared tests against the zipped fixture.
+    container = SITE_PACKAGES + '.zip'
+    def setUp(self):
+        # Remember the import state this test replaces.
+        self.path, self.path_hooks, self.path_importer_cache = (
+            sys.path, sys.path_hooks, sys.path_importer_cache)
+        # Search only the zip file, through ZipFinder, with an empty cache.
+        sys.path = [ self.container ]
+        sys.path_hooks = [ pkgutil2.ZipFinder ]
+        sys.path_importer_cache = {}
+    def tearDown(self):
+        # Put the saved import state back.
+        sys.path, sys.path_hooks, sys.path_importer_cache = (
+            self.path, self.path_hooks, self.path_importer_cache)
+
+class TestMetadata(unittest.TestCase):
+    # Tests that need no fixture on sys.path.
+    def test_metadata_dirname(self):
+        # ((name, version), expected directory name)
+        cases = [
+            (('zlib', '2.5.2'), 'zlib-2.5.2.egg-info'),
+            (('python-ldap', '2.5'), 'python_ldap-2.5.egg-info'),
+            (('python-ldap', '2.5 a---5'), 'python_ldap-2.5.a_5.egg-info'),
+        ]
+        for (name, version), expected in cases:
+            self.assertEqual(metadata_dirname(name, version), expected)
+
+    def test_null_metadata(self):
+        # Metadata built without a PKG-INFO string has no name.
+        empty = pkgutil2._DistributionMetadata()
+        self.assertEqual(empty.name, None)
+
+# Run the whole suite when this file is executed as a script.
+if __name__ == '__main__':
+    unittest.main()