Commits

Anonymous committed 88d5dd4

now using directories

  • Participants
  • Parent commits 491c98c
  • Branches directory

Comments (0)

Files changed (2)

 # -*- coding: utf8 -*-
 """ PEP 376
 """
+from __future__ import with_statement
 import os
 from os.path import join, splitext, isdir
 from os import listdir
             self.obsoletes = None
 
 
+#
+# function used to detect a PEP 376 .egg-info directory
+#
+def is_egg_info(path):
+    """Returns True if `path` is an egg-info directory.
+
+    Also makes sure it doesn't pick older versions by checking
+    the presence of `RECORD` and `PKG-INFO`.
+    """
+    if not (splitext(path)[-1].lower() == '.egg-info' and isdir(path)):
+        return False
+    content = os.listdir(path)
+    return 'PKG-INFO' in content and 'RECORD' in content
+
+
 SEP_TRANS = maketrans('/', os.path.sep)
 
 #
                 return True
         return False
 
-    def owns(self, path):
-        """Returns True is the path is listed in the RECORD file and nowhere
-        else.
-
-        e.g. if the project uses this file.
-        """
-        if not self.uses(path):
-            return False
-
-        for egg_info in get_egg_infos():
-            if egg_info is not self and egg_info.uses(path):
-                return False
-
-        return True
-
     def get_file(self, path, binary=False):
         """Returns a file instance on the path.
 
         return open(fullpath, binary and 'rb' or 'r')
 
 #
-# cache managment
+# Class for I/O caching
 #
 
-_EGG_INFO_CACHE = {}
-_CACHE_ENABLED = True
+class CacheException(Exception):
+    pass
 
-def purge_cache():
-    global _EGG_INFO_CACHE
-    _EGG_INFO_CACHE = {}
+class Cache(dict):
+    """Regular dict with state flags.
 
-def enable_cache():
-    global _CACHE_ENABLED
-    _CACHE_ENABLED = True
+      - completed : the cache is fully completed
+      - enabled: the cache is enabled/disabled
+    """
+    def __init__(self, *args, **kwargs):
+        self.completed = False
+        self.enabled = True
+        self._loading = False
+        super(Cache, self).__init__(*args, **kwargs)
 
-def disable_cache():
-    global _CACHE_ENABLED
-    _CACHE_ENABLED = True
+    def enable(self):
+        self.enabled = True
 
-def cache_enabled():
-    return _CACHE_ENABLED
+    def disable(self):
+        if self._loading:
+            raise CacheException('The cache is being loaded.')
+        self.enabled = False
+        self.clear()
+
+    def clear(self):
+        if self._loading:
+            raise CacheException('The cache is being loaded.')
+        self.completed = False
+        super(Cache).clear()
+
+    def __enter__(self):
+        self._loading = True
+        return self
+
+    def __exit__(self, *args):
+        self._loading = False
 
 #
-# .egg-info finders
+# Directory represents a directory that contains egg-info files
 #
+class Directory(object):
 
-def is_egg_info(path):
-    """Returns True if `path` is an egg-info directory.
+    def __init__(self, path, cache_enabled=True):
+        self.path = path
+        self._cache = Cache()
+        if not cache_enabled:
+            self._cache.disable()
 
-    Also makes sure it doesn't pick older versions by checking
-    the presence of `RECORD` and `PKG-INFO`.
-    """
-    if not (splitext(path)[-1].lower() == '.egg-info' and isdir(path)):
-        return False
-    content = os.listdir(path)
-    return 'PKG-INFO' in content and 'RECORD' in content
+    #
+    # cache toggling
+    #
+    def _set_cache_enabled(self, value):
+        if value:
+            self._cache.enable()
+        else:
+            self._cache.disable()
 
-def egg_info_dirs(path):
-    """Returns the EGG-INFO directories found in `path`."""
-    if is_egg_info(path):
-        yield path
-    elif isdir(path):
-        for element in os.listdir(path):
-            fullpath = join(path, element)
-            if is_egg_info(fullpath):
-                yield fullpath
+    def _get_cache_enabled(self):
+        return self._cache.enabled
 
-def get_egg_infos():
+    cache_enabled = property(_get_cache_enabled, _set_cache_enabled)
+
+    def _cache_ready(self):
+        return self.cache_enabled and self._cache.completed
+
+    #
+    # public APIs
+    #
+    def egg_info_dirs(self):
+        """Returns the EGG-INFO directories."""
+        if self._cache_ready():
+            for egg_info in self._cache.values():
+                yield egg_info
+        else:
+            # revisiting the directory
+            with self._cache as cache:
+
+                for element in os.listdir(self.path):
+                    fullpath = join(self.path, element)
+                    # see if the path was not previously loaded
+                    if cache.enabled and fullpath in cache:
+                        yield cache[fullpath]
+                    else:
+                        # no, let's load it
+                        if is_egg_info(fullpath):
+                            egg_info = EggInfo(fullpath)
+                            if cache.enabled:
+                                cache[fullpath] = egg_info
+                            yield egg_info
+                # the cache is fully created
+                cache.completed = True
+
+    __iter__ = egg_info_dirs
+
+    def file_users(self, path):
+        """Returns EggInfo instances for the projects that uses `path`."""
+        for egg_info in self.egg_info_dirs():
+            if egg_info.uses(path):
+                yield egg_info
+
+    def owner(self, path):
+        """Returns the owner of `path`."""
+        users = [egg_info for egg_info in self.egg_info_dirs()
+                 if egg_info.uses(path)]
+        if len(users) == 1:
+            return users[0]
+        return None
+
+#
+# Directories is a collection of directories, initialized with a
+# list of paths.
+#
+class Directories(list):
+    pass
+
+#
+# high-level APIs
+#
+def get_egg_infos(paths=sys.path):
     """Iterates on all .egg-info directories founded in sys.path.
 
     Each returned element is an EggInfo instance.
     Uses a memory cache to minimize I/O access.
     """
-    for path in sys.path:
-        if _CACHE_ENABLED and path in _EGG_INFO_CACHE:
-            for egg_info in _EGG_INFO_CACHE[path]:
-                yield egg_info
-        else:
-            egg_infos = []
-            for egg_info_dir in egg_info_dirs(path):
-                egg_info = EggInfo(egg_info_dir)
-                if _CACHE_ENABLED:
-                    egg_infos.append(egg_info)
-                yield egg_info
-            # the cache is created only if all elements have been
-            # iterated through
-            if _CACHE_ENABLED:
-                _EGG_INFO_CACHE[path] = egg_infos
+    for path in paths:
+        directory = Directory(path)
+        return directory.egg_info_dirs()
 
-def get_egg_info(project_name):
+def get_egg_info(project_name, paths=sys.path):
     """Returns an EggInfo instance for the given project name.
 
     If not found, returns None.
     """
-    for project in get_egg_infos():
+    for project in get_egg_infos(paths):
         if project.name == project_name:
             return project
 
-#
-# helper for the uninstall feature
-#
-
-def get_file_users(path):
+def get_file_users(path, paths=sys.path):
     """Iterates over all projects to find out which project uses the file.
 
     Return EggInfo instances.
     """
-    for egg_info in get_egg_infos():
-        if egg_info.uses(path):
+    for path_ in paths:
+        directory = Directory(path_)
+        for egg_info in directory.file_users(path):
             yield egg_info
 

File test_pkgutil.py

     del sys.old
 
 def test_get_egg_infos():
-    projects = list(get_egg_infos())
+    projects = list(get_egg_infos([SITE_PKG]))
     assert_equals(len(projects), 2)
 
 def test_get_egg_info():
-    assert_equals(get_egg_info('xxx'), None)
+    assert_equals(get_egg_info('xxx', [SITE_PKG]), None)
 
-    project = get_egg_info('mercurial')
+    project = get_egg_info('mercurial', [SITE_PKG])
     assert_equals(project.name, 'mercurial')
 
-    project = get_egg_info('processing')
+    project = get_egg_info('processing', [SITE_PKG])
     assert_equals(project.name, 'processing')
 
 def test_egg_info():
 
-    egg_info = get_egg_info('mercurial')
+    egg_info = get_egg_info('mercurial', [SITE_PKG])
     assert_equals(str(egg_info), "EggInfo('mercurial')")
 
     files = list(egg_info.get_installed_files())
 
     assert egg_info.uses('mercurial/filelog.py')
     assert not egg_info.uses('mercurial/filelog.sasasasa')
-    assert egg_info.owns('mercurial/filelog.pyc')
-    assert not egg_info.owns('mercurial/filelog.py')
+
+
+def test_directory():
+    dir = Directory(SITE_PKG)
+
+    egg_info = dir.owner('mercurial/filelog.pyc')
+    assert egg_info.name == 'mercurial'
+    assert dir.owner('mercurial/filelog.py') is None
 
 def test_get_file_users():
 
-    users = list(get_file_users('mercurial/filelog.py'))
+    users = list(get_file_users('mercurial/filelog.py', [SITE_PKG]))
     assert_equals(len(users), 2)