Source

rope_py3k / rope / base / oi / objectinfo.py

import warnings

from rope.base import exceptions, resourceobserver
from rope.base.oi import objectdb, memorydb, transform


class ObjectInfoManager(object):
    """Stores object information

    It uses an instance of `objectdb.ObjectDB` for storing
    information.

    """

    def __init__(self, project):
        self.project = project
        self.to_textual = transform.PyObjectToTextual(project)
        self.to_pyobject = transform.TextualToPyObject(project)
        self.doi_to_pyobject = transform.DOITextualToPyObject(project)
        self._init_objectdb()
        if project.prefs.get('validate_objectdb', False):
            self._init_validation()

    def _init_objectdb(self):
        dbtype = self.project.get_prefs().get('objectdb_type', None)
        persist = None
        if dbtype is not None:
            warnings.warn(
                '"objectdb_type" project config is deprecated;\n'
                'Use "save_objectdb" instead in your project '
                'config file.\n(".ropeproject/config.py" by default)\n',
                DeprecationWarning)
            if dbtype != 'memory' and self.project.ropefolder is not None:
                persist = True
        self.validation = TextualValidation(self.to_pyobject)
        db = memorydb.MemoryDB(self.project, persist=persist)
        self.objectdb = objectdb.ObjectDB(db, self.validation)

    def _init_validation(self):
        self.objectdb.validate_files()
        observer = resourceobserver.ResourceObserver(
            changed=self._resource_changed, moved=self._resource_moved,
            removed=self._resource_moved)
        files = []
        for path in self.objectdb.get_files():
            resource = self.to_pyobject.path_to_resource(path)
            if resource is not None and resource.project == self.project:
                files.append(resource)
        self.observer = resourceobserver.FilteredResourceObserver(observer,
                                                                  files)
        self.objectdb.add_file_list_observer(_FileListObserver(self))
        self.project.add_observer(self.observer)

    def _resource_changed(self, resource):
        try:
            self.objectdb.validate_file(
                self.to_textual.resource_to_path(resource))
        except exceptions.ModuleSyntaxError:
            pass

    def _resource_moved(self, resource, new_resource=None):
        self.observer.remove_resource(resource)
        if new_resource is not None:
            old = self.to_textual.resource_to_path(resource)
            new = self.to_textual.resource_to_path(new_resource)
            self.objectdb.file_moved(old, new)
            self.observer.add_resource(new_resource)

    def get_returned(self, pyobject, args):
        result = self.get_exact_returned(pyobject, args)
        if result is not None:
            return result
        path, key = self._get_scope(pyobject)
        if path is None:
            return None
        for call_info in self.objectdb.get_callinfos(path, key):
            returned = call_info.get_returned()
            if returned and returned[0] not in ('unknown', 'none'):
                result = returned
                break
            if result is None:
                result = returned
        if result is not None:
            return self.to_pyobject(result)

    def get_exact_returned(self, pyobject, args):
        path, key = self._get_scope(pyobject)
        if path is not None:
            returned = self.objectdb.get_returned(
                path, key, self._args_to_textual(pyobject, args))
            if returned is not None:
                return self.to_pyobject(returned)

    def _args_to_textual(self, pyfunction, args):
        parameters = list(pyfunction.get_param_names(special_args=False))
        arguments = args.get_arguments(parameters)[:len(parameters)]
        textual_args = tuple([self.to_textual(arg)
                              for arg in arguments])
        return textual_args

    def get_parameter_objects(self, pyobject):
        path, key = self._get_scope(pyobject)
        if path is None:
            return None
        arg_count = len(pyobject.get_param_names(special_args=False))
        unknowns = arg_count
        parameters = [None] * arg_count
        for call_info in self.objectdb.get_callinfos(path, key):
            args = call_info.get_parameters()
            for index, arg in enumerate(args[:arg_count]):
                old = parameters[index]
                if self.validation.is_more_valid(arg, old):
                    parameters[index] = arg
                    if self.validation.is_value_valid(arg):
                        unknowns -= 1
            if unknowns == 0:
                break
        if unknowns < arg_count:
            return [self.to_pyobject(parameter)
                    for parameter in parameters]

    def get_passed_objects(self, pyfunction, parameter_index):
        path, key = self._get_scope(pyfunction)
        if path is None:
            return []
        result = []
        for call_info in self.objectdb.get_callinfos(path, key):
            args = call_info.get_parameters()
            if len(args) > parameter_index:
                parameter = self.to_pyobject(args[parameter_index])
                if parameter is not None:
                    result.append(parameter)
        return result

    def doa_data_received(self, data):
        def doi_to_normal(textual):
            pyobject = self.doi_to_pyobject(textual)
            return self.to_textual(pyobject)
        function = doi_to_normal(data[0])
        args = tuple([doi_to_normal(textual) for textual in data[1]])
        returned = doi_to_normal(data[2])
        if function[0] == 'defined' and len(function) == 3:
            self._save_data(function, args, returned)

    def function_called(self, pyfunction, params, returned=None):
        function_text = self.to_textual(pyfunction)
        params_text = tuple([self.to_textual(param)
                             for param in params])
        returned_text = ('unknown',)
        if returned is not None:
            returned_text = self.to_textual(returned)
        self._save_data(function_text, params_text, returned_text)

    def save_per_name(self, scope, name, data):
        path, key = self._get_scope(scope.pyobject)
        if path is not None:
            self.objectdb.add_pername(path, key, name, self.to_textual(data))

    def get_per_name(self, scope, name):
        path, key = self._get_scope(scope.pyobject)
        if path is not None:
            result = self.objectdb.get_pername(path, key, name)
            if result is not None:
                return self.to_pyobject(result)

    def _save_data(self, function, args, returned=('unknown',)):
        self.objectdb.add_callinfo(function[1], function[2], args, returned)

    def _get_scope(self, pyobject):
        resource = pyobject.get_module().get_resource()
        if resource is None:
            return None, None
        textual = self.to_textual(pyobject)
        if textual[0] == 'defined':
            path = textual[1]
            if len(textual) == 3:
                key = textual[2]
            else:
                key = ''
            return path, key
        return None, None

    def sync(self):
        self.objectdb.sync()

    def __str__(self):
        return str(self.objectdb)


class TextualValidation(object):

    def __init__(self, to_pyobject):
        self.to_pyobject = to_pyobject

    def is_value_valid(self, value):
        # ???: Should none and unknown be considered valid?
        if value is None or value[0] in ('none', 'unknown'):
            return False
        return self.to_pyobject(value) is not None

    def is_more_valid(self, new, old):
        if old is None:
            return True
        return new[0] not in ('unknown', 'none')

    def is_file_valid(self, path):
        return self.to_pyobject.path_to_resource(path) is not None

    def is_scope_valid(self, path, key):
        if key == '':
            textual = ('defined', path)
        else:
            textual = ('defined', path, key)
        return self.to_pyobject(textual) is not None


class _FileListObserver(object):

    def __init__(self, object_info):
        self.object_info = object_info
        self.observer = self.object_info.observer
        self.to_pyobject = self.object_info.to_pyobject

    def removed(self, path):
        resource = self.to_pyobject.path_to_resource(path)
        if resource is not None:
            self.observer.remove_resource(resource)

    def added(self, path):
        resource = self.to_pyobject.path_to_resource(path)
        if resource is not None:
            self.observer.add_resource(resource)