Lenard Lindstrom avatar Lenard Lindstrom committed fb3f977

trackmod: Added more features for use with unittest

Comments (0)

Files changed (7)

-A preliminary module tracker.
+A preliminary module import tracker.
 
-This package tracks module imports and generates a report on program
+This package tracks module imports and can generate a report on program
 completion. It has been added to Pygame's SVN with the purpose of
-incorporating it into the unit tests. To that end it need testing
-and report feature replaced some kind of runtime inspection mechanism.
-Possibly the report generation can be optional. The data collection
-sets in reporter can also be exposed with an api. Finally, should
-it be a single module?
+incorporating it into the unit tests. The package still needs testing.
+The package has been successfully tried on a zipped package. Still
+should have a way to limit what modules will be tracked.
+
 
 See the playmus.py example for how it is currently used.
+
+rev 1740: - Report generation now optional.
+          - Added a report end feature.
+
+rev 1741: - Added a data inspection api.
+          - Now have a data collection stop function.
+          - For testing, reload resets the entire package.
+          - Readied for capture of all module attribute accesses.
+
+import sys
+
+import trackmod
+trackmod.begin('playmus_rep.txt')
+import pygame
+
+MusicEnd = pygame.USEREVENT
+
+def main():
+    pygame.mixer.init()
+    pygame.mixer.music.set_endevent(MusicEnd)
+    pygame.mixer.music.load(sys.argv[1])
+    pygame.mixer.music.play()
+    while pygame.mixer.music.get_busy():
+        pygame.time.wait(100)
+    pygame.mixer.quit()
+
+if __name__ == '__main__':
+    main()
+

trackmod/__init__.py

 # package trackmod
+# For Python 2.4 and up.
 
 """A package for tracking module use
 
-Exports begin(repfilepth=None).
+Exports:
+    begin(repfile=None) ==> None
+    end() ==> None
+    get_previous_imports() ==> List of names
+    get_my_imports() ==> List of names
+    get_imports() ==> List of names
+    get_unaccessed_modules() ==> List of names
+    get_accessed_modules() ==> List of names
+    get_accesses() ==> Dictionary of attribute names by module name
+    write_report(repfile) ==> None
 
 """
 
-from trackmod import reporter  # Want this first
+from trackmod import reporter  # Keep this first.
 import sys
 import atexit
 
     installed
 except NameError:
     installed = False
+else:
+    # reloaded; reload submodules.
+    reload(importer)  # implicit reporter reload.
 
-def generate_report(repfilepth):
+def print_(*args, **kwds):
+    stream = kwds.get('file', sys.stdout)
+    sep = kwds.get('sep', ' ')
+    end = kwds.get('end', '\n')
+
+    if args:
+        stream.write(sep.join([str(arg) for arg in args]))
+    if end:
+        stream.write(end)
+
+def _write_report(repfile):
+    def report(*args, **kwds):
+        print_(file=repfile, *args, **kwds)
+
+    report("=== module usage report ===")
+    report("\n-- modules already imported (ignored) --")
+    for name in get_previous_imports():
+        report(name)
+    report("\n-- modules added by", __name__.split('.')[0], "(ignored) --")
+    for name in get_my_imports():
+        report(name)
+    report("\n-- modules imported but not accessed --")
+    for name in get_unaccessed_modules():
+        report(name)
+    report("\n-- modules accessed --")
+    accesses = sorted(get_accesses().iteritems())
+    for name, attrs in accesses:
+        report(name, " (", ', '.join(attrs), ")", sep='')
+    report("\n=== end of report ===")
+
+def get_previous_imports():
+    """Return a new sorted name list of previously imported modules"""
+    return reporter.get_previous_imports()
+
+def get_my_imports():
+    """Return a new sorted name list of module imported by this package"""
+    return reporter.get_my_imports()
+
+def get_imports():
+    """Return a new sorted name list of imported modules"""
+    return reporter.get_imports()
+
+def get_unaccessed_modules():
+    """Return a new sorted name list of unaccessed imported modules"""
+    return reporter.get_unaccessed_modules()
+    
+def get_accessed_modules():
+    """Return a new sorted name list of accessed modules"""
+    return reporter.get_accessed_modules()
+
+def get_accesses():
+    """Return a new dictionary of lists of attributes by module name"""
+    return reporter.get_accesses()
+
+def write_report(repfile=None):
+    """Write a module import and access report to repfile
+
+    repfile may be an open file object of a file path. If not previded
+    then writes to standard output. Data collection is terminated if not
+    already stopped by an end() call. If no data is collected, begin() not
+    called, then a runtime error is raised.
+
+    """
     try:
-        repfile = open(repfilepth, 'w')
-    except:
-        return
-    try:
-        reporter.write_report(repfile)
-    finally:
-        repfile.close()
-    
-def begin(repfilepth=None):
-    global installed
+        if collecting:
+            end()
+    except NameError:
+        raise RuntimeError("No import data was collected")
+    if repfile is None:
+        _write_report(sys.stdout)
+    else:
+        try:
+            repfile.write
+        except AttributeError:
+            rf = open(repfile, 'w')
+            try:
+                _write_report(rf)
+            finally:
+                rf.close()
+        else:
+            _write_report(repfile)
+
+def begin(repfile=None):
+    """Start collecting import and module access information
+
+    repfile, if provided, is the destination for an end-of-run module import
+    and access report. It can be either a file path or an open file object.
+
+    """
+    global installed, collecting
 
     if not installed:
         sys.meta_path.insert(0, importer)
         installed = True
-        if repfilepth is not None:
-            atexit.register(generate_report, repfilepth)
+        if repfile is not None:
+            atexit.register(write_report, repfile)
+    try:
+        if collecting:
+            return
+    except NameError:
+        collecting = True
 
 def end():
+    global collecting
+    collecting = False
     reporter.end()
     importer.end()
 

trackmod/importer.py

 
 import sys
 
-from trackmod import loader
-from trackmod import module
+from trackmod import module, reporter
 
+try:
+    collect_data
+except NameError:
+    pass
+else:
+    # reload: reload imported modules.
+    reload(module)  # implicit reload of reporter
 
 collect_data = True
 
+class Loader(object):
+    def __init__(self, fullname, module):
+        self.fullname = fullname
+        self.module = module
+
+    def load_module(self, fullname):
+        assert fullname == self.fullname, (
+            "loader called with wrong module %s: expecting %s" %
+              (fullname, self.fullname))
+        sys.modules[fullname] = self.module
+        return self.module
+
 def find_module(fullname, path=None):
     if collect_data and fullname not in sys.modules:
-        m = module.Module(fullname)
+        # Put this first so the order of inserts follows the order of calls to
+        # find_module: package, subpackage, etc.
+        reporter.add_import(fullname)
+
+        # reload doesn't get any tracked TrackerModule attributes.
+        m = module.TrackerModule(fullname)
+
+        # Add m to modules so reload works and to prevent infinite recursion.
         sys.modules[fullname] = m
         try:
             try:
                 reload(m)
-            except ImportError:
+            except ImportError, e:
+                reporter.remove_import(fullname)
                 return None;
         finally:
             del sys.modules[fullname]
-        m.__class__ = module.TrackerModule
-        loader.add(fullname, m)
-        return loader
+
+        # Add parent package access.
+        parts = fullname.rsplit('.', 1)
+        if len(parts) == 2:
+            try:
+                pkg = sys.modules[parts[0]]
+                try:
+                    getattr(pkg, parts[1])
+                except AttributeError:
+                    pass
+            except KeyError:
+                pass
+
+        return Loader(fullname, m)
     else:
         return None
 

trackmod/loader.py

-# module trackmod.loader
-
-"""A sys.meta_path loader for module usage tracking."""
-
-import sys
-import threading
-
-from trackmod import reporter
-
-module_table = {}
-module_table_lock = threading.Lock()
-
-def add(name, module):
-    module_table_lock.acquire()
-    try:
-        module_table[name] = module
-    finally:
-        module_table_lock.release()
-
-def pop(name):
-    module_table_lock.acquire()
-    try:
-        return module_table.pop(name)
-    finally:
-        module_table_lock.release()
-
-def load_module(fullname):
-    m = pop(fullname)
-    sys.modules[fullname] = m
-    reporter.add_loaded(fullname)
-    return m
-

trackmod/module.py

 from trackmod import keylock
 from trackmod import reporter
 
+try:
+    ModuleType
+except NameError:
+    pass
+else:
+    # reload; reload imported modules
+    reload(keylock)
+    reload(reporter)
 
 ModuleType = type(reporter)
 
+
 class Module(ModuleType):
     # A heap subtype of the module type.
     #
     name = module.__name__  # Safe: no recursive call on __name__.
     lock = keylock.Lock(name)
     try:
-        reporter.add_accessed(name, attr)
+        reporter.add_access(name, attr)
         ModuleType.__setattr__(module, '__class__', Module)
     finally:
         lock.free()

trackmod/reporter.py

+# module trackmod.reporter
+
 # Keep this first.
 def listmods():
     return [n for n, m in sys.modules.iteritems() if m is not None]
 
 import sys
-already_loaded = listmods()
+previous_imports = listmods()  #  Keep this after sys but before other imports.
 import threading
 
+# This module is does not need explicit thread protection since all calls
+# to the data entry methods are made while the import lock is acquired.
 collect_data = True
+my_imports = None
+accesses = None
+failed_imports = None
 
-def print_(*args, **kwds):
-    stream = kwds.get('file', sys.stdout)
-    sep = kwds.get('sep', ' ')
-    end = kwds.get('end', '\n')
-
-    if args:
-        stream.write(sep.join([str(arg) for arg in args]))
-    if end:
-        stream.write(end)
 
 def process_accessed():
     acc_names = dict(accessed)
                 acc_names[subname] = parts[i]
     return set(acc_names.iteritems())
 
-def write_report(repfile):
-    def rep(*args, **kwds):
-        print_(file=repfile, *args, **kwds)
-
-    accessed = process_accessed()
-    rep("=== module usage report ===")
-    rep("\n-- modules already imported (ignored) --")
-    already_loaded.sort()
-    for name in already_loaded:
-        rep(name)
-    rep("\n-- modules added by trackmod (ignored) --")
-    added_by_trackmod.sort()
-    for name in added_by_trackmod:
-        rep(name)
-    rep("\n-- modules imported but not accessed --")
-    acc = set([n for n, ignored in accessed])
-    unaccessed = list(loaded - acc)
-    unaccessed.sort()
-    for name in unaccessed:
-        rep(name)
-    rep("\n-- modules accessed --")
-    acc = list(accessed)
-    acc.sort()
-    for name, attr in acc:
-        rep(name, "(%s)" % attr)
-    rep("\n=== end of report ===")
-
 def begin():
-    global already_loaded, loaded, accessed, data_lock, added_by_trackmod
-    added_by_trackmod = list(set(listmods()) - set(already_loaded))
-    loaded = set()
-    accessed = set()
-    data_lock = threading.Lock()
+    global previous_imports, my_imports, accesses, failed_imports
+    my_imports = list(set(listmods()) - set(previous_imports))
+    accesses = {}
+    failed_imports = set()
 
 def end():
     global collect_data
     collect_data = False
 
-def add_loaded(name):
+def add_import(name):
+    """Add a module to the import list
+
+    Expects to be called in the order in which modules are created:
+    package, submodule, etc.
+
+    """
     if collect_data:
-        data_lock.acquire()
-        try:
-            loaded.add(name)
-        finally:
-            data_lock.release()
+        accesses[name] = set()
+ 
+def remove_import(name):
+    del accesses[name]
+    failed_imports.add(name)
 
-def add_accessed(name, attr):
+def add_access(name, attr):
     if collect_data:
-        data_lock.acquire()
-        try:
-            accessed.add((name, attr))
-        finally:
-            data_lock.release()
+        accesses[name].add(attr)
 
+def get_previous_imports():
+    """Return a new sorted name list of previously imported modules"""
+    return sorted(previous_imports)
 
+def get_my_imports():
+    """Return a new sorted name list of module imported by this package"""
+    return sorted(my_imports)
+
+def get_imports():
+    """Return a new sorted name list of imported modules"""
+    return sorted(accesses.iterkeys())
+
+def get_unaccessed_modules():
+    """Return a new sorted name list of unaccessed imported modules"""
+    return sorted(n for n, a in accesses.iteritems() if not a)
+    
+def get_accessed_modules():
+    """Return a new sorted name list of accessed modules"""
+    return sorted(n for n, a in accesses.iteritems() if a)
+
+def get_accesses():
+    """Return a new dictionary of sorted lists of attributes by module name"""
+    return dict((n, sorted(a)) for n, a in accesses.iteritems() if a)
+
+
+
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.