pep376 / pkgutil2.py

# -*- coding: utf8 -*-
""" PEP 376
"""
from __future__ import with_statement
import os
from os.path import join, splitext, isdir
from os import listdir
from string import maketrans
import csv
import sys
import re
import threading
from zipfile import is_zipfile, ZipFile

from distutils.dist  import  DistributionMetadata
from distutils.errors import DistutilsError

SEP_TRANS = maketrans('/', os.path.sep)
SPACE_TRANS = maketrans(' ', '.')
DASH_TRANS = maketrans('-', '_')


#
# Utilities
#

def metadata_dirname(name, version):
    """Returns the metadata directory name of a project.

    ``name`` is converted to a standard distribution name any runs of
    non-alphanumeric characters are replaced with a single '-'. ``version``
    is converted to a standard version string. Spaces become dots, and all other
    non-alphanumeric characters become dashes, with runs of multiple dashes
    condensed to a single dash. Both attributes are then converted into their
    filename-escaped form. Any '-' characters are currently replaced with '_'.
    """
    name = re.sub('[^A-Za-z0-9.]+', '_', name)
    version = version.translate(SPACE_TRANS)
    version = re.sub('[^A-Za-z0-9.]+', '_', version)
    return '%s-%s.egg-info' % (name.translate(DASH_TRANS),
                               version.translate(DASH_TRANS))

#
# distutils.dist.DistributionMetadata new version
#
class _DistributionMetadata(DistributionMetadata):
    """distutils.dist.DistributionMetadata new version

    That can load an existing PKG-INFO file
    """
    def __init__ (self, pkg_info=None):
        if pkg_info is not None:
            self.read_pkg_info(pkg_info)
        else:
            self.name = None
            self.version = None
            self.author = None
            self.author_email = None
            self.maintainer = None
            self.maintainer_email = None
            self.url = None
            self.license = None
            self.description = None
            self.long_description = None
            self.keywords = None
            self.platforms = None
            self.classifiers = None
            self.download_url = None
            # PEP 314
            self.provides = None
            self.requires = None
            self.obsoletes = None

    def read_pkg_info(self, pkg_info):
        """Reads from a PKG-INFO string and initialize the instance.
        """
        re_options = re.I|re.DOTALL|re.M

        def _extract(fieldname):
            if fieldname == 'Description':
                # crappy, need to be reworked
                pattern = r'^Description: (.*)'
                res = re.findall(pattern, pkg_info , re_options)
                if len(res) == 0:
                    return 'UNKNOWN'
                else:
                    res = res[0].split('\n' + 8*' ')
                    res = [r for r in res if not r.startswith('\n')]
                    return '\n'.join(res) + '\n'

            pattern = r'^%s: (.*?)$' % fieldname
            res = re.findall(pattern, pkg_info , re_options)
            if fieldname in ('Classifier', 'Requires', 'Provides',
                             'Obsoletes'):
                return res
            if len(res) == 0:
                return 'UNKNOWN'
            return res[0]

        version = _extract('Metadata-Version')
        self.name = _extract('Name')
        self.version = _extract('Version')
        self.summary = _extract('Summary')
        self.url = _extract('Home-page')
        self.author = _extract('Author')
        self.author_email = _extract('Author-email')
        self.license = _extract('License')
        self.download_url = _extract('Download-URL')
        self.long_description = _extract('Description')
        self.keywords = _extract('Keywords').split(',')
        self.classifiers = _extract('Classifier')
        self.platform = _extract('Platform')

        # PEP 314
        if version == '1.1':
            self.requires = _extract('Requires')
            self.provides = _extract('Provides')
            self.obsoletes = _extract('Obsoletes')
        else:
            self.requires = None
            self.provides = None
            self.obsoletes = None


#
# function used to detect a PEP 376 metadata directory
#
def is_metadata_dir(path):
    """Returns True if `path` is an metadata directory.

    Also makes sure it doesn't pick older versions by checking
    the presence of `RECORD` and `PKG-INFO`.
    """
    if not (splitext(path)[-1].lower() == '.egg-info' and isdir(path)):
        return False
    content = os.listdir(path)
    return 'PKG-INFO' in content and 'RECORD' in content

#
# Distribution class (with DistributionMetadata in it)
#
class Distribution(object):
    # Needs self.distname, self._local_path, self.get_metadata_file

    def __init__(self):
        self._metadata = None
        self._files = None

    @property
    def name(self):
        return self.metadata.name

    @property
    def metadata(self):
        if not self._metadata:
            pkginfo = self.get_metadata_file('PKG-INFO')
            self._metadata = _DistributionMetadata(pkginfo.read())
            pkginfo.close()
        return self._metadata

    def __str__(self):
        return "%s('%s')" % (self.__class__.__name__, self.name)

    @property
    def files(self):
        """Reads RECORD."""
        if self._files is None:
            files = []
            record = self.get_metadata_file('RECORD')
            for row in csv.reader(record):
                if row == []:
                    continue
                location = row[0]
                md5 = len(row) > 1 and row[1] or None
                size = len(row) > 2 and row[2] or None
                files.append((location, md5, size ))
            self._files = files
        return self._files

    def get_metadata_files(self):
        """Iterates over the list of files located in the `.egg-info`
        directory.

        If local is True, translates the cross-platform path for each
        path into a local absolute path.
        """
        for location, md5, size  in self.files:
            prefix, sep, name = location.partition('/')
            if prefix == self.distname and sep == '/':
                yield name

    def get_installed_files(self):
        """Iterates over the RECORD entries.

        Returns a (location, md5, size) tuple.
        If local is True, translates the cross-platform path for each
        path into a local absolute path.
        """
        for location, md5, size  in self.files:
            location = self._local_path(location)
            yield location, md5, size

    def uses(self, path):
        """Returns True if the path is listed in the RECORD file.

        e.g. if the project uses this file.
        """
        for location, md5, size in self.get_installed_files():
            # Case sensitivity?
            if location == path:
                return True
        return False


class FilesystemDistribution(Distribution):
    def __init__(self, container, name):
        super(FilesystemDistribution, self).__init__()
        # Container is the sys.path entry containing this distribution
        # Name is the distribution name (full egginfo filename)
        self.container = container
        self.distname = name

    def _local_path(self, path):
        """Transforms a '/'-separated path to an absolute path,
        using the local separator."""
        # Does not handle absolue paths!
        path = path.split('/')
        return join(self.container, *path)

    def get_metadata_file(self, path, binary=False):
        """Returns a file instance on the path.

        If binary is True, opens the file in binary mode.
        """
        path = path.split('/')
        local_path = join(self.container, self.distname, *path)
        return open(local_path, binary and 'rb' or 'r')

class ZippedDistribution(Distribution):

    def __init__(self, zipfile, name):
        super(ZippedDistribution, self).__init__()
        self.zipfile = zipfile
        self.container = self.zipfile.filename
        self.distname = name

    def _local_path(self, path):
        path = path.split('/')
        # Does not handle absolue paths!
        return os.path.join(self.container, *path)

    def get_metadata_file(self, path, binary=False):
        """Returns a file instance on the path.

        If binary is True, opens the file in binary mode.
        """
        path = self.distname + '/' + path
        return self.zipfile.open(path, binary and 'r' or 'rU')

# PEP 302 support routines

class FSFinder(object):
    """A dummy finder for filesystem paths.

    This finder only implements the metadata search APIs,
        list_distributions()
        get_metadata()
    """
    def __init__(self, path):
        self.path = path
        self.distributions = {}
        for name in os.listdir(self.path):
            if is_metadata_dir(os.path.join(self.path, name)):
                dist = name.split('-',1)[0]
                self.distributions[dist] = name
    def list_distributions(self):
        for dist in self.distributions:
            yield dist
    def get_metadata(self, dist):
        if dist not in self.distributions:
            return None
        return FilesystemDistribution(self.path, self.distributions[dist])

import zipimport
class ZipFinder(zipimport.zipimporter):
    """A finder for zipfile paths.

    This finder extends zipimporter with the metadata search APIs,
        list_distributions()
        get_metadata()
    """
    def __init__(self, path):
        super(ZipFinder, self).__init__(path)
        self.path = path
        self.distributions = {}
        self.zip = ZipFile(self.path)
        for name in self.zip.namelist():
            head, sep, _ = name.partition('/')
            if sep and head.endswith('egg-info'):
                dist, sep, _ = head.partition('-')
                self.distributions[dist] = head
    def list_distributions(self):
        for dist in self.distributions:
            yield dist
    def get_metadata(self, dist):
        if dist not in self.distributions:
            return None
        return ZippedDistribution(self.zip, self.distributions[dist])

def all_finders():
    for finder in sys.meta_path:
        yield finder
    for elem in sys.path:
        finder = sys.path_importer_cache.get(elem, None)
        if finder:
            yield finder
        else:
            for hook in sys.path_hooks:
                try:
                    finder = hook(elem)
                except ImportError:
                    continue
                sys.path_importer_cache[elem] = finder
                yield finder
                break
            else:
                yield FSFinder(elem)

#
# high level APIs
#

def get_distributions():
    """Provides an iterator that returns Distribution instances.

    Looks for `.egg-info` directories in `sys.path` and returns Distribution
    instances for each one of them.
    """
    for finder in all_finders():
        if not hasattr(finder, 'list_distributions'):
            continue
        if not hasattr(finder, 'get_metadata'):
            continue

        dists = finder.list_distributions()

        for dist in dists:
            yield finder.get_metadata(dist)

def get_distribution(name):
    """Returns a ``Distribution`` instance for ``name``.

    Scans all elements in `sys.path` and looks for all directories ending
    with `.egg-info`. Returns a ``Distribution`` instance corresponding to
    the `.egg-info` directory that contains a `PKG-INFO` that matches
    ``name`` for the ``name`` metadata.

    Notice that there should be at most one result. The first result
    founded will be returned. If the directory is not found, returns ``None``.
    """
    for d in get_distributions():
        # Case sensitivity?
        if d.name == name:
            return d
    return None

def get_file_users(path):
    """Iterates over all distributions to find out which distribution uses
    ``path``.

    ``path`` can be a local absolute path or a relative '/'-separated path.
    """
    for d in get_distributions():
        if d.uses(path):
            yield d
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.