Commits

Vladimir Mihailenco committed de1865e Merge

Merge with upstream

  • Participants
  • Parent commits c73e6ca, faacb50

Comments (0)

Files changed (5)

File django/db/backends/__init__.py

     supports_stddev = None
     can_introspect_foreign_keys = None
 
+    supports_batching = False
+
    def __init__(self, connection):
        # Keep a reference to the backend connection wrapper so feature
        # flags can be resolved per-connection at runtime.
        self.connection = connection
 

File django/db/models/__init__.py

 from django.db import connection
 from django.db.models.loading import get_apps, get_app, get_models, get_model, register_models
 from django.db.models.query import Q
+from django.db.models.sql.subqueries import BatchOperation
+from django.db.models.sql.subqueries import BatchQuery
 from django.db.models.expressions import F
 from django.db.models.manager import Manager
 from django.db.models.base import Model

File django/db/models/base.py

     _deferred = False
 
     def __init__(self, *args, **kwargs):
-        self._entity_exists = kwargs.pop('__entity_exists', False)
+        self.__entity_exists = kwargs.pop('__entity_exists', False)
+
         signals.pre_init.send(sender=self.__class__, args=args, kwargs=kwargs)
 
         # Set up the storage for instance state
 
     save.alters_data = True
 
-    def save_base(self, raw=False, cls=None, origin=None, force_insert=False,
-            force_update=False, using=None):
-        """
-        Does the heavy-lifting involved in saving. Subclasses shouldn't need to
-        override this method. It's separate from save() in order to hide the
-        need for overrides of save() to pass around internal-only parameters
-        ('raw', 'cls', and 'origin').
-        """
-        using = using or router.db_for_write(self.__class__, instance=self)
-        entity_exists = bool(self._entity_exists and self._original_pk == self.pk)
-        connection = connections[using]
-        assert not (force_insert and force_update)
+    def get_parents_cascade(self, cls=None):
         if cls is None:
             cls = self.__class__
-            meta = cls._meta
-            if not meta.proxy:
-                origin = cls
+
+        while cls._meta.proxy:
+            cls =  cls._meta.proxy_for_model
+
+        groups = []
+
+        if cls._meta.parents:
+            for parent, field in cls._meta.parents.items():
+                # At this point, parent's primary key field may be unknown
+                # (for example, from administration form which doesn't fill
+                # this field). If so, fill it.
+                if (field and getattr(self, parent._meta.pk.attname) is None
+                        and getattr(self, field.attname) is not None):
+                    setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
+
+                groups.extend(self.get_parents_cascade(cls=parent))
+
+            groups.append(cls._meta.parents)
+
+        return groups
+
    def _set_entity_exists(self, value):
        # Record the flag together with the pk at the moment it was set,
        # so the getter can detect a pk change after the last save.
        self.__entity_exists = value
        self._original_pk = self.pk

    def _get_entity_exists(self):
        # True only while the flag is set AND the pk is unchanged since
        # the flag was last set.  The `and` short-circuits before touching
        # _original_pk, so a falsy flag never raises AttributeError.
        return bool(self.__entity_exists and self._original_pk == self.pk)

    # NOTE: the name-mangled __entity_exists attribute matches the one
    # assigned directly in __init__ of this class, bypassing the setter.
    _entity_exists = property(_get_entity_exists, _set_entity_exists)
+
+    def pre_save(self, cls, raw=False, using=None, **kwargs):
+        if not cls._meta.auto_created:
+            signals.pre_save.send(sender=cls, instance=self, raw=raw, using=using, **kwargs)
+
+    def post_save(self, cls, created=True, raw=False, using=None, **kwargs):
+        if not cls._meta.auto_created:
+            signals.post_save.send(sender=cls, instance=self,
+                                   created=created, raw=raw, using=using, **kwargs)
+
+    def save_base(self, raw=False, cls=None, force_insert=False, force_update=False, using=None):
+        assert not (force_insert and force_update)
+
+        # cls is the model we are currently saving
+        # if cls is None then we are saving "real" model (not the parents)
+        if not cls:
+            cls = self.__class__
+            is_real_model = True
         else:
-            meta = cls._meta
+            is_real_model = False
 
-        if origin and not meta.auto_created:
-            signals.pre_save.send(sender=origin, instance=self, raw=raw, using=using)
+        # save original class to properly send signals for proxy models
+        origin = cls
+
+        while cls._meta.proxy:
+            cls =  cls._meta.proxy_for_model
+
+        using = using or router.db_for_write(origin, instance=self)
+        connection = connections[using]
+
+        self.pre_save(origin, raw=raw, using=using)
 
         # If we are in a raw save, save the object exactly as presented.
         # That means that we don't try to be smart about saving attributes
         # attributes we have been given to the class we have been given.
         # We also go through this process to defer the save of proxy objects
         # to their actual underlying model.
-        if not raw or meta.proxy:
-            if meta.proxy:
-                org = cls
+        if not raw and is_real_model:
+            # save ALL parents in right order once
+            for parents in self.get_parents_cascade(cls=cls):
+                for parent, field in parents.items():
+                    self.save_base(raw=raw, cls=parent, using=using)
+
+                    if field:
+                        setattr(self, field.attname, self._get_pk_val(parent._meta))
+
+        pk_val = self._get_pk_val(cls._meta)
+        pk_set = pk_val is not None
+
+        # TODO/NONREL: Some backends could emulate force_insert/_update
+        # with an optimistic transaction, but since it's costly we should
+        # only do it when the user explicitly wants it.
+        # By adding support for an optimistic locking transaction
+        # in Django (SQL: SELECT ... FOR UPDATE) we could even make that
+        # part fully reusable on all backends (the current .exists()
+        # check below isn't really safe if you have lots of concurrent
+        # requests. BTW, and neither is QuerySet.get_or_create).
+        if not connection.features.distinguishes_insert_from_update:
+            force_insert = True
+            force_update = False
+
+        manager = cls._base_manager
+        if pk_set and (force_update
+                       or (not force_insert and manager.using(using).filter(pk=pk_val).exists())):
+            record_exists = True
+
+            non_pks = [f for f in cls._meta.local_fields
+                       if not f.primary_key]
+
+            # It does already exist, so do an UPDATE.
+            if force_update or non_pks:
+                values = [(f, None, (raw and getattr(self, f.attname)
+                                     or f.pre_save(self, False)))
+                          for f in non_pks]
+                rows = manager.using(using).filter(pk=pk_val)._update(values)
+                if force_update and not rows:
+                    raise DatabaseError("Forced update did not affect any rows.")
+        else:
+            if not pk_set and force_update:
+                raise ValueError("Cannot force an update in save() with no primary key.")
+
+            if connection.features.distinguishes_insert_from_update:
+                record_exists = False
             else:
-                org = None
-            for parent, field in meta.parents.items():
-                # At this point, parent's primary key field may be unknown
-                # (for example, from administration form which doesn't fill
-                # this field). If so, fill it.
-                if field and getattr(self, parent._meta.pk.attname) is None and getattr(self, field.attname) is not None:
-                    setattr(self, parent._meta.pk.attname, getattr(self, field.attname))
+                record_exists = self._entity_exists
 
-                self.save_base(cls=parent, origin=org, using=using)
+            if cls._meta.order_with_respect_to:
+                # If this is a model with an order_with_respect_to
+                # autopopulate the _order field
+                field = cls._meta.order_with_respect_to
+                filter = {field.name: getattr(self, field.attname)}
+                order_value = manager.using(using).filter(**filter).count()
+                self._order = order_value
 
-                if field:
-                    setattr(self, field.attname, self._get_pk_val(parent._meta))
-            if meta.proxy:
-                return
+            if not pk_set:
+                if force_update:
+                    raise ValueError("Cannot force an update in save() with no primary key.")
+                values = [(f, f.get_db_prep_save(raw and getattr(self, f.attname)
+                                                 or f.pre_save(self, not record_exists), connection=connection))
+                          for f in cls._meta.local_fields
+                          if not isinstance(f, AutoField)]
+            else:
+                values = [(f, f.get_db_prep_save(raw and getattr(self, f.attname)
+                                                 or f.pre_save(self, not record_exists), connection=connection))
+                          for f in cls._meta.local_fields]
 
-        if not meta.proxy:
-            non_pks = [f for f in meta.local_fields if not f.primary_key]
+            update_pk = bool(cls._meta.has_auto_field and not pk_set)
+            if values:
+                result = manager._insert(values, return_id=update_pk, using=using)
+            else:
+                # Create a new record with defaults for everything.
+                result = manager._insert([(cls._meta.pk, connection.ops.pk_default_value())],
+                                         return_id=update_pk, raw_values=True, using=using)
 
-            # First, try an UPDATE. If that doesn't update anything, do an INSERT.
-            pk_val = self._get_pk_val(meta)
-            pk_set = pk_val is not None
-            record_exists = True
-            manager = cls._base_manager
-            # TODO/NONREL: Some backends could emulate force_insert/_update
-            # with an optimistic transaction, but since it's costly we should
-            # only do it when the user explicitly wants it.
-            # By adding support for an optimistic locking transaction
-            # in Django (SQL: SELECT ... FOR UPDATE) we could even make that
-            # part fully reusable on all backends (the current .exists()
-            # check below isn't really safe if you have lots of concurrent
-            # requests. BTW, and neither is QuerySet.get_or_create).
-            try_update = connection.features.distinguishes_insert_from_update
-            if not try_update:
-                record_exists = False
+            if update_pk:
+                setattr(self, cls._meta.pk.attname, result)
 
-            if try_update and pk_set:
-                # Determine whether a record with the primary key already exists.
-                if (force_update or (not force_insert and
-                        manager.using(using).filter(pk=pk_val).exists())):
-                    # It does already exist, so do an UPDATE.
-                    if force_update or non_pks:
-                        values = [(f, None, (raw and getattr(self, f.attname) or f.pre_save(self, False))) for f in non_pks]
-                        rows = manager.using(using).filter(pk=pk_val)._update(values)
-                        if force_update and not rows:
-                            raise DatabaseError("Forced update did not affect any rows.")
-                else:
-                    record_exists = False
-            if not pk_set or not record_exists:
-                if meta.order_with_respect_to:
-                    # If this is a model with an order_with_respect_to
-                    # autopopulate the _order field
-                    field = meta.order_with_respect_to
-                    order_value = manager.using(using).filter(**{field.name: getattr(self, field.attname)}).count()
-                    self._order = order_value
-
-                if connection.features.distinguishes_insert_from_update:
-                    add = True
-                else:
-                    add = not entity_exists
-
-                if not pk_set:
-                    if force_update:
-                        raise ValueError("Cannot force an update in save() with no primary key.")
-                    values = [(f, f.get_db_prep_save(raw and getattr(self, f.attname) or f.pre_save(self, add), connection=connection))
-                        for f in meta.local_fields if not isinstance(f, AutoField)]
-                else:
-                    values = [(f, f.get_db_prep_save(raw and getattr(self, f.attname) or f.pre_save(self, add), connection=connection))
-                        for f in meta.local_fields]
-
-                record_exists = False
-
-                update_pk = bool(meta.has_auto_field and not pk_set)
-                if values:
-                    # Create a new record.
-                    result = manager._insert(values, return_id=update_pk, using=using)
-                else:
-                    # Create a new record with defaults for everything.
-                    result = manager._insert([(meta.pk, connection.ops.pk_default_value())], return_id=update_pk, raw_values=True, using=using)
-
-                if update_pk:
-                    setattr(self, meta.pk.attname, result)
-            transaction.commit_unless_managed(using=using)
+        transaction.commit_unless_managed(using=using)
 
         # Store the database on which the object was saved
         self._state.db = using
         # Once saved, this is no longer a to-be-added instance.
         self._state.adding = False
+        # Once saved, this is no longer a to-be-added instance.
+        self._state.adding = False
+        self._entity_exists = True
 
-        # Signal that the save is complete
-        if origin and not meta.auto_created:
-            if connection.features.distinguishes_insert_from_update:
-                created = not record_exists
-            else:
-                created = not entity_exists
-            signals.post_save.send(sender=origin, instance=self,
-                created=created, raw=raw, using=using)
-
-        self._entity_exists = True
-        self._original_pk = self.pk
-
+        self.post_save(origin, created=not record_exists, raw=raw, using=using)
 
     save_base.alters_data = True
 
+    def pre_delete(self, cls, using=None, **kwargs):
+        if not cls._meta.auto_created:
+            signals.pre_delete.send(sender=cls, instance=self, using=using, **kwargs)
+
+    def post_delete(self, cls, using=None, **kwargs):
+        if not cls._meta.auto_created:
+            signals.post_delete.send(sender=cls, instance=self, using=using, **kwargs)
+
    def delete(self, using=None):
        """Delete this instance from *using* (default: the router's write
        database) and clear the entity-existence flag.

        NOTE(review): ``collector`` is not defined in the visible lines —
        presumably a Collector is built from this instance in elided
        context; confirm against the full file.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        assert self._get_pk_val() is not None, "%s object can't be deleted because its %s attribute is set to None." % (self._meta.object_name, self._meta.pk.attname)
        collector.delete()

        # Marks the instance as no longer persisted; also resets
        # _original_pk through the _entity_exists property setter.
        self._entity_exists = False

    delete.alters_data = True
 

File django/db/models/deletion.py

         # send pre_delete signals
         for model, obj in self.instances_with_model():
             if not model._meta.auto_created:
-                signals.pre_delete.send(
-                    sender=model, instance=obj, using=self.using
-                )
+                obj.pre_delete(model, using=self.using)
 
         # update fields
         for model, instances_for_fieldvalues in self.field_updates.iteritems():
         # send post_delete signals
         for model, obj in self.instances_with_model():
             if not model._meta.auto_created:
-                signals.post_delete.send(
-                    sender=model, instance=obj, using=self.using
-                )
+                obj.post_delete(model, using=self.using)
 
         # update collected instances
         for model, instances_for_fieldvalues in self.field_updates.iteritems():

File django/db/models/sql/subqueries.py

 
 from django.core.exceptions import FieldError
 from django.db import connections
+from django.db import router
+from django.db import DEFAULT_DB_ALIAS
 from django.db.models.fields import DateField, FieldDoesNotExist
 from django.db.models.sql.constants import *
 from django.db.models.sql.datastructures import Date
 
     def add_subquery(self, query, using):
         self.subquery, self.sub_params = query.get_compiler(using).as_sql(with_col_aliases=True)
+
class BatchPool(object):
    """Accumulates ``(instance, model, parent-link-field, raw)`` tuples,
    bucketed by depth in the inheritance chain, until either the total
    pool size or any single depth-bucket reaches its configured limit.
    """

    def __init__(self, size, batch_size):
        self.max_size = size
        self.max_batch_size = batch_size
        self.reset()

    def reset(self):
        """Drop all pooled entries and counters."""
        self.pool = []
        self.pool_size = 0
        self.batches_sizes = []

    def append(self, instance, raw=False):
        """Queue *instance* (and, unless *raw*, its parent models) for
        batching."""
        groups = [] if raw else instance.get_parents_cascade()
        groups.append({instance.__class__: None})

        for depth, group in enumerate(groups):
            # Grow the per-depth buckets lazily.
            while len(self.pool) <= depth:
                self.pool.append([])
                self.batches_sizes.append(0)

            for model, field in group.items():
                self.pool[depth].append((instance, model, field, raw))

            count = len(group)
            self.pool_size += count
            self.batches_sizes[depth] += count

    def is_full(self):
        """True when the pool total or any depth-bucket hit its limit."""
        if self.pool_size >= self.max_size:
            return True
        return any(size >= self.max_batch_size for size in self.batches_sizes)

    def reverse(self):
        self.pool.reverse()

    def __iter__(self):
        return iter(self.pool)
+
class BatchQueryStub(object):
    """Fallback used when the backend does not support batching: every
    save/delete is executed immediately and ``flush()`` is a no-op, so
    callers can use the same context-manager protocol either way."""

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        pass

    def save(self, instance, **kwargs):
        # Delegate straight to the instance; nothing is pooled.
        instance.save(**kwargs)

    def delete(self, instance, **kwargs):
        # Delegate straight to the instance; nothing is pooled.
        instance.delete(**kwargs)

    def flush(self):
        pass
+
class BatchQuery(object):
    """Buffers model saves and deletes per database and flushes them to
    the backend in batches through the backend's ``SQLBatchCompiler``.

    Pool/batch sizes default to the connection's feature settings; save
    and delete pools can be sized independently.
    """
    compiler = 'SQLBatchCompiler'

    def __init__(self, pool_size=None, batch_size=None,
                 save_pool_size=None, save_batch_size=None,
                 delete_pool_size=None, delete_batch_size=None,
                 using=DEFAULT_DB_ALIAS, **kwargs):
        self.using = using
        self.connection = connections[using]
        # The class attribute names the compiler; the instance attribute
        # replaces it with the instantiated compiler object.
        self.compiler = self.connection.ops.compiler(self.compiler)(self, self.connection, using,
                                                                    **kwargs)
        pool_size = pool_size or self.connection.features.batch_pool_size
        batch_size = batch_size or self.connection.features.batch_size

        save_pool_size = save_pool_size or pool_size
        save_batch_size = save_batch_size or batch_size
        self.save_pool = BatchPool(size=save_pool_size, batch_size=save_batch_size)

        delete_pool_size = delete_pool_size or pool_size
        delete_batch_size = delete_batch_size or batch_size
        self.delete_pool = BatchPool(size=delete_pool_size, batch_size=delete_batch_size)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.flush()

    def save(self, instance, using=None, raw=False, **kwargs):
        """Queue *instance* for a batched save, flushing when full."""
        instance.pre_save(instance.__class__, raw=raw, using=self.using)
        self.save_pool.append(instance, raw=raw)
        # BUGFIX: removed a leftover debug `print` statement here.
        if self.save_pool.is_full():
            self.flush_saves()

    def delete(self, instance, using=None, **kwargs):
        """Queue *instance* for a batched delete, flushing when full."""
        instance.pre_delete(instance.__class__, using=self.using)
        self.delete_pool.append(instance)
        if self.delete_pool.is_full():
            self.flush_deletes()

    def flush_saves(self):
        """Write all pooled saves (parents first) and send post_save."""
        for instance_cls_field_raw_list in self.save_pool:
            instance_cls_values_raw_list = []
            for instance, cls, field, raw in instance_cls_field_raw_list:
                values = []
                for local_field in cls._meta.local_fields:
                    if raw:
                        # BUGFIX: read the raw value from the instance,
                        # not from the query object (was `self`).
                        value = getattr(instance, local_field.attname)
                    else:
                        value = local_field.pre_save(instance, not instance._entity_exists)
                    value = local_field.get_db_prep_save(value, connection=self.connection)
                    values.append((local_field, value))
                instance_cls_values_raw_list.append((instance, cls, values, raw))

            pk_vals = self.compiler.batch_save(instance_cls_values_raw_list)

            for (instance, cls, field, raw), pk_val in zip(instance_cls_field_raw_list, pk_vals):
                setattr(instance, cls._meta.pk.attname, pk_val)

                if field:
                    setattr(instance, field.attname, instance._get_pk_val(cls._meta))

                created = (not instance._entity_exists)
                instance._entity_exists = True
                instance.post_save(cls, created=created, raw=raw,
                                   using=self.using)
        self.save_pool.reset()

    def flush_deletes(self):
        """Delete pooled instances (children first) and send post_delete."""
        # BUGFIX: reverse the *delete* pool so children are removed before
        # their parents (was reversing the save pool).
        self.delete_pool.reverse()
        for instance_cls_field_raw_list in self.delete_pool:
            instance_cls_list = [(instance, cls) for instance, cls, field, raw
                                 in instance_cls_field_raw_list]
            self.compiler.batch_delete(instance_cls_list)

            for instance, cls, field, _ in instance_cls_field_raw_list:
                setattr(instance, cls._meta.pk.attname, None)
                instance._entity_exists = False

                if field:
                    # BUGFIX: clear the parent link on the instance,
                    # not on the query object (was `self`).
                    setattr(instance, field.attname, None)

                instance.post_delete(instance.__class__, using=self.using)
        self.delete_pool.reset()

    def flush(self):
        """Flush pending saves first, then pending deletes."""
        self.flush_saves()
        self.flush_deletes()
+
class BatchOperation(object):
    """Routes batched saves/deletes to one batch query per database
    alias, falling back to :class:`BatchQueryStub` when the backend does
    not support batching.  ``queries_args`` maps a database alias to the
    keyword arguments used to construct its :class:`BatchQuery`."""

    def __init__(self, queries_args=None):
        # BUGFIX: avoid a mutable default argument that would be shared
        # across all BatchOperation instances.
        self.queries_args = {} if queries_args is None else queries_args
        self.queries = {}

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        for query in self.queries.values():
            query.flush()

    def get_query(self, instance, using=None):
        """Return (creating on first use) the batch query for the
        database that *instance* writes to."""
        using = using or router.db_for_write(instance.__class__, instance=instance)
        connection = connections[using]

        if using not in self.queries:
            if connection.features.supports_batching:
                # BUGFIX: copy the configured kwargs so adding 'using'
                # does not mutate the caller-supplied queries_args dict.
                kwargs = dict(self.queries_args.get(using, {}))
                kwargs['using'] = using
                self.queries[using] = BatchQuery(**kwargs)
            else:
                # BUGFIX: the stub takes no constructor arguments; passing
                # 'using' to it raised TypeError.
                self.queries[using] = BatchQueryStub()
        return self.queries[using]

    def save(self, instance, using=None, **kwargs):
        self.get_query(instance, using=using).save(instance, using=using, **kwargs)

    def delete(self, instance, using=None, **kwargs):
        self.get_query(instance, using=using).delete(instance, using=using, **kwargs)