Chad Dombrova / django-denormalize

Commits

Chad Dombrova committed 8150e10

An alternate organizational structure, separated by framework instead of component

  • Parent commits: ef20379
  • Branches: normalize_alt

Comments (0)

Files changed (26)

File spiner/denormalize/backend/__init__.py

Empty file removed.

File spiner/denormalize/backend/base.py

-import weakref
-
-from ..context import get_current_context
-
-import logging
-from collections import defaultdict
-from ...models import DocumentCollectionBase
-
-log = logging.getLogger(__name__)
-
-
-class BackendBase(object):
-
-    collections = None
-
-    # This will be shared by all subclasses. It keeps track of all active
-    # backends.
-    _registry = weakref.WeakValueDictionary()
-
-    def __init__(self, name=None, listener_class=None):
-        """
-        :param name: Name for the backend instance. This is used to refer to
-            the backend instance from management commands. If not given,
-            a name will be generated based on `id(self)`.
-        :type name: str or None
-        """
-        if name is None:
-            name = '_{0}'.format(id(self))
-        if name in self._registry:
-            raise ValueError("A backend with name '{0}' has already been "
-                "registered. Please pass another 'name' during "
-                "initialization.".format(name))
-        self._registry[name] = self
-        self.collections = {}
-        self.listener_class = listener_class
-        self._listeners = defaultdict(list)
-
-    def register(self, collection):
-        """
-        :type collection: denormalize.models.DocumentCollectionBase
-        :return:
-        :rtype:
-        """
-        if collection.name in self.collections:
-            raise ValueError("A collection with name '{0}' has already "
-                "been registered to this backend.".format(collection.name))
-        self.collections[collection.name] = collection
-        self._setup_listeners(collection)
-
-    def deregister(self, collection):
-        """
-        :type collection: denormalize.models.DocumentCollectionBase
-        :return:
-        :rtype:
-        """
-        if collection.name not in self.collections:
-            raise ValueError("No collection with name '{0}' has "
-                "been registered to this backend.".format(collection.name))
-        del self.collections[collection.name]
-        self._remove_listeners(collection)
-
-    def deregister_all(self):
-        for collection in self.collections.values():
-            self.deregister(collection)
-
-    def _get_collection(self, collection_name_or_obj):
-        if isinstance(collection_name_or_obj, DocumentCollectionBase):
-            name = collection_name_or_obj.name
-        elif isinstance(collection_name_or_obj, basestring):
-            name = collection_name_or_obj
-        else:
-            raise KeyError("Invalid collection specified")
-        # Can raise KeyError
-        return self.collections[name]
-
-    def fix_value(self, value):
-        return value
-
-    def _fix_document(self, doc):
-        """A utility for use on `BackendBase` sub-classes to adjust the types
-        and values of document entries for compatibility with the backend.
-
-        To use, override `BackendBase.fix_value`.
-        """
-        if not hasattr(self, 'fix_value'):
-            return doc
-
-        if isinstance(doc, dict):
-            for key, value in doc.items():
-                doc[key] = self._fix_document(value)
-            return doc
-        if isinstance(doc, (list, tuple)):
-            return [self._fix_document(item) for item in doc]
-        else:
-            return self.fix_value(doc)
-
-    def _setup_listeners(self, collection):
-        """
-        :type collection: denormalize.models.DocumentCollectionBase
-        """
-        for model, filter_paths in collection.get_related_models():
-            self._add_listener(collection, model, filter_paths)
-
-    def _add_listener(self, collection, model, filter_paths):
-        """Connect the Django ORM signals to given dependency
-
-        :type collection: denormalize.models.DocumentCollectionBase
-        :param model: dependency model to watch for changes
-        :type model: denormalize.orms.ModelInspector
-        :param filter_paths: chain of relations from `collection.model` to
-            `model`
-        :type filter_paths: list of denormalize.orms.RelationProperty, or None
-        """
-        # TODO: this does not handle changing foreignkeys well. The object
-        #       will be added to the new root, but not deleted from the old
-        #       root object. Maybe solve by also adding a pre_save? The
-        #       database will still contain the old connection.
-        # TODO: Consider moving all of this to a Collection or something
-        #       separate and just listen to signals from there.
-        if self.listener_class:
-            listener_class = self.listener_class
-        else:
-            listener_class = collection.model.default_listener()
-        listener = listener_class(self, collection, model, filter_paths)
-        listener.connect()
-        self._listeners[collection.name].append(listener)
-
-    def _remove_listeners(self, collection):
-        """
-        :type collection: denormalize.models.DocumentCollectionBase
-        """
-        listeners = self._listeners.pop(collection.name)
-        for listener in listeners:
-            listener.disconnect()
-
-    def _queue_deleted(self, collection, doc_id):
-        """Queue a deleted notification"""
-        assert isinstance(doc_id, tuple)
-        context = get_current_context()
-        if context is not None:
-            log.debug("Queued delete for %s %s", collection, doc_id)
-            # FIXME: use a set here.  why use a dictionary when the key is the
-            #        same as the value?
-            key = (self, collection, doc_id)
-            context.deleted[key] = (self, collection, doc_id)
-        else:
-            self._call_deleted(collection, doc_id)
-
-    def _queue_added(self, collection, doc_id):
-        """Queue an added notification"""
-        assert isinstance(doc_id, tuple)
-        context = get_current_context()
-        if context is not None:
-            log.debug("Queued add for %s %s", collection, doc_id)
-            # FIXME: use a set here.  why use a dictionary when the key is the
-            #        same as the value?
-            key = (self, collection, doc_id)
-            context.added[key] = (self, collection, doc_id)
-        else:
-            self._call_added(collection, doc_id)
-
-    def _queue_changed(self, collection, doc_id):
-        """Queue a changed notification"""
-        assert isinstance(doc_id, tuple)
-        context = get_current_context()
-        if context is not None:
-            log.debug("Queued change for %s %s", collection, doc_id)
-            # FIXME: use a set here.  why use a dictionary when the key is the
-            #        same as the value?
-            key = (self, collection, doc_id)
-            context.changed[key] = (self, collection, doc_id)
-        else:
-            self._call_changed(collection, doc_id)
-
-    def _call_deleted(self, collection, doc_id):
-        self.deleted(collection, doc_id)
-        # TODO: signals
-
-    def _call_added(self, collection, doc_id):
-        doc = collection.dump_id(doc_id)
-        doc = self._fix_document(doc)
-        self.added(collection, doc_id, doc)
-        # TODO: signals
-
-    def _call_changed(self, collection, doc_id):
-        doc = collection.dump_id(doc_id)
-        doc = self._fix_document(doc)
-        self.changed(collection, doc_id, doc)
-        # TODO: signals
-
-
-    # Implement these for your backend. The code above will call these with
-    # ORM updates you need to commit to the backend.
-
-    def deleted(self, collection, doc_id):
-        """Called when the root object for a document was deleted in the ORM"""
-        raise NotImplementedError()
-
-    def added(self, collection, doc_id, doc):
-        """Called when the root object for a document was added in the ORM"""
-        raise NotImplementedError()
-
-    def changed(self, collection, doc_id, doc):
-        """Called when any data in a document was changed in the ORM"""
-        raise NotImplementedError()
-
-    def get_doc(self, collection, doc_id):
-        """Fetch data from a collection (if supported by the backend)"""
-        raise NotImplementedError()
-
-    def sync_collection(self, collection):
-        """Sync all data in the collection from the ORM to the backend
-
-        This does not make sense for all backends.
-        """
-        raise NotImplementedError()
-

File spiner/denormalize/backend/cache.py

-import time
-
-from django.core import cache
-from django.conf import settings
-
-from .base import BackendBase
-
-import logging
-log = logging.getLogger(__name__)
-
-
-class CacheBackend(BackendBase):
-    """Backend that uses the Django cache API
-
-    This is useful to speedup your Django application. This backend will
-    try to find the requested document in the Django cache, and update the
-    cache on a cache miss. Changes to the underlying data will be detected
-    immediately, and the cache updated accordingly, so you do not have to
-    worry about cache invalidation yourself.
-    """
-
-    cache_prefix = __name__
-    cache_backend = 'default'
-    # Long enough to do real caching, short enough to notice problems
-    # during development
-    cache_timeout = getattr(settings, 'DENORMALIZE_CACHE_TIMEOUT', 3600)
-
-    # Log get_doc timings at the DEBUG level during development to find
-    # performance bottlenecks. Fetching data from a memcached server
-    # requires both a network roundtrip and unpickling the data, so it
-    # might still be useful to add a short cache (say 5 seconds) in
-    # local memory for often used documents.
-    log_timings = getattr(settings, 'DEBUG', False)
-
-    # Only used by get_doc_cached for an extra performance boost
-    locmem_speedup_timeout = getattr(settings,
-        'DENORMALIZE_CACHE_LOCMEM_SPEEDUP_TIMEOUT', 5)
-    _locmem_speedup_cache = None
-
-    # TODO: instead of updating a cache with a new doc, maybe just unset it
-    #       and update it on the first access. Maybe make this behavior
-    #       configurable
-
-    def __init__(self, name="default", timeout=None, listener_class=None):
-        super(CacheBackend, self).__init__(name=name,
-                                           listener_class=listener_class)
-        self.cache = cache.get_cache(self.cache_backend)
-        if timeout:
-            self.cache_timeout = timeout
-        self._locmem_speedup_cache = {}
-
-    def _cache_key(self, collection, doc_id):
-        return ':'.join((self.cache_prefix, collection.name,
-            str(collection.version), str(doc_id)))
-
-    def deleted(self, collection, doc_id):
-        log.debug('deleted: %s %s', collection.name, doc_id)
-        key = self._cache_key(collection, doc_id)
-        # We use an empty document to distinguish this from a cache miss
-        self.cache.set(key, {}, self.cache_timeout)
-        self._invalidate_locmem_speedup_cache(collection, doc_id)
-
-    def added(self, collection, doc_id, doc):
-        log.debug('added: %s %s', collection.name, doc_id)
-        key = self._cache_key(collection, doc_id)
-        self.cache.set(key, doc, self.cache_timeout)
-        self._invalidate_locmem_speedup_cache(collection, doc_id)
-
-    def changed(self, collection, doc_id, doc):
-        log.debug('changed: %s %s', collection.name, doc_id)
-        key = self._cache_key(collection, doc_id)
-        self.cache.set(key, doc, self.cache_timeout)
-        self._invalidate_locmem_speedup_cache(collection, doc_id)
-
-    def get_doc(self, collection, doc_id):
-        if not isinstance(doc_id, tuple):
-            doc_id = (doc_id,)
-
-        # In case it's passed by name, convert to the object
-        collection = self._get_collection(collection)
-
-        key = self._cache_key(collection, doc_id)
-        log_timings = self.log_timings
-        if log_timings: t0 = time.time()
-        doc = self.cache.get(key, None)
-        if log_timings: t1 = time.time()
-
-        if doc is None:
-            # Update cache
-            if log_timings:
-                ut0 = time.time()
-            try:
-                doc = collection.dump_id(doc_id)
-            except collection.model.DoesNotExist:
-                # Cache the fact that it does not exist
-                self.deleted(collection, doc_id)
-                return None
-
-            self.added(collection, doc_id, doc)
-            if log_timings:
-                ut1 = time.time()
-                udt = (ut1 - ut0) * 1000
-                log.debug("Cache collection update for %s:%s took %.1f ms",
-                    collection.name, doc_id, udt)
-            return doc
-
-        elif not doc:
-            # Does not exist
-            return None
-
-        else:
-            # Found it
-            if log_timings:
-                dt = (t1 - t0) * 1000
-                log.debug("Cache get for %s:%s took %.1f ms",
-                    collection.name, doc_id, dt)
-            return doc
-
-    def get_doc_cached(self, collection, doc_id, timeout=None):
-        """Like get_doc(), but backed by an extra local memory cache layer.
-
-        Fetching data from a memcached server requires both a network
-        roundtrip and unpickling the data, so it can be useful to add
-        a short cache in local memory for often used documents. This
-        method does just that.
-
-        Any updates done in the current process will immediately invalidate
-        this local cache layer, but updates done by other processes/servers
-        cannot be detected, so keep this cache time low. Using this will
-        basically add latency to cache invalidation.
-
-        The default timeout is 5 seconds and can be overridden in your
-        settings, or by subclassing this backend, or by passing the timeout
-        explicitly to this method. If you have 10 requests per second for
-        a document, this will save you a memcached roundtrip and unpickle
-        for 98% of your requests, at a cost of a 5 second delay in
-        invalidation.
-        """
-        if not isinstance(doc_id, tuple):
-            doc_id = (doc_id,)
-
-        collection_name = collection if isinstance(collection, basestring) \
-                          else collection.name
-
-        col_cache = self._locmem_speedup_cache.setdefault(collection_name, {})
-        cached = col_cache.get(doc_id, None)
-        if cached is not None:
-            ts, doc = cached
-            timeout = timeout or self.locmem_speedup_timeout
-            if ts + timeout >= time.time():
-                # Cached value is still valid
-                return doc
-
-        # Update locmem speedup cache
-        doc = self.get_doc(collection, doc_id)
-        col_cache[doc_id] = time.time(), doc
-        return doc
-
-    def _invalidate_locmem_speedup_cache(self, collection, doc_id):
-        # Invalidates any locmem speedup cache for given collection and doc_id,
-        # so that the local process immediately sees changes made locally.
-        try:
-            col_cache = self._locmem_speedup_cache[collection.name]
-        except KeyError:
-            pass
-        else:
-            try:
-                del col_cache[doc_id]
-            except KeyError:
-                pass
-
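
For reference, a minimal usage sketch of the cache backend above. This is not part of the commit: `BookCollection` is a hypothetical DocumentCollection subclass, and the import path assumes the package layout shown in this changeset.

    from spiner.denormalize.backend.cache import CacheBackend

    backend = CacheBackend(name='cache', timeout=3600)
    books = BookCollection()     # hypothetical DocumentCollection subclass
    backend.register(books)      # connects ORM signal listeners

    # First call is a cache miss: the document is dumped from the ORM and
    # written to the Django cache. Repeated calls within
    # locmem_speedup_timeout also skip the memcached roundtrip/unpickle.
    doc = backend.get_doc_cached(books, 1)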

File spiner/denormalize/backend/locmem.py

-import time
-import threading
-from .base import BackendBase
-
-import logging
-log = logging.getLogger(__name__)
-
-class SyncInProgress(RuntimeError):
-    pass
-
-
-class LocMemBackend(BackendBase):
-    # All documents are stored in this dict
-    # data[collection_name][object_id] = document
-    data = None
-
-    # Used to keep collections that are currently being synced consistent,
-    # even if an update happens during dump_collection.
-    # WARNING: This still does not cover all possible race conditions,
-    #          especially not if database transactions are enabled!
-    _dirty = None
-    _lock = None
-
-    def __init__(self, name=None, listener_class=None):
-        super(LocMemBackend, self).__init__(name=name,
-                                            listener_class=listener_class)
-        self.data = {}
-        self._dirty = {}
-        self._lock = threading.Lock()
-
-    def deleted(self, collection, doc_id):
-        log.debug('deleted: %s %s', collection.name, doc_id)
-        with self._lock:
-            coldata = self.data.setdefault(collection.name, {})
-            try:
-                del coldata[doc_id]
-            except KeyError:
-                pass
-            else:
-                self._mark_dirty(collection.name, doc_id)
-
-    def added(self, collection, doc_id, doc):
-        log.debug('added: %s %s', collection.name, doc_id)
-        with self._lock:
-            coldata = self.data.setdefault(collection.name, {})
-            coldata[doc_id] = doc
-            self._mark_dirty(collection.name, doc_id)
-
-    def changed(self, collection, doc_id, doc):
-        log.debug('changed: %s %s', collection.name, doc_id)
-        with self._lock:
-            coldata = self.data.setdefault(collection.name, {})
-            coldata[doc_id] = doc
-            self._mark_dirty(collection.name, doc_id)
-
-    def _mark_dirty(self, collection_name, doc_id):
-        assert isinstance(doc_id, tuple)
-        if collection_name in self._dirty:
-            try:
-                self._dirty[collection_name].add(doc_id)
-            except KeyError:
-                pass
-
-    def get_doc(self, collection, doc_id):
-        if not isinstance(doc_id, tuple):
-            doc_id = (doc_id,)
-        return self.data[collection.name][doc_id]
-
-    _sync_collection_before_handling_dirty = None # method
-
-    def sync_collection(self, collection):
-        log.info("Starting full sync for collection %s", collection.name)
-        t0 = time.time()
-
-        # TODO: there is a slight race condition here
-        if collection.name in self._dirty:
-            log.error("There is already a full sync running for collection %s!",
-                collection.name)
-            raise SyncInProgress("There is already a sync in progress")
-
-        docs = {}
-        has_lock = False
-        try:
-            # Lock collection for sync
-            # This will make all the signal handlers register all changes
-            # crom now on. We want updates to continue, because this can
-            # take a long time, and we do not want to block other code.
-            self._dirty[collection.name] = set()
-            for doc in collection.dump_collection():
-                doc_id = tuple([doc[pk.accessor] for pk in
-                               collection.model.primary_keys()])
-                docs[doc_id] = doc
-
-            # Used for testing only
-            if self._sync_collection_before_handling_dirty is not None:
-                self._sync_collection_before_handling_dirty()
-
-            # Now handle the changes that occurred during the full dump.
-            # We will acquire a lock for this update, as it only touches
-            # local memory and will be very fast.
-            dirty_ids = self._dirty[collection.name]
-            log.info("%i dirty objects for collection %s",
-                len(dirty_ids), collection.name)
-
-            # Lock the data during this update
-            has_lock = True
-            self._lock.acquire()
-
-            for doc_id in dirty_ids:
-                # Patch up new data with updated local data
-                doc = self.data[collection.name].get(doc_id, None)
-                if doc:
-                    docs[doc_id] = doc
-                elif doc_id in docs:
-                    del docs[doc_id]
-
-            # Now overwrite the current collection data
-            self.data[collection.name] = docs
-
-        finally:
-            # Unlock
-            del self._dirty[collection.name]
-            if has_lock:
-                self._lock.release()
-            t1 = time.time()
-            log.info("Full sync for collection %s completed in %.3fs",
-                collection.name, t1-t0)
-

File spiner/denormalize/backend/mongodb.py

-import pymongo
-
-from .base import BackendBase
-
-import time
-import datetime
-import decimal
-import logging
-log = logging.getLogger(__name__)
-
-
-class MongoBackend(BackendBase):
-    connection_uri = "mongodb://localhost"
-    db_name = "test_denormalize"
-    # TODO: catch connection errors
-
-    # References to the connection and database objects after connecting
-    connection = None
-    db = None
-
-    def __init__(self, name=None, db_name=None, connection_uri=None,
-                 listener_class=None):
-        super(MongoBackend, self).__init__(name=name,
-                                           listener_class=listener_class)
-        if db_name:
-            self.db_name = db_name
-        if connection_uri:
-            self.connection_uri = connection_uri
-        self.connect()
-
-    def _get_id(self, doc_id):
-        """create a valid mongo _id from `doc_id`.
-        `doc_id` may be a tuple if the model's primary key is compound.
-        """
-        if isinstance(doc_id, tuple):
-            return ','.join(str(x) for x in doc_id)
-        return doc_id
-
-    def fix_value(self, value):
-        # changes made here need to be losslessly reversible if the document
-        # is intended to make a round trip from RDBMS -> mongo -> RDBMS.
-        # If it is not, the value modified here will end up clobbering the
-        # original value in the RDBMS, when it is re-normalized.
-        if isinstance(value, datetime.date):
-            return datetime.datetime(value.year, value.month, value.day)
-        if isinstance(value, decimal.Decimal):
-            # Initial testing with sqlalchemy and mysql/sqlite shows that this
-            # should be reversible: even if the mongo document holds a float
-            # value which was converted from a Decimal, mysql/sqlite will
-            # evaluate that float as equal to its original decimal.
-            # e.g. this will work:
-            # SELECT id from mytable WHERE numeric_field = float_value
-            return float(value)
-        elif isinstance(value, basestring):
-            # FIXME: this is not lossless
-            return value.decode('utf-8', 'ignore')
-        return value
-
-    def connect(self):
-        self.connection = pymongo.Connection(self.connection_uri, safe=True)
-        self.db = getattr(self.connection, self.db_name)
-
-    def deleted(self, collection, doc_id):
-        doc_id = self._get_id(doc_id)
-        log.debug('deleted: %s %s', collection.name, doc_id)
-        col = getattr(self.db, collection.name)
-        col.remove({'_id': doc_id})
-
-    def added(self, collection, doc_id, doc):
-        doc_id = self._get_id(doc_id)
-        log.debug('added: %s %s', collection.name, doc_id)
-        col = getattr(self.db, collection.name)
-        # Replace any existing document
-        doc['_id'] = doc_id
-        col.update({'_id': doc_id}, doc, upsert=True)
-        del doc['_id']
-
-    def changed(self, collection, doc_id, doc):
-        doc_id = self._get_id(doc_id)
-        log.debug('changed: %s %s', collection.name, doc_id)
-        col = getattr(self.db, collection.name)
-        # We are not allowed to update _id
-        if '_id' in doc:
-            del doc['_id']
-        # Only update the document's fields. We keep any other fields
-        # added by other code intact, as long as they are set on the
-        # document root.
-        col.update({'_id': doc_id}, {'$set': doc}, upsert=True)
-
-    def get_doc(self, collection, doc_id):
-        doc_id = self._get_id(doc_id)
-        col = getattr(self.db, collection.name)
-        return col.find_one({'_id': doc_id})
-
-    def sync_collection(self, collection):
-        import pstats, cProfile, sys
-        import cStringIO as StringIO
-
-        log.info("Starting full sync for collection %s", collection.name)
-        t0 = time.time()
-
-        mongo_collection = self.db[collection.name]
-        primary_key = collection.model.primary_keys()
-
-        mongo_collection.drop()
-        bulk = mongo_collection.initialize_unordered_bulk_op()
-
-        batch_size = 100  # Records per batch
-        batch_execute_opts = dict(w=0, j=False)
-
-        counter = 0
-        for doc in collection.dump_collection():
-            #pr = cProfile.Profile()
-            #pr.enable()
-
-            doc_id = ','.join([str(doc[pk.accessor]) for pk in primary_key])
-            fixed_doc = self._fix_document(doc)
-            fixed_doc['_id'] = doc_id
-            bulk.insert(fixed_doc)
-            counter += 1
-
-            if counter % batch_size == 0:
-                log.debug('%s checkpoint %s' % (collection.name, counter))
-                bulk.execute(batch_execute_opts)
-                bulk = mongo_collection.initialize_unordered_bulk_op()
-
-            #pr.disable()
-
-            # s = StringIO.StringIO()
-            # ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
-            # ps.print_stats()
-            # print s.getvalue()
-            #
-            # sys.exit(1)
-
-        log.info('%s checkpoint: final' % collection.name)
-        bulk.execute(batch_execute_opts)
-
-        t1 = time.time()
-        log.info("Full sync for collection %s completed in %.3fs",
-            collection.name, t1-t0)
-
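
A comparable usage sketch for the Mongo backend above (illustrative only; `BookCollection`, the database name, and the URI are assumptions, not part of this commit):

    from spiner.denormalize.backend.mongodb import MongoBackend

    backend = MongoBackend(name='mongo',
                           db_name='myapp_docs',
                           connection_uri='mongodb://localhost')
    books = BookCollection()
    backend.register(books)          # start listening for ORM changes
    backend.sync_collection(books)   # bulk-load every document into Mongo

    doc = backend.get_doc(books, 1)  # read back the denormalized document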

File spiner/denormalize/backends/__init__.py

Empty file added.

File spiner/denormalize/backends/base.py

+import weakref
+
+from ..context import get_current_context
+
+import logging
+from collections import defaultdict
+from ...models import DocumentCollectionBase
+
+log = logging.getLogger(__name__)
+
+
+class BackendBase(object):
+
+    collections = None
+
+    # This will be shared by all subclasses. It keeps track of all active
+    # backends.
+    _registry = weakref.WeakValueDictionary()
+
+    def __init__(self, name=None, listener_class=None):
+        """
+        :param name: Name for the backend instance. This is used to refer to
+            the backend instance from management commands. If not given,
+            a name will be generated based on `id(self)`.
+        :type name: str or None
+        """
+        if name is None:
+            name = '_{0}'.format(id(self))
+        if name in self._registry:
+            raise ValueError("A backend with name '{0}' has already been "
+                "registered. Please pass another 'name' during "
+                "initialization.".format(name))
+        self._registry[name] = self
+        self.collections = {}
+        self.listener_class = listener_class
+        self._listeners = defaultdict(list)
+
+    def register(self, collection):
+        """
+        :type collection: denormalize.models.DocumentCollectionBase
+        :return:
+        :rtype:
+        """
+        if collection.name in self.collections:
+            raise ValueError("A collection with name '{0}' has already "
+                "been registered to this backend.".format(collection.name))
+        self.collections[collection.name] = collection
+        self._setup_listeners(collection)
+
+    def deregister(self, collection):
+        """
+        :type collection: denormalize.models.DocumentCollectionBase
+        :return:
+        :rtype:
+        """
+        if collection.name not in self.collections:
+            raise ValueError("No collection with name '{0}' has "
+                "been registered to this backend.".format(collection.name))
+        del self.collections[collection.name]
+        self._remove_listeners(collection)
+
+    def deregister_all(self):
+        for collection in self.collections.values():
+            self.deregister(collection)
+
+    def _get_collection(self, collection_name_or_obj):
+        if isinstance(collection_name_or_obj, DocumentCollectionBase):
+            name = collection_name_or_obj.name
+        elif isinstance(collection_name_or_obj, basestring):
+            name = collection_name_or_obj
+        else:
+            raise KeyError("Invalid collection specified")
+        # Can raise KeyError
+        return self.collections[name]
+
+    def fix_value(self, value):
+        return value
+
+    def _fix_document(self, doc):
+        """A utility for use on `BackendBase` sub-classes to adjust the types
+        and values of document entries for compatibility with the backend.
+
+        To use, override `BackendBase.fix_value`.
+        """
+        if not hasattr(self, 'fix_value'):
+            return doc
+
+        if isinstance(doc, dict):
+            for key, value in doc.items():
+                doc[key] = self._fix_document(value)
+            return doc
+        if isinstance(doc, (list, tuple)):
+            return [self._fix_document(item) for item in doc]
+        else:
+            return self.fix_value(doc)
+
+    def _setup_listeners(self, collection):
+        """
+        :type collection: denormalize.models.DocumentCollectionBase
+        """
+        for model, filter_paths in collection.get_related_models():
+            self._add_listener(collection, model, filter_paths)
+
+    def _add_listener(self, collection, model, filter_paths):
+        """Connect the Django ORM signals to given dependency
+
+        :type collection: denormalize.models.DocumentCollectionBase
+        :param model: dependency model to watch for changes
+        :type model: denormalize.orms.ModelInspector
+        :param filter_paths: chain of relations from `collection.model` to
+            `model`
+        :type filter_paths: list of denormalize.orms.RelationProperty, or None
+        """
+        # TODO: this does not handle changing foreignkeys well. The object
+        #       will be added to the new root, but not deleted from the old
+        #       root object. Maybe solve by also adding a pre_save? The
+        #       database will still contain the old connection.
+        # TODO: Consider moving all of this to a Collection or something
+        #       separate and just listen to signals from there.
+        if self.listener_class:
+            listener_class = self.listener_class
+        else:
+            listener_class = collection.model.default_listener()
+        listener = listener_class(self, collection, model, filter_paths)
+        listener.connect()
+        self._listeners[collection.name].append(listener)
+
+    def _remove_listeners(self, collection):
+        """
+        :type collection: denormalize.models.DocumentCollectionBase
+        """
+        listeners = self._listeners.pop(collection.name)
+        for listener in listeners:
+            listener.disconnect()
+
+    def _queue_deleted(self, collection, doc_id):
+        """Queue a deleted notification"""
+        assert isinstance(doc_id, tuple)
+        context = get_current_context()
+        if context is not None:
+            log.debug("Queued delete for %s %s", collection, doc_id)
+            # FIXME: use a set here.  why use a dictionary when the key is the
+            #        same as the value?
+            key = (self, collection, doc_id)
+            context.deleted[key] = (self, collection, doc_id)
+        else:
+            self._call_deleted(collection, doc_id)
+
+    def _queue_added(self, collection, doc_id):
+        """Queue an added notification"""
+        assert isinstance(doc_id, tuple)
+        context = get_current_context()
+        if context is not None:
+            log.debug("Queued add for %s %s", collection, doc_id)
+            # FIXME: use a set here.  why use a dictionary when the key is the
+            #        same as the value?
+            key = (self, collection, doc_id)
+            context.added[key] = (self, collection, doc_id)
+        else:
+            self._call_added(collection, doc_id)
+
+    def _queue_changed(self, collection, doc_id):
+        """Queue a changed notification"""
+        assert isinstance(doc_id, tuple)
+        context = get_current_context()
+        if context is not None:
+            log.debug("Queued change for %s %s", collection, doc_id)
+            # FIXME: use a set here.  why use a dictionary when the key is the
+            #        same as the value?
+            key = (self, collection, doc_id)
+            context.changed[key] = (self, collection, doc_id)
+        else:
+            self._call_changed(collection, doc_id)
+
+    def _call_deleted(self, collection, doc_id):
+        self.deleted(collection, doc_id)
+        # TODO: signals
+
+    def _call_added(self, collection, doc_id):
+        doc = collection.dump_id(doc_id)
+        doc = self._fix_document(doc)
+        self.added(collection, doc_id, doc)
+        # TODO: signals
+
+    def _call_changed(self, collection, doc_id):
+        doc = collection.dump_id(doc_id)
+        doc = self._fix_document(doc)
+        self.changed(collection, doc_id, doc)
+        # TODO: signals
+
+
+    # Implement these for your backend. The code above will call these with
+    # ORM updates you need to commit to the backend.
+
+    def deleted(self, collection, doc_id):
+        """Called when the root object for a document was deleted in the ORM"""
+        raise NotImplementedError()
+
+    def added(self, collection, doc_id, doc):
+        """Called when the root object for a document was added in the ORM"""
+        raise NotImplementedError()
+
+    def changed(self, collection, doc_id, doc):
+        """Called when any data in a document was changed in the ORM"""
+        raise NotImplementedError()
+
+    def get_doc(self, collection, doc_id):
+        """Fetch data from a collection (if supported by the backend)"""
+        raise NotImplementedError()
+
+    def sync_collection(self, collection):
+        """Sync all data in the collection from the ORM to the backend
+
+        This does not make sense for all backends.
+        """
+        raise NotImplementedError()
+
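
To make the subclass contract in the comment above concrete, a rough sketch (not code from this commit; the import path is assumed from the new backends/ layout):

    from spiner.denormalize.backends.base import BackendBase

    class PrintBackend(BackendBase):
        """Toy backend that only logs the notifications it receives."""

        def added(self, collection, doc_id, doc):
            print('added %s %s' % (collection.name, doc_id))

        def changed(self, collection, doc_id, doc):
            print('changed %s %s' % (collection.name, doc_id))

        def deleted(self, collection, doc_id):
            print('deleted %s %s' % (collection.name, doc_id))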

File spiner/denormalize/backends/locmem.py

+import time
+import threading
+from .base import BackendBase
+
+import logging
+log = logging.getLogger(__name__)
+
+class SyncInProgress(RuntimeError):
+    pass
+
+
+class LocMemBackend(BackendBase):
+    # All documents are stored in this dict
+    # data[collection_name][object_id] = document
+    data = None
+
+    # Used to keep collections that are currently being synced consistent,
+    # even if an update happens during dump_collection.
+    # WARNING: This still does not cover all possible race conditions,
+    #          especially not if database transactions are enabled!
+    _dirty = None
+    _lock = None
+
+    def __init__(self, name=None, listener_class=None):
+        super(LocMemBackend, self).__init__(name=name,
+                                            listener_class=listener_class)
+        self.data = {}
+        self._dirty = {}
+        self._lock = threading.Lock()
+
+    def deleted(self, collection, doc_id):
+        log.debug('deleted: %s %s', collection.name, doc_id)
+        with self._lock:
+            coldata = self.data.setdefault(collection.name, {})
+            try:
+                del coldata[doc_id]
+            except KeyError:
+                pass
+            else:
+                self._mark_dirty(collection.name, doc_id)
+
+    def added(self, collection, doc_id, doc):
+        log.debug('added: %s %s', collection.name, doc_id)
+        with self._lock:
+            coldata = self.data.setdefault(collection.name, {})
+            coldata[doc_id] = doc
+            self._mark_dirty(collection.name, doc_id)
+
+    def changed(self, collection, doc_id, doc):
+        log.debug('changed: %s %s', collection.name, doc_id)
+        with self._lock:
+            coldata = self.data.setdefault(collection.name, {})
+            coldata[doc_id] = doc
+            self._mark_dirty(collection.name, doc_id)
+
+    def _mark_dirty(self, collection_name, doc_id):
+        assert isinstance(doc_id, tuple)
+        if collection_name in self._dirty:
+            try:
+                self._dirty[collection_name].add(doc_id)
+            except KeyError:
+                pass
+
+    def get_doc(self, collection, doc_id):
+        if not isinstance(doc_id, tuple):
+            doc_id = (doc_id,)
+        return self.data[collection.name][doc_id]
+
+    _sync_collection_before_handling_dirty = None # method
+
+    def sync_collection(self, collection):
+        log.info("Starting full sync for collection %s", collection.name)
+        t0 = time.time()
+
+        # TODO: there is a slight race condition here
+        if collection.name in self._dirty:
+            log.error("There is already a full sync running for collection %s!",
+                collection.name)
+            raise SyncInProgress("There is already a sync in progress")
+
+        docs = {}
+        has_lock = False
+        try:
+            # Lock collection for sync
+            # This will make all the signal handlers register all changes
+            # from now on. We want updates to continue, because this can
+            # take a long time, and we do not want to block other code.
+            self._dirty[collection.name] = set()
+            for doc in collection.dump_collection():
+                doc_id = tuple([doc[pk.accessor] for pk in
+                               collection.model.primary_keys()])
+                docs[doc_id] = doc
+
+            # Used for testing only
+            if self._sync_collection_before_handling_dirty is not None:
+                self._sync_collection_before_handling_dirty()
+
+            # Now handle the changes that occurred during the full dump.
+            # We will acquire a lock for this update, as it only touches
+            # local memory and will be very fast.
+            dirty_ids = self._dirty[collection.name]
+            log.info("%i dirty objects for collection %s",
+                len(dirty_ids), collection.name)
+
+            # Lock the data during this update
+            has_lock = True
+            self._lock.acquire()
+
+            for doc_id in dirty_ids:
+                # Patch up new data with updated local data
+                doc = self.data[collection.name].get(doc_id, None)
+                if doc:
+                    docs[doc_id] = doc
+                elif doc_id in docs:
+                    del docs[doc_id]
+
+            # Now overwrite the current collection data
+            self.data[collection.name] = docs
+
+        finally:
+            # Unlock
+            del self._dirty[collection.name]
+            if has_lock:
+                self._lock.release()
+            t1 = time.time()
+            log.info("Full sync for collection %s completed in %.3fs",
+                collection.name, t1-t0)
+
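
A short sketch of how the locmem backend above might be exercised, for example in tests (`BookCollection` is hypothetical; not part of this commit):

    backend = LocMemBackend(name='locmem')
    books = BookCollection()
    backend.register(books)

    # Full dump; saves that land while dump_collection() is iterating are
    # recorded in _dirty and patched over the dumped data at the end.
    backend.sync_collection(books)

    doc = backend.get_doc(books, 1)   # plain in-process dict lookup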

File spiner/denormalize/listeners.py

+from collections import defaultdict
+import logging
+log = logging.getLogger(__name__)
+
+class CollectionListener(object):
+    """Helps shuttle changes made to a relational database into a backend
+    queue.
+
+    When a `DocumentCollection` is registered with a backend, the backend
+    creates a `CollectionListener` for each model on which the
+    collection is dependent.  The `CollectionListener` provides a set of change
+    callbacks -- pre- and post- insert, update, and delete -- which handle the
+    logic of finding instances of the collection whose documents should be
+    regenerated in the backend.
+
+    A `CollectionListener` sub-class should call these callbacks based on
+    change notifications from the database, for example, via an ORM's
+    signal/event system.
+    """
+    def __init__(self, backend, collection, model, filter_paths):
+        """
+        :param backend: the backend in which to queue changes
+        :type backend: denormalize.backend.base.BackendBase
+        :type collection: denormalize.models.DocumentCollection
+        :param model: dependency model to watch for changes
+        :type model: denormalize.orms.ModelInspector
+        :param filter_paths: chain of relations from `collection.model` to
+            `model`
+        :type filter_paths: list of tuples of denormalize.orms.RelationProperty
+        """
+        self.backend = backend
+        self.collection = collection
+        self.model = model
+        if self.collection.model == self.model:
+            self.is_root_model = True
+            self.filter_paths = [x for x in filter_paths if x != ()]
+        else:
+            self.is_root_model = False
+            self.filter_paths = filter_paths
+        self._cache = {'update': defaultdict(set),
+                       'delete': defaultdict(set)}
+
+    def connect(self):
+        """connect the listener to the necessary signals/events"""
+        raise NotImplementedError()
+
+    def disconnect(self):
+        """disconnect the listener from any signals/events set up by
+        `connect()`"""
+        raise NotImplementedError()
+
+    def _get_affected_instance_ids(self, primary_key, add_self=False):
+        """Find all affected root model instance ids, given the primary key
+        of a related model.
+        """
+        # if book1 contains book2 through some relation (e.g.
+        # publisher/books),  when book2.id changes, we always add its
+        # primary_key to the `affected` set, since the book2 document will
+        # need to be updated. this is handled in each callback...
+
+        # ... we also issue a query to find Books related to book2 along
+        # a specific join path, which will find book1.
+        # FIXME: taking collection.queryset out of the equation for now
+        #        since it is not cross-orm compatible
+        affected = self.collection.model.get_objects_affected_by(
+            self.filter_paths, self.model, primary_key)
+
+        if add_self and self.is_root_model:
+            if isinstance(primary_key, (list, set)):
+                affected.update(primary_key)
+            else:
+                affected.add(primary_key)
+        return affected
+
+    def _set_affected(self, ns, primary_key, affected_set):
+        """Used in pre_* handlers. Annotates an instance with
+        the ids of the root objects it affects, so that we can include
+        these in the final updates in the post_* handlers.
+        """
+        self._cache[ns][primary_key].update(affected_set)
+
+    def _get_affected(self, ns, primary_key):
+        """See _set_affected for an explanation."""
+        return self._cache[ns].pop(primary_key, set())
+
+    # -- callbacks
+
+    def pre_update(self, primary_key):
+        """
+        :param primary_key: primary key of the watched model
+        :type primary_key: data type of the model's primary key, or a tuple
+            of values, if the key is compound
+        """
+        log.debug('pre_update: collection %s, %s with id %s',
+                  self.collection.name, self.model.name, primary_key)
+
+        affected = self._get_affected_instance_ids(primary_key)
+
+        log.debug("pre_update:   affected documents: %r", affected)
+
+        # Instead of calling the update functions directly, we make
+        # sure it is queued for after the operation. Otherwise
+        # the changes are not reflected in the database yet.
+        self._set_affected('update', primary_key, affected)
+
+    def post_update(self, primary_key):
+        log.debug('post_update: collection %s, %s with id %s',
+                  self.collection.name, self.model.name, primary_key)
+
+        affected = self._get_affected_instance_ids(primary_key, add_self=True)
+
+        log.debug("post_update:   affected documents: %r", affected)
+
+        # This can be passed by the pre_save handler
+        extra_affected = self._get_affected('update', primary_key)
+        if extra_affected:
+            affected.update(extra_affected)
+
+        for doc_id in self.collection.map_affected(affected):
+            self.backend._queue_changed(self.collection, doc_id)
+
+    def pre_insert(self, primary_key):
+        # Not used, does not need to be called/connected
+        pass
+
+    def post_insert(self, primary_key):
+        log.debug('post_insert: collection %s, %s  with id %s',
+                  self.collection.name, self.model.name, primary_key)
+
+        affected = self._get_affected_instance_ids(primary_key, add_self=True)
+
+        for doc_id in self.collection.map_affected(affected):
+            self.backend._queue_added(self.collection, doc_id)
+
+    def pre_delete(self, primary_key):
+        log.debug('pre_delete: collection %s, %s with id %s being deleted',
+                  self.collection.name, self.model.name, primary_key)
+
+        affected = self._get_affected_instance_ids(primary_key)
+
+        log.debug("pre_delete:   affected documents: %r", affected)
+
+        # Instead of calling the update functions directly, we make
+        # sure it is queued for after the operation. Otherwise
+        # the changes are not reflected in the database yet.
+        self._set_affected('delete', primary_key, affected)
+
+    def post_delete(self, primary_key):
+        log.debug('post_delete: collection %s, %s with id %s was deleted',
+                  self.collection.name, self.model.name, primary_key)
+
+        affected = self._get_affected('delete', primary_key)
+
+        if self.is_root_model:
+            # if `primary_key` represents an instance of the root model which
+            # has been deleted, queue its deletion from the backend
+            if isinstance(primary_key, tuple):
+                primary_key = set(primary_key)
+            for doc_id in self.collection.map_affected(primary_key):
+                self.backend._queue_deleted(self.collection, doc_id)
+
+        # if any dependent model instances have been deleted, queue a change
+        # of the affected root models in the backend
+        for doc_id in self.collection.map_affected(affected):
+            self.backend._queue_changed(self.collection, doc_id)
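
To illustrate the callback protocol above, a hedged sketch of a CollectionListener subclass wired to a made-up `event_bus` hook registry (every name here is an assumption; the Django listener shown further down is a real example of the same wiring):

    class MyListener(CollectionListener):
        """Sketch only: `event_bus` is a fictional hook API."""

        def connect(self):
            event_bus.subscribe('row_updating', self._before_update)
            event_bus.subscribe('row_updated', self._after_update)
            event_bus.subscribe('row_inserted', self._after_insert)
            event_bus.subscribe('row_deleting', self._before_delete)
            event_bus.subscribe('row_deleted', self._after_delete)

        def disconnect(self):
            event_bus.unsubscribe_all(self)

        # each hook is assumed to receive the changed row's primary key
        def _before_update(self, pk):
            self.pre_update((pk,))    # record currently affected root ids

        def _after_update(self, pk):
            self.post_update((pk,))   # queue 'changed' for affected docs

        def _after_insert(self, pk):
            self.post_insert((pk,))

        def _before_delete(self, pk):
            self.pre_delete((pk,))

        def _after_delete(self, pk):
            self.post_delete((pk,))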

File spiner/denormalize/listeners/__init__.py

Empty file removed.

File spiner/denormalize/listeners/base.py

-from collections import defaultdict
-import logging
-log = logging.getLogger(__name__)
-
-class CollectionListener(object):
-    """Helps shuttle changes made to a relational database into a backend
-    queue.
-
-    When a `DocumentCollection` is registered with a backend, the backend
-    creates a `CollectionListener` for each model on which the
-    collection is dependent.  The `CollectionListener` provides a set of change
-    callbacks -- pre- and post- insert, update, and delete -- which handle the
-    logic of finding instances of the collection whose documents should be
-    regenerated in the backend.
-
-    A `CollectionListener` sub-class should call these callbacks based on
-    change notifications from the database, for example, via an ORM's
-    signal/event system.
-    """
-    def __init__(self, backend, collection, model, filter_paths):
-        """
-        :param backend: the backend in which to queue changes
-        :type backend: denormalize.backend.base.BackendBase
-        :type collection: denormalize.models.DocumentCollection
-        :param model: dependency model to watch for changes
-        :type model: denormalize.orms.ModelInspector
-        :param filter_paths: chain of relations from `collection.model` to
-            `model`
-        :type filter_paths: list of tuples of denormalize.orms.RelationProperty
-        """
-        self.backend = backend
-        self.collection = collection
-        self.model = model
-        if self.collection.model == self.model:
-            self.is_root_model = True
-            self.filter_paths = [x for x in filter_paths if x != ()]
-        else:
-            self.is_root_model = False
-            self.filter_paths = filter_paths
-        self._cache = {'update': defaultdict(set),
-                       'delete': defaultdict(set)}
-
-    def connect(self):
-        """connect the listener to the necessary signals/events"""
-        raise NotImplementedError()
-
-    def disconnect(self):
-        """disconnect the listener from any signals/events set up by
-        `connect()`"""
-        raise NotImplementedError()
-
-    def _get_affected_instance_ids(self, primary_key, add_self=False):
-        """Find all affected root model instance ids, given the primary key
-        of a related model.
-        """
-        # if book1 contains book2 through some relation (e.g.
-        # publisher/books),  when book2.id changes, we always add its
-        # primary_key to the `affected` set, since the book2 document will
-        # need to be updated. this is handled in each callback...
-
-        # ... we also issue a query to find Books related to book2 along
-        # a specific join path, which will find book1.
-        # FIXME: taking collection.queryset out of the equation for now
-        #        since it is not cross-orm compatible
-        affected = self.collection.model.get_objects_affected_by(
-            self.filter_paths, self.model, primary_key)
-
-        if add_self and self.is_root_model:
-            if isinstance(primary_key, (list, set)):
-                affected.update(primary_key)
-            else:
-                affected.add(primary_key)
-        return affected
-
-    def _set_affected(self, ns, primary_key, affected_set):
-        """Used in pre_* handlers. Annotates an instance with
-        the ids of the root objects it affects, so that we can include
-        these in the final updates in the post_* handlers.
-        """
-        self._cache[ns][primary_key].update(affected_set)
-
-    def _get_affected(self, ns, primary_key):
-        """See _set_affected for an explanation."""
-        return self._cache[ns].pop(primary_key, set())
-
-    # -- callbacks
-
-    def pre_update(self, primary_key):
-        """
-        :param primary_key: primary key of the watched model
-        :type primary_key: data type of the model's primary key, or a tuple
-            of values, if the key is compound
-        """
-        log.debug('pre_update: collection %s, %s with id %s',
-                  self.collection.name, self.model.name, primary_key)
-
-        affected = self._get_affected_instance_ids(primary_key)
-
-        log.debug("pre_update:   affected documents: %r", affected)
-
-        # Instead of calling the update functions directly, we make
-        # sure it is queued for after the operation. Otherwise
-        # the changes are not reflected in the database yet.
-        self._set_affected('update', primary_key, affected)
-
-    def post_update(self, primary_key):
-        log.debug('post_update: collection %s, %s with id %s',
-                  self.collection.name, self.model.name, primary_key)
-
-        affected = self._get_affected_instance_ids(primary_key, add_self=True)
-
-        log.debug("post_update:   affected documents: %r", affected)
-
-        # This can be passed by the pre_save handler
-        extra_affected = self._get_affected('update', primary_key)
-        if extra_affected:
-            affected.update(extra_affected)
-
-        for doc_id in self.collection.map_affected(affected):
-            self.backend._queue_changed(self.collection, doc_id)
-
-    def pre_insert(self, primary_key):
-        # Not used, does not need to be called/connected
-        pass
-
-    def post_insert(self, primary_key):
-        log.debug('post_insert: collection %s, %s  with id %s',
-                  self.collection.name, self.model.name, primary_key)
-
-        affected = self._get_affected_instance_ids(primary_key, add_self=True)
-
-        for doc_id in self.collection.map_affected(affected):
-            self.backend._queue_added(self.collection, doc_id)
-
-    def pre_delete(self, primary_key):
-        log.debug('pre_delete: collection %s, %s with id %s being deleted',
-                  self.collection.name, self.model.name, primary_key)
-
-        affected = self._get_affected_instance_ids(primary_key)
-
-        log.debug("pre_delete:   affected documents: %r", affected)
-
-        # Instead of calling the update functions directly, we make
-        # sure it is queued for after the operation. Otherwise
-        # the changes are not reflected in the database yet.
-        self._set_affected('delete', primary_key, affected)
-
-    def post_delete(self, primary_key):
-        log.debug('post_delete: collection %s, %s with id %s was deleted',
-                  self.collection.name, self.model.name, primary_key)
-
-        affected = self._get_affected('delete', primary_key)
-
-        if self.is_root_model:
-            # if `primary_key` represents an instance of the root model which
-            # has been deleted, queue its deletion from the backend
-            if isinstance(primary_key, tuple):
-                primary_key = set(primary_key)
-            for doc_id in self.collection.map_affected(primary_key):
-                self.backend._queue_deleted(self.collection, doc_id)
-
-        # if any dependent model instances have been deleted, queue a change
-        # of the affected root models in the backend
-        for doc_id in self.collection.map_affected(affected):
-            self.backend._queue_changed(self.collection, doc_id)

File spiner/denormalize/listeners/django.py

-from __future__ import absolute_import
-from .base import CollectionListener
-from django.db.models import signals
-import logging
-log = logging.getLogger(__name__)
-
-
-class DjangoCollectionListener(CollectionListener):
-    def connect(self):
-        model = self.model._model
-        signals.post_save.connect(self._post_save, sender=model)
-        signals.post_delete.connect(self._post_delete, sender=model)
-        if self.model.foreign_keys():
-            # FIXME: we can do better: check the foreign keys against the
-            # filter paths
-            signals.pre_save.connect(self._pre_save, sender=model)
-            signals.pre_delete.connect(self._pre_delete, sender=model)
-
-        # M2M handling
-        for filter_path in self.filter_paths:
-            if filter_path and filter_path[-1].association_model:
-                through_model = filter_path[-1].association_model._model
-                signals.m2m_changed.connect(self._m2m_changed,
-                                            sender=through_model)
-
-    def disconnect(self):
-        model = self.model._model
-        signals.post_save.disconnect(self._post_save, sender=model)
-        signals.post_delete.disconnect(self._post_delete, sender=model)
-        if self.model.foreign_keys():
-            signals.pre_save.disconnect(self._pre_save, sender=model)
-            signals.pre_delete.disconnect(self._pre_delete, sender=model)
-
-        # M2M handling
-        for filter_path in self.filter_paths:
-            if filter_path and filter_path[-1].association_model:
-                through_model = filter_path[-1].association_model._model
-                signals.m2m_changed.disconnect(self._m2m_changed,
-                                               sender=through_model)
-
-    # -- callbacks
-
-    def _pre_save(self, sender, instance, raw, **kwargs):
-        # Used to detect FK changes
-        created_guess = not instance.pk
-
-        # From the Django docs: "True if the model is saved exactly as
-        # presented (i.e. when loading a fixture). One should not
-        # query/modify other records in the database as the database
-        # might not be in a consistent state yet."
-        if raw:
-            log.warn("pre_save: raw=True, so no document sync performed!")
-            return
-
-        # We only need this to monitor FK *changes*, so no need to do
-        # anything for new objects.
-        if created_guess:
-            log.debug("pre_save:   looks new, no action needed")
-            # not needed:
-            # self.post_insert()
-            return
-
-        self.pre_update((instance.pk,))
-
-    def _post_save(self, sender, instance, created, raw, **kwargs):
-        # From the Django docs: "True if the model is saved exactly as
-        # presented (i.e. when loading a fixture). One should not
-        # query/modify other records in the database as the database
-        # might not be in a consistent state yet."
-        if raw:
-            log.warn("post_save: raw=True, so no document sync performed!")
-            return
-
-        if created:
-            self.post_insert((instance.pk,))
-        else:
-            self.post_update((instance.pk,))
-
-    def _pre_delete(self, sender, instance, **kwargs):
-        self.pre_delete((instance.pk,))
-
-    def _post_delete(self, sender, instance, **kwargs):
-        self.post_delete((instance.pk,))
-
-    def _m2m_changed(self, sender, instance, action, reverse, model, pk_set,
-                     **kwargs):
-        """
-        :param sender: The intermediate model class describing the
-            ManyToManyField. This class is automatically created when a
-            many-to-many field is defined; you can access it using the through
-            attribute on the many-to-many field.
-        :param instance: The instance whose many-to-many relation is updated.
-            This can be an instance of the sender, or of the class the
-            ManyToManyField is related to.
-        :param action: A string indicating the type of update that is done on
-            the relation.
-        :param reverse: Indicates which side of the relation is updated (i.e.,
-            if it is the forward or reverse relation that is being modified).
-        :param model: The class of the objects that are added to, removed from
-            or cleared from the relation.
-            e.g. `Book` in `category.books.add(book)`
-        :param pk_set: For the pre_add, post_add, pre_remove and post_remove
-            actions, this is a set of primary key values that have been added
-            to or removed from the relation. For the pre_clear and post_clear
-            actions, this is None.
-
-        For example, given::
-
-            category.books.add(book1)
-            category.books.add(book2)
-
-        The function arguments would be:
-            - instance: category
-            - model: Book
-            - pk_set: set([book1.pk, book2.pk])
-            - reverse: False
-
-        Given::
-
-            book1.category_set.add(category)
-
-        The function arguments would be:
-            - instance: book1
-            - model: Category
-            - pk_set: set([category.pk])
-            - reverse: True?
-        """
-        # An m2m change affects both sides: if an Author is added to a Book,
-        # the Author will also have a new book on its side. It does not
-        # matter however in which direction we are looking at this relation,
-        # since any check on the instance passed will lead us to the root
-        # object that must be marked as changed.
-
-        # Opposite side of `model`
-        instance_model = instance.__class__
-
-        # FIXME: debug level
-        log.debug('m2m_changed: collection %s, %s with %s.id=%s m2m '
-            '%s on %s side (reverse=%s), pk_set=%s',
-            self.collection.name, self.model.name, instance_model.__name__,
-            instance.pk, action, model.__name__, reverse, pk_set)
-
-        # Optimization: if either side is our root object, send out changes
-        # without querying.
-        if instance_model is self.collection.model._model:
-            self.backend._queue_changed(self.collection, (instance.pk,))
-            return
-
-        elif model is self.collection.model._model and pk_set:
-            for pk in pk_set:
-                self.backend._queue_changed(self.collection, (pk,))
-            return
-
-        # Otherwise figure out which side is equal to the model we are
-        # registering handlers for, and use that for querying.
-        if instance_model is self.model._model:
-            affected = self._get_affected_instance_ids((instance.pk,))
-
-        elif model is self.model._model and pk_set:
-            # FIXME: this is wrong! setting tags does not affect all other
-            #        books that have the same tag!
-            pk_set = set([(x,) for x in pk_set])
-            affected = self._get_affected_instance_ids(pk_set)
-        else:
-            return
-
-        for doc_id in self.collection.map_affected(affected):
-            self.backend._queue_changed(self.collection, doc_id)
-
-
-        # FIXME: how to handle the clear signals (pre and post) in this form?
-        # Tag (publisher__tags) with Publisher.id=1 m2m pre_clear on Tag side (reverse=False), pk_set=None
-        # We don't have any ID for Tag, only for the other side
-        # --> strip last item from filter path
-
-        # The type of action does not matter, we simply do a lookup for
-        # given model and
-        #affected = getattr(instance, '_denormalize_m2m_affected', set())
-        #if filter_path:
-        #    for doc_id in collection.map_affected(affected):
-        #        backend._queue_changed(collection, doc_id)
-        #else:
-        #    for doc_id in collection.map_affected(set([instance.id])):
-        #        backend._queue_deleted(collection, doc_id)
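
As a quick aid to the `m2m_changed` docstring above, the same argument pattern can be observed with a bare signal receiver. A minimal sketch, assuming illustrative `Category`/`Book` models where `Category.books` is a `ManyToManyField(Book)`; neither model is part of this commit:

    import logging

    from django.db.models.signals import m2m_changed
    from django.dispatch import receiver

    log = logging.getLogger(__name__)

    # `Category` and `Book` are assumed example models defined elsewhere.
    @receiver(m2m_changed, sender=Category.books.through)
    def log_m2m(sender, instance, action, reverse, model, pk_set, **kwargs):
        # category.books.add(book1, book2) fires post_add with:
        #   instance=category, model=Book, reverse=False,
        #   pk_set=set([book1.pk, book2.pk])
        # book1.category_set.add(category) fires the mirrored call with:
        #   instance=book1, model=Category, reverse=True,
        #   pk_set=set([category.pk])
        log.debug("m2m %s: %r %s reverse=%s pk_set=%r",
                  action, instance, model.__name__, reverse, pk_set)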

File spiner/denormalize/listeners/sqlalchemy.py

-from __future__ import absolute_import
-from .base import CollectionListener
-from sqlalchemy import event
-import logging
-log = logging.getLogger(__name__)
-
-
-class SqlAlchemyCollectionListener(CollectionListener):
-    """
-    Collection listener that is bound to SqlAlchemy events.
-
-    These events will only be fired on changes that happen in-process, through
-    SqlAlchemy models, and won't pay attention to events that occur externally.
-    """
-    def connect(self):
-        model = self.model._model
-        event.listen(model, 'after_update', self._after_update)
-        event.listen(model, 'after_insert', self._after_insert)
-        event.listen(model, 'after_delete', self._after_delete)
-        if self.model.foreign_keys():
-            # FIXME: we can do better: check the foreign keys against the
-            # filter paths
-            event.listen(model, 'before_update', self._before_update)
-            event.listen(model, 'before_delete', self._before_delete)
-
-        # Does not seem necessary:
-        # M2M handling
-        # if self.relation and self.relation.association_model is not None:
-        #     event.listen(self.relation.obj, 'append', self.m2m_changed)
-
-    def disconnect(self):
-        model = self.model._model
-        event.remove(model, 'after_update', self._after_update)
-        event.remove(model, 'after_insert', self._after_insert)
-        event.remove(model, 'after_delete', self._after_delete)
-
-        if self.model.foreign_keys():
-            event.remove(model, 'before_update', self._before_update)
-            event.remove(model, 'before_delete', self._before_delete)
-
-    # -- callbacks
-
-    def _before_update(self, mapper, connection, instance):
-        self.pre_update(self.model.get_primary_key_value(instance))
-
-    def _after_update(self, mapper, connection, instance):
-        self.post_update(self.model.get_primary_key_value(instance))
-
-    def _after_insert(self, mapper, connection, instance):
-        self.post_insert(self.model.get_primary_key_value(instance))
-
-    def _before_delete(self, mapper, connection, instance):
-        self.pre_delete(self.model.get_primary_key_value(instance))
-
-    def _after_delete(self, mapper, connection, instance):
-        self.post_delete(self.model.get_primary_key_value(instance))
-
-    def _m2m_changed(self, mapper, connection, instance):
-        print "sql m2m", mapper, instance

File spiner/inspectors.py

View file
+from fnmatch import fnmatch
+import logging
+
+CARDINALITIES = ('one-to-one', 'one-to-many', 'many-to-one', 'many-to-many',
+                 '*-to-one', '*-to-many', 'many-to-*', 'one-to-*', '*')
+
+log = logging.getLogger(__name__)
+
+
+class Skip(Exception):
+    """Skip serializing a value"""
+    pass
+
+
+class MultipleValues(dict):
+    """Used to rename keys, or split data into multiple keys"""
+    pass
+
+
+class ModelInstancePair(object):
+    """Used to store a model instance result from `ModelInspector.get_value`"""
+    def __init__(self, model, instance):
+        """
+        :param model: the model
+        :type model: `ModelInspector`
+        :param instance: instance of the model
+        """
+        self.model = model
+        self.instance = instance
+
+    def __repr__(self):
+        return str((self.model, self.instance))
+
+
+# TODO: protect all attributes behind read-only @property
+class Property(object):
+    """Base property class"""
+    def __init__(self, native_obj, name, model, accessor=None):
+        self.model = model
+        self.obj = native_obj
+        self.name = name
+        self.accessor = accessor if accessor else name
+
+    @property
+    def fullname(self):
+        return '%s.%s' % (self.model.name, self.accessor)
+
+    @property
+    def attr(self):
+        """return the value of the class attribute on the model, as accessed
+        by this property's accessor.
+
+        usually returns a python descriptor object
+        """
+        # NOTE: auto-created django id fields do not have a descriptor:
+        # the 'id' accessor only works from an instance. It is up to the user
+        # to deal with the resulting AttributeError
+        return getattr(self.model._model, self.accessor)
+
+    def __repr__(self):
+        return '%s(<%s>)' % (self.__class__.__name__, self.fullname)
+
+    def __eq__(self, other):
+        return (self.obj == other.obj) and (self.model == other.model)
+
+    def __hash__(self):
+        return hash(self.obj)
+
+
+class FieldProperty(Property):
+    """Intermediate representation of a database field (aka column)"""
+    def __init__(self, native_obj, name, model, accessor=None,
+                 is_primary_key=False, foreign_key_relation=None):
+        super(FieldProperty, self).__init__(native_obj, name, model, accessor)
+        self.is_primary_key = is_primary_key
+        self.foreign_key_relation = foreign_key_relation
+
+    def is_field(self):
+        return True
+
+    def is_relation(self):
+        return False
+
+    @property
+    def is_foreign_key(self):
+        return self.foreign_key_relation is not None
+
+
+class RelationProperty(Property):
+    """Intermediate representation of a relationship between two models
+
+    Conceptually a relationship is shared between two models, but it is
+    important to understand that the values on this class are specific to
+    one model (the one that returns it).  Consider a one-to-many relationship:
+    a `RelationProperty` returned from one side of this relationship will have
+    a 'has one' cardinality, while the same relationship accessed from the
+    other side will have a 'has many' cardinality.
+    """
+    def __init__(self, native_obj, name, cardinality, model, related_model,
+                 is_direct, association_model=None, foreign_key_field=None,
+                 accessor=None):
+        """
+        :param native_obj: ORM-specific object representing the relationship
+        :param name: name of the relationship
+        :type name: str
+        :param cardinality: type of relationship
+        :type cardinality: str {'one-to-one', 'one-to-many', 'many-to-one',
+            'many-to-many', '*-to-one', '*-to-many', 'many-to-*', 'one-to-*',
+            '*'}
+        :param model: the model from which this relationship was returned
+        :type model: `ModelInspector`
+        :param related_model: the model on the other side of this relationship
+        :type related_model: `ModelInspector`
+        :param is_direct: whether `model` is the owner of this relationship. if
+            False, `related_model` is the owner
+        :type is_direct: bool
+        :param association_model: if the relationship is 'many-to-many', the
+            model used to relate `model` to the `related_model`
+        :param foreign_key_field: if the relationship is a foreign key, the
+            name of the field holding that id
+        :type foreign_key_field: str
+        :param accessor: the attribute name on the model class used to access
+            the relationship's value.  if None, defaults to `name`
+        :type accessor: str
+        """
+        super(RelationProperty, self).__init__(native_obj, name, model,
+                                               accessor)
+        assert related_model is not None
+        self.related_model = related_model
+        self.is_direct = is_direct
+        self.association_model = association_model
+        self.cardinality = cardinality
+        self.foreign_key_field = foreign_key_field
+
+    def __repr__(self):
+        return '%s(<%s>)' % (self.__class__.__name__, self.fullname)
+
+    def has_one(self):
+        return self.cardinality.endswith('-to-one')
+
+    def has_many(self):
+        return self.cardinality.endswith('-to-many')
+
+    def is_field(self):
+        return False
+
+    def is_relation(self):
+        return True
+
+    @property
+    def is_foreign_key(self):
+        return self.foreign_key_field is not None
+
+
+class ModelInspector(object):
+    """Provides a common interface for inspecting an ORM model class"""
+    _inspectors = []
+    collection_listener = None
+
+    def __init__(self, model):
+        self._model = model
+        self._fields = None
+        self._relations = None
+
+    def __repr__(self):
+        return "{0}({1})".format(self.__class__.__name__, repr(self._model))
+
+    @classmethod
+    def register(cls):
+        """register a `ModelInspector` sub-class with the inspection system"""
+        cls._inspectors.append(cls)
+
+    @property
+    def name(self):
+        """name of the model"""
+        return self._model.__name__
+
+    @property
+    def table_name(self):
+        """name of the table that the model maps to"""
+        raise NotImplementedError()
+
+    def is_compatible_instance(self, obj):
+        """whether the passed `obj` is compatible with the model stored on the
+        inspector instance"""
+        return isinstance(obj, self._model)
+
+    @staticmethod
+    def is_compatible_class(klass):
+        """whether the passed `klass` is compatible with the model stored on
+        the inspector instance"""
+        raise NotImplementedError()
+
+    def properties(self):
+        """list all fields and relations on the model
+
+        :rtype: list of `Property`
+        """
+        return self.fields() + self.relations()
+
+    def properity_map(self, by_accessor=True):
+        """dictionary of field name to `FieldProperty`
+
+        :param by_accessor: whether the property's name or model accessor
+            should be used as the key in the dictionary.
+        :type by_accessor: bool
+        :rtype: dict
+        """
+        props = self.properties()
+        if by_accessor:
+            return dict((prop.accessor, prop) for prop in props)
+        else:
+            return dict((prop.name, prop) for prop in props)
+
+    def iter_fields(self):
+        """iterate over the fields (aka columns) on the model
+
+        :rtype: iterator of `FieldProperty`
+        """
+        raise NotImplementedError()
+
+    def fields(self, foreign_keys=True):
+        """list the fields (aka columns) on the model
+
+        :param foreign_keys: whether to include foreign keys in the list of
+            fields
+        :type foreign_keys: bool
+        :rtype: list of `FieldProperty`
+        """
+        if self._fields is None:
+            self._fields = list(self.iter_fields())
+        return [field for field in self._fields
+                if foreign_keys or not field.is_foreign_key]
+
+    def foreign_keys(self):
+        """list the foreign key fields on the model
+
+        :rtype: list of `FieldProperty`
+        """
+        if self._fields is None:
+            self._fields = list(self.iter_fields())
+        return [field for field in self._fields if field.is_foreign_key]
+
+    def primary_keys(self):
+        """Get a list of primary key fields
+
+        :rtype: list of `FieldProperty`
+        """
+        # we want to make sure the order is right, so can't rely on
+        # iter_fields
+        raise NotImplementedError()
+
+    def field_map(self, by_accessor=True, **kwargs):
+        """dictionary of field name to `FieldProperty`
+
+        :param by_accessor: whether the field's name or model accessor
+            should be used as the key in the dictionary.
+        :type by_accessor: bool
+        :rtype: dict
+        """
+        fields = self.fields(**kwargs)
+        if by_accessor:
+            return dict((field.accessor, field) for field in fields)
+        else:
+            return dict((field.name, field) for field in fields)
+
+    def iter_relations(self):
+        """iterate over the relationships on the model
+
+        :rtype: iterator of `RelationProperty`
+        """
+        raise NotImplementedError()
+
+    def relations(self, cardinality='*'):
+        """list the relationships on the model
+
+        :param cardinality: only return relationships whose cardinality
+            matches this pattern (one of `CARDINALITIES`)
+        :type cardinality: str
+        :rtype: list of `RelationProperty`
+        """
+        if cardinality not in CARDINALITIES:
+            raise TypeError("cardinality must be one of %s"
+                            % ', '.join([repr(x) for x in CARDINALITIES]))
+
+        if self._relations is None:
+            self._relations = list(self.iter_relations())
+        return [rel for rel in self._relations
+                if fnmatch(rel.cardinality, cardinality)]
+
+    def relation_map(self, by_accessor=True):
+        """dictionary of relation name to `RelationProperty`
+
+        :param by_accessor: whether the relation's name or model accessor
+            should be used as the key in the dictionary.
+        :type by_accessor: bool
+        :rtype: dict
+        """
+        rels = self.relations()
+        if by_accessor:
+            return dict((rel.accessor, rel) for rel in rels)
+        else:
+            return dict((rel.name, rel) for rel in rels)
+
+    def walk_relations(self):
+        """recursively walk relationships
+
+        :returns: an iterator of relation paths.  a relation path is a tuple of
+            nested relations rooted at the inspected model
+        :rtype: iterator of tuple of `RelationProperty`
+        """
+        parents = []
+        path = []
+        # we provide our own local iterator so that we can track parents
+        # to detect cycles
+
+        def it(model):
+            parents.append(model)
+            for rel in model.relations():
+                yield tuple(path + [rel])
+                submodel = rel.related_model
+                if submodel in parents:
+                    # break cycle
+                    continue
+                path.append(rel)
+                for subpath in it(submodel):
+                    yield subpath
+                path.pop(-1)
+            parents.pop(-1)
+
+        return it(self)
+
+    def get_value(self, instance, accessor):
+        """get the value of the property on the passed `instance`
+
+        :returns:
+            for 'has one' relationships: `ModelInstancePair`
+            for 'has many' relationships: list of `ModelInstancePair`
+            for fields: basic data type
+        """
+        raise NotImplementedError()
+
+    def get_primary_key_value(self, instance):
+        """get the value of the primary key
+
+        If the primary key is composed of multiple columns, this will return
+        a tuple.
+        """
+        raise NotImplementedError()
+
+    def __eq__(self, other):
+        return self._model == other._model
+
+    def __hash__(self):
+        return hash(self._model)
+
+
+def inspector(model):
+    """Find the appropriate `ModelInspector` sub-class and return an instance
+    wrapping the passed `model` class.
+    """
+    if isinstance(model, ModelInspector):
+        return model
+
+    # FIXME: more automated registration of inspectors/orms
+    # these imports register the ORM-specific inspectors as a side effect
+    try:
+        from . import django
+    except ImportError:
+        pass
+    try:
+        from . import sqlalchemy
+    except ImportError:
+        pass
+
+    for insp in ModelInspector._inspectors:
+        if insp.is_compatible_class(model):
+            return insp(model)
+    else:
+        raise ValueError("Could not find inspector for %r" % model)
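
Taken together, the module above is meant to be used through the `inspector()` factory. A minimal usage sketch, assuming an illustrative ORM model `Book` for which a registered `ModelInspector` sub-class exists:

    from spiner.inspectors import inspector

    insp = inspector(Book)  # `Book` is an assumed example model

    # plain columns, excluding foreign keys
    for field in insp.fields(foreign_keys=False):
        print(field.fullname)

    # only the *-to-many relationships
    for rel in insp.relations(cardinality='*-to-many'):
        print('%s -> %s' % (rel.fullname, rel.related_model.name))

    # every nested relation path reachable from Book, with cycles broken
    for path in insp.walk_relations():
        print(' -> '.join(rel.name for rel in path))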

File spiner/inspectors/__init__.py

Empty file removed.

File spiner/inspectors/base.py

-from fnmatch import fnmatch
-import logging
-
-CARDINALITIES = ('one-to-one', 'one-to-many', 'many-to-one', 'many-to-many',
-                 '*-to-one', '*-to-many', 'many-to-*', 'one-to-*', '*')
-
-log = logging.getLogger(__name__)
-
-
-class Skip(Exception):
-    """Skip serializing a value"""
-    pass
-
-
-class MultipleValues(dict):
-    """Used to rename keys, or split data into multiple keys"""
-    pass
-
-
-class ModelInstancePair(object):
-    """Used to store a model instance result from `ModelInspector.get_value`"""