pep376 / pkgutil.py

# -*- coding: utf8 -*-
""" PEP 376
"""
from __future__ import with_statement
import os
from os.path import join, splitext, isdir
from os import listdir
from string import maketrans
import csv
import sys
import re
import threading

from distutils.dist  import  DistributionMetadata
from distutils.errors import DistutilsError

SEP_TRANS = maketrans('/', os.path.sep)
SPACE_TRANS = maketrans(' ', '.')
DASH_TRANS = maketrans('-', '_')


#
# Utilities
#

def egginfo_dirname(name, version):
    """Returns the egg-info directory name of a project.

    ``name`` is converted to a standard distribution name any runs of
    non-alphanumeric characters are replaced with a single '-'. ``version``
    is converted to a standard version string. Spaces become dots, and all other
    non-alphanumeric characters become dashes, with runs of multiple dashes
    condensed to a single dash. Both attributes are then converted into their
    filename-escaped form. Any '-' characters are currently replaced with '_'.
    """
    name = re.sub('[^A-Za-z0-9.]+', '_', name)
    version = version.translate(SPACE_TRANS)
    version = re.sub('[^A-Za-z0-9.]+', '_', version)
    return '%s-%s.egg-info' % (name.translate(DASH_TRANS),
                               version.translate(DASH_TRANS))

#
# distutils.dist.DistributionMetadata new version
#
class _DistributionMetadata(DistributionMetadata):
    """distutils.dist.DistributionMetadata new version

    That can load an existing PKG-INFO file
    """
    def __init__ (self, file=None):
        if file is not None:
            self.read_pkg_file(file)
        else:
            self.name = None
            self.version = None
            self.author = None
            self.author_email = None
            self.maintainer = None
            self.maintainer_email = None
            self.url = None
            self.license = None
            self.description = None
            self.long_description = None
            self.keywords = None
            self.platforms = None
            self.classifiers = None
            self.download_url = None
            # PEP 314
            self.provides = None
            self.requires = None
            self.obsoletes = None

    def read_pkg_file(self, file):
        """Reads from a PKG-INFO file object and initialize the instance.
        """
        if isinstance(file, str):
            file = open(file, 'rU')

        pkg_info = file.read()
        re_options = re.I|re.DOTALL|re.M

        def _extract(fieldname):
            if fieldname == 'Description':
                # crappy, need to be reworked
                pattern = r'^Description: (.*)'
                res = re.findall(pattern, pkg_info , re_options)
                if len(res) == 0:
                    return 'UNKNOWN'
                else:
                    res = res[0].split('\n' + 8*' ')
                    res = [r for r in res if not r.startswith('\n')]
                    return '\n'.join(res) + '\n'

            pattern = r'^%s: (.*?)$' % fieldname
            res = re.findall(pattern, pkg_info , re_options)
            if fieldname in ('Classifier', 'Requires', 'Provides',
                             'Obsolete'):
                return res
            if len(res) == 0:
                return 'UNKNOWN'
            return res[0]

        version = _extract('Metadata-Version')
        self.name = _extract('Name')
        self.version = _extract('Version')
        self.summary = _extract('Summary')
        self.url = _extract('Home-page')
        self.author = _extract('Author')
        self.author_email = _extract('Author-email')
        self.license = _extract('License')
        self.download_url = _extract('Download-URL')
        self.long_description = _extract('Description')
        self.keywords = _extract('Keywords').split(',')
        self.classifiers = _extract('Classifier')
        self.platform = _extract('Platform')

        # PEP 314
        if version == '1.1':
            self.requires = _extract('Requires')
            self.provides = _extract('Provides')
            self.obsoletes = _extract('Obsoletes')
        else:
            self.requires = None
            self.provides = None
            self.obsoletes = None


#
# function used to detect a PEP 376 .egg-info directory
#
def is_egginfo(path):
    """Returns True if `path` is an egg-info directory.

    Also makes sure it doesn't pick older versions by checking
    the presence of `RECORD` and `PKG-INFO`.
    """
    if not (splitext(path)[-1].lower() == '.egg-info' and isdir(path)):
        return False
    content = os.listdir(path)
    return 'PKG-INFO' in content and 'RECORD' in content


#
# Distribution class (with DistributionMetadata in it)
#
class Distribution(object):

    def __init__(self, path):
        self.info_path = path
        self.container = os.path.split(path)[0]
        self.pkg_info_path = join(path, 'PKG-INFO')
        self.record_path = join(path, 'RECORD')
        self.metadata = _DistributionMetadata(self.pkg_info_path)
        self.name = self.metadata.name
        self._files = None

    def __str__(self):
        return "Distribution('%s')" % self.name

    def _load_record(self):
        """Loads the RECORD file."""
        if self._files is None:
            self._files = self._read_record()

    def _read_record(self):
        """Reads RECORD."""
        files = []
        for row in csv.reader(open(self.record_path)):
            if row == []:
                continue
            location = row[0]
            md5 = len(row) > 1 and row[1] or None
            size = len(row) > 2 and row[2] or None
            files.append((location, md5, size ))
        return files

    def _local_path(self, path):
        """Transforms a '/'-separated path to an absolute path,
        using the local separator."""
        path = path.translate(SEP_TRANS)
        if path[0] == '/':
            return path
        return join(self.container, path)

    def get_installed_files(self, local=False):
        """Iterates over the RECORD entries.

        Returns a (location, md5, size) tuple.
        If local is True, translates the cross-platform path for each
        path into a local absolute path.
        """
        self._load_record()   # initializes self._files

        for location, md5, size  in self._files:
            if local:
                location = self._local_path(location)
            yield location, md5, size

    def get_egginfo_files(self, local=False):
        """Iterates over the list of files located in the `.egg-info`
        directory.

        If local is True, translates the cross-platform path for each
        path into a local absolute path.
        """
        self._load_record()  # initializes self._files

        for location, md5, size  in self._files:
            local_path = self._local_path(location)
            if not local_path.startswith(self.info_path):
                continue
            # the file is located under self.info_path
            if local:
                yield local_path
            else:
                yield location

    def uses(self, path):
        """Returns True if the path is listed in the RECORD file.

        e.g. if the project uses this file.
        """
        local = os.path.exists(path)
        for location, md5, size in self.get_installed_files(local):
            if location == path:
                return True
        return False

    def get_egginfo_file(self, path, binary=False):
        """Returns a file instance on the path.

        If binary is True, opens the file in binary mode.
        """
        path = path.translate(SEP_TRANS)
        if path[0] != '/':
            local_path = join(self.info_path, path)
        else:
            local_path = join(self.container, path)

            if not local_path.startswith(self.info_path):
                raise DistutilsError('%s is not located in %s' % \
                                     (path, self.info_path))
        return open(local_path, binary and 'rb' or 'r')

#
# DistributionDirectory represents a directory that contains egg-info files
#
class DistributionDirectory(set):

    def __init__(self, path):
        super(DistributionDirectory, self).__init__()
        self.path = path
        # filling the list once (see if it's the best way)
        # to minimize I/O
        for element in os.listdir(self.path):
            fullpath = join(self.path, element)
            if is_egginfo(fullpath):
                self.add(Distribution(fullpath))

    def __repr__(self):
        return 'DistributionDirectory("%s")' % self.path

    def add(self, dist):
        """Makes sure only Distribution instances are added."""
        if not isinstance(dist, Distribution):
            raise TypeError('DistributionDirectory manage only Distribution '
                            'instances')
        super(DistributionDirectory, self).add(dist)

    #
    # public APIs
    #
    def file_users(self, path):
        """Returns Distribution instances for the projects that uses `path`."""
        for dist in self:
            if dist.uses(path):
                yield dist

    def owner(self, path):
        """Returns the owner of `path`."""
        users = [dist for dist in self if dist.uses(path)]
        if len(users) == 1:
            return users[0]
        return None

#
# Directories is a collection of directories, initialized with a
# list of paths.
#

_CACHED_DIRS = {}

def purge_cache():
    _CACHED_DIRS.clear()


class DistributionDirectories(dict):

    def __init__(self, paths=None, use_cache=True):
        super(DistributionDirectories, self).__init__()
        if paths is not None:
            for path in paths:
                self.append(path)
        self.use_cache = use_cache

    def __setitem__(self, path, dir):
        """Controls the mapping deals only with path/DistributionDirectory"""
        if not isinstance(path, str):
            raise TypeError('The key needs to be a path')
        if not isinstance(dir, DistributionDirectory):
            raise TypeError('The value needs to be a DistributionDirectory '
                            'instance')
        super(DistributionDirectories, self).__setitem__(path, dir)

    #
    # public APIs
    #
    def append(self, path):
        """Appends a path and its corresponding DistributionDirectory."""
        if self.use_cache and path in _CACHED_DIRS:
            dist_dir = _CACHED_DIRS[path]
        else:
            dist_dir = DistributionDirectory(path)
            if self.use_cache:
                _CACHED_DIRS[path] = dist_dir

        self[path] = dist_dir

    def load(self, paths):
        """Loads the paths."""
        for path in paths:
            if path in self or not os.path.isdir(path):
                continue
            self.append(path)

    def reload(self):
        """Reload all the paths."""
        paths = self.keys()
        self.clear()
        self.load(paths)

    def get_distributions(self):
        """Returns an iterator over contained Distribution instances."""
        for directory in self.values():
            for dist in directory:
                yield dist

    def get_distribution(self, project_name):
        """Returns an Distribution instance for the given project name.

        If not found, returns None.
        """
        for directory in self.values():
            for dist in directory:
                if dist.name == project_name:
                    return dist

    def get_file_users(self, path):
        """Iterates over all projects to find out which project uses the file.

        Return Distribution instances.
        """
        for directory in self.values():
            for dist in directory.file_users(path):
                yield dist

#
# high level APIs with a global DistributionDirectories instance loaded
# on sys.path
#
_dist_dirs = DistributionDirectories()
_dist_dirs.load(sys.path)

def get_distributions():
    """Provides an iterator that returns Distribution instances.

    Looks for `.egg-info` directories in `sys.path` and returns Distribution
    instances for each one of them.
    """
    return _dist_dirs.get_distributions()

def get_distribution(name):
    """Returns a ``Distribution`` instance for ``name``.

    Scans all elements in `sys.path` and looks for all directories ending
    with `.egg-info`. Returns a ``Distribution`` instance corresponding to
    the `.egg-info` directory that contains a `PKG-INFO` that matches
    ``name`` for the ``name`` metadata.

    Notice that there should be at most one result. The first result
    founded will be returned. If the directory is not found, returns ``None``.
    """
    return _dist_dirs.get_distribution(project_name)

def get_file_users(path):
    """Iterates over all distributions to find out which distribution uses
    ``path``.

    ``path`` can be a local absolute path or a relative '/'-separated path.
    """
    return _dist_dirs.get_file_users(path)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.