Commits

Mike Bayer committed 572d4eb

- refactor query.update() and query.delete() to use a pure
template method pattern using new class hierarchy BulkUD
in sqlalchemy.orm.persistence

  • Participants
  • Parent commits 0d6ec4f

Comments (0)

Files changed (3)

lib/sqlalchemy/orm/persistence.py

 import operator
 from itertools import groupby
 
-from sqlalchemy import sql, util, exc as sa_exc
+from sqlalchemy import sql, util, exc as sa_exc, schema
 from sqlalchemy.orm import attributes, sync, \
-                        exc as orm_exc
+                        exc as orm_exc,\
+                        evaluator
 
-from sqlalchemy.orm.util import _state_mapper, state_str
+from sqlalchemy.orm.util import _state_mapper, state_str, _attr_as_key
+from sqlalchemy.sql import expression
 
 def save_obj(base_mapper, states, uowtransaction, single=False):
     """Issue ``INSERT`` and/or ``UPDATE`` statements for a list 
 def _organize_states_for_save(base_mapper, states, uowtransaction):
     """Make an initial pass across a set of states for INSERT or
     UPDATE.
-    
+
     This includes splitting out into distinct lists for
     each, calling before_insert/before_update, obtaining
     key information for each state including its dictionary,
     mapper, the connection to use for the execution per state,
     and the identity flag.
-    
+
     """
 
     states_to_insert = []
                                                 uowtransaction):
     """Make an initial pass across a set of states for UPDATE
     corresponding to post_update.
-    
+
     This includes obtaining key information for each state 
     including its dictionary, mapper, the connection to use for 
     the execution per state.
-    
+
     """
     return list(_connections_for_states(base_mapper, uowtransaction, 
                                             states))
 
 def _organize_states_for_delete(base_mapper, states, uowtransaction):
     """Make an initial pass across a set of states for DELETE.
-    
+
     This includes calling out before_delete and obtaining
     key information for each state including its dictionary,
     mapper, the connection to use for the execution per state.
-    
+
     """
     states_to_delete = []
 
                                                 states_to_insert):
     """Identify sets of values to use in INSERT statements for a
     list of states.
-    
+
     """
     insert = []
     for state, state_dict, mapper, connection, has_identity, \
                                 table, states_to_update):
     """Identify sets of values to use in UPDATE statements for a
     list of states.
-    
+
     This function works intricately with the history system
     to determine exactly what values should be updated
     as well as how the row should be matched within an UPDATE
                             states_to_insert, states_to_update):
     """finalize state on states that have been inserted or updated,
     including calling after_insert/after_update events.
-    
+
     """
     for state, state_dict, mapper, connection, has_identity, \
                     instance_key, row_switch in states_to_insert + \
 
 def _connections_for_states(base_mapper, uowtransaction, states):
     """Return an iterator of (state, state.dict, mapper, connection).
-    
+
     The states are sorted according to _sort_states, then paired
     with the connection they should be using for the given
     unit of work transaction.
-    
+
     """
     # if session has a connection callable,
     # organize individual states with the connection 
     return sorted(pending, key=operator.attrgetter("insert_order")) + \
                 sorted(persistent, key=lambda q:q.key[1])
 
+class BulkUD(object):
+    """Handle bulk update and deletes via a :class:`.Query`."""
 
+    def __init__(self, query):
+        self.query = query.enable_eagerloads(False)
+
+    @classmethod
+    def _factory(cls, lookup, synchronize_session, *arg):
+        try:
+            klass = lookup[synchronize_session]
+        except KeyError:
+            raise sa_exc.ArgumentError(
+                            "Valid strategies for session synchronization "
+                            "are %s" % (", ".join(sorted(repr(x) 
+                                for x in lookup.keys()))))
+        else:
+            return klass(*arg)
+
+    def exec_(self):
+        self._do_pre()
+        self._do_pre_synchronize()
+        self._do_exec()
+        self._do_post_synchronize()
+        self._do_post()
+
+    def _do_pre(self):
+        query = self.query
+        self.context = context = query._compile_context()
+        if len(context.statement.froms) != 1 or \
+                    not isinstance(context.statement.froms[0], schema.Table):
+            raise sa_exc.ArgumentError(
+                            "Only update via a single table query is "
+                            "currently supported")
+        self.primary_table = context.statement.froms[0]
+
+        session = query.session
+
+        if query._autoflush:
+            session._autoflush()
+
+    def _do_pre_synchronize(self):
+        pass
+
+    def _do_post_synchronize(self):
+        pass
+
+class BulkEvaluate(BulkUD):
+    """BulkUD which does the 'evaluate' method of session state resolution."""
+
+    def _additional_evaluators(self, evaluator_compiler):
+        pass
+
+    def _do_pre_synchronize(self):
+        query = self.query
+        try:
+            evaluator_compiler = evaluator.EvaluatorCompiler()
+            if query.whereclause is not None:
+                eval_condition = evaluator_compiler.process(
+                                                query.whereclause)
+            else:
+                def eval_condition(obj):
+                    return True
+
+            self._additional_evaluators(evaluator_compiler)
+
+        except evaluator.UnevaluatableError:
+            raise sa_exc.InvalidRequestError(
+                    "Could not evaluate current criteria in Python. "
+                    "Specify 'fetch' or False for the "
+                    "synchronize_session parameter.")
+        target_cls = query._mapper_zero().class_
+
+        #TODO: detect when the where clause is a trivial primary key match
+        self.matched_objects = [
+                            obj for (cls, pk),obj in
+                            query.session.identity_map.iteritems()
+                            if issubclass(cls, target_cls) and
+                            eval_condition(obj)]
+
+class BulkFetch(BulkUD):
+    """BulkUD which does the 'fetch' method of session state resolution."""
+
+    def _do_pre_synchronize(self):
+        query = self.query
+        session = query.session
+        select_stmt = self.context.statement.with_only_columns(
+                                            self.primary_table.primary_key)
+        self.matched_rows = session.execute(
+                                    select_stmt,
+                                    params=query._params).fetchall()
+
+class BulkUpdate(BulkUD):
+    """BulkUD which handles UPDATEs."""
+
+    def __init__(self, query, values):
+        super(BulkUpdate, self).__init__(query)
+        self.query._no_select_modifiers("update")
+        self.values = values
+
+    @classmethod
+    def factory(cls, query, synchronize_session, values):
+        return BulkUD._factory({
+            "evaluate":BulkUpdateEvaluate,
+            "fetch":BulkUpdateFetch,
+            False:BulkUpdate
+        }, synchronize_session, query, values)
+
+    def _do_exec(self):
+        update_stmt = sql.update(self.primary_table, 
+                            self.context.whereclause, self.values)
+
+        self.result = self.query.session.execute(
+                            update_stmt, params=self.query._params)
+        self.rowcount = self.result.rowcount
+
+    def _do_post(self):
+        session = self.query.session
+        session.dispatch.after_bulk_update(session, self.query, 
+                                self.context, self.result)
+
+class BulkDelete(BulkUD):
+    """BulkUD which handles DELETEs."""
+
+    def __init__(self, query):
+        super(BulkDelete, self).__init__(query)
+        self.query._no_select_modifiers("delete")
+
+    @classmethod
+    def factory(cls, query, synchronize_session):
+        return BulkUD._factory({
+            "evaluate":BulkDeleteEvaluate,
+            "fetch":BulkDeleteFetch,
+            False:BulkDelete
+        }, synchronize_session, query)
+
+    def _do_exec(self):
+        delete_stmt = sql.delete(self.primary_table, 
+                                    self.context.whereclause)
+
+        self.result = self.query.session.execute(delete_stmt, 
+                                    params=self.query._params)
+        self.rowcount = self.result.rowcount
+
+    def _do_post(self):
+        session = self.query.session
+        session.dispatch.after_bulk_delete(session, self.query, 
+                        self.context, self.result)
+
+class BulkUpdateEvaluate(BulkEvaluate, BulkUpdate):
+    """BulkUD which handles UPDATEs using the "evaluate" 
+    method of session resolution."""
+
+    def _additional_evaluators(self,evaluator_compiler):
+        self.value_evaluators = {}
+        for key,value in self.values.iteritems():
+            key = _attr_as_key(key)
+            self.value_evaluators[key] = evaluator_compiler.process(
+                                expression._literal_as_binds(value))
+
+    def _do_post_synchronize(self):
+        session = self.query.session
+        states = set()
+        evaluated_keys = self.value_evaluators.keys()
+        for obj in self.matched_objects:
+            state, dict_ = attributes.instance_state(obj),\
+                                    attributes.instance_dict(obj)
+
+            # only evaluate unmodified attributes
+            to_evaluate = state.unmodified.intersection(
+                                                    evaluated_keys)
+            for key in to_evaluate:
+                dict_[key] = self.value_evaluators[key](obj)
+
+            state.commit(dict_, list(to_evaluate))
+
+            # expire attributes with pending changes 
+            # (there was no autoflush, so they are overwritten)
+            state.expire_attributes(dict_,
+                            set(evaluated_keys).
+                                difference(to_evaluate))
+            states.add(state)
+        session._register_altered(states)
+
+class BulkDeleteEvaluate(BulkEvaluate, BulkDelete):
+    """BulkUD which handles DELETEs using the "evaluate" 
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        self.query.session._remove_newly_deleted(
+                [attributes.instance_state(obj) 
+                    for obj in self.matched_objects])
+
+class BulkUpdateFetch(BulkFetch, BulkUpdate):
+    """BulkUD which handles UPDATEs using the "fetch" 
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        session = self.query.session
+        target_mapper = self.query._mapper_zero()
+
+        states = set([
+            attributes.instance_state(session.identity_map[identity_key])
+            for identity_key in [
+                target_mapper.identity_key_from_primary_key(
+                                                        list(primary_key))
+                for primary_key in self.matched_rows
+            ]
+        ])
+        attrib = [_attr_as_key(k) for k in self.values]
+        for state in states:
+            session._expire_state(state, attrib)
+        session._register_altered(states)
+
+class BulkDeleteFetch(BulkFetch, BulkDelete):
+    """BulkUD which handles DELETEs using the "fetch" 
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        session = self.query.session
+        target_mapper = self.query._mapper_zero()
+        for primary_key in self.matched_rows:
+            # TODO: inline this and call remove_newly_deleted
+            # once
+            identity_key = target_mapper.identity_key_from_primary_key(
+                                                        list(primary_key))
+            if identity_key in session.identity_map:
+                session._remove_newly_deleted(
+                    [attributes.instance_state(
+                        session.identity_map[identity_key]
+                    )]
+                )
+

lib/sqlalchemy/orm/query.py

 from sqlalchemy import sql, util, log, schema
 from sqlalchemy import exc as sa_exc
 from sqlalchemy.orm import exc as orm_exc
+from sqlalchemy.orm import persistence
 from sqlalchemy.sql import util as sql_util
 from sqlalchemy.sql import expression, visitors, operators
 from sqlalchemy.orm import (
         represented as a common table expression (CTE).
 
         The :meth:`.Query.cte` method is new in 0.7.6.
-        
+
         Parameters and usage are the same as those of the 
         :meth:`._SelectBase.cte` method; see that method for 
         further details.
-        
+
         Here is the `Postgresql WITH 
         RECURSIVE example <http://www.postgresql.org/docs/8.4/static/queries-with.html>`_.
         Note that, in this example, the ``included_parts`` cte and the ``incl_alias`` alias
                 group_by(included_parts.c.sub_part)
 
         See also:
-        
+
         :meth:`._SelectBase.cte`
 
         """
                                     selectable=None, 
                                     polymorphic_on=None):
         """Load columns for inheriting classes.
-        
+
         :meth:`.Query.with_polymorphic` applies transformations 
         to the "main" mapped class represented by this :class:`.Query`.
         The "main" mapped class here means the :class:`.Query`
         subclass are available in the query, both for the purposes
         of load-time efficiency as well as the ability to use 
         these columns at query time.
-        
+
         See the documentation section :ref:`with_polymorphic` for
         details on how this method is used.
-        
+
         As of 0.8, a new and more flexible function 
         :func:`.orm.with_polymorphic` supersedes 
         :meth:`.Query.with_polymorphic`, as it can apply the equivalent
     def get(self, ident):
         """Return an instance based on the given primary key identifier, 
         or ``None`` if not found.
-        
+
         E.g.::
-        
+
             my_user = session.query(User).get(5)
-            
+
             some_object = session.query(VersionedFoo).get((5, 10))
-        
+
         :meth:`~.Query.get` is special in that it provides direct 
         access to the identity map of the owning :class:`.Session`.
         If the given primary key identifier is present
         unless the object has been marked fully expired.
         If not present,
         a SELECT is performed in order to locate the object.
-        
+
         :meth:`~.Query.get` also will perform a check if 
         the object is present in the identity map and 
         marked as expired - a SELECT 
         is emitted to refresh the object as well as to
         ensure that the row is still present.
         If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised.
-        
+
         :meth:`~.Query.get` is only used to return a single
         mapped instance, not multiple instances or 
         individual column constructs, and strictly
         options via :meth:`~.Query.options` may be applied
         however, and will be used if the object is not
         yet locally present.
-        
+
         A lazy-loading, many-to-one attribute configured
         by :func:`.relationship`, using a simple
         foreign-key-to-primary-key criterion, will also use an 
         the target value from the local identity map
         before querying the database.  See :ref:`loading_toplevel`
         for further details on relationship loading.
-        
+
         :param ident: A scalar or tuple value representing
          the primary key.   For a composite primary key,
          the order of identifiers corresponds in most cases
          to the elements present in this collection.
 
         :return: The object instance, or ``None``.
-        
+
         """
 
         # convert composite types to individual args
         """Set the 'invoke all eagers' flag which causes joined- and
         subquery loaders to traverse into already-loaded related objects
         and collections.
-        
+
         Default is that of :attr:`.Query._invoke_all_eagers`.
 
         """
     def with_transformation(self, fn):
         """Return a new :class:`.Query` object transformed by
         the given function.
-        
+
         E.g.::
-        
+
             def filter_something(criterion):
                 def transform(q):
                     return q.filter(criterion)
                 return transform
-            
+
             q = q.with_transformation(filter_something(x==5))
-        
+
         This allows ad-hoc recipes to be created for :class:`.Query`
         objects.  See the example at :ref:`hybrid_transformers`.
 
 
             ``'read_nowait'`` - passes ``for_update='read_nowait'``, which 
             translates to ``FOR SHARE NOWAIT`` (supported by PostgreSQL).
-            
+
             New in 0.7.7: ``FOR SHARE`` and ``FOR SHARE NOWAIT`` (PostgreSQL)
         """
 
         of this :class:`.Query`, using SQL expressions.
 
         e.g.::
-        
+
             session.query(MyClass).filter(MyClass.name == 'some name')
-        
+
         Multiple criteria are joined together by AND (new in 0.7.5)::
-        
+
             session.query(MyClass).\\
                 filter(MyClass.name == 'some name', MyClass.id > 5)
-        
+
         The criterion is any SQL expression object applicable to the 
         WHERE clause of a select.   String expressions are coerced
         into SQL expression constructs via the :func:`.text` construct.
 
         See also:
-        
+
         :meth:`.Query.filter_by` - filter on keyword expressions.
 
         """
     def filter_by(self, **kwargs):
         """apply the given filtering criterion to a copy
         of this :class:`.Query`, using keyword expressions.
-        
+
         e.g.::
-        
+
             session.query(MyClass).filter_by(name = 'some name')
-        
+
         Multiple criteria are joined together by AND::
-        
+
             session.query(MyClass).\\
                 filter_by(name = 'some name', id = 5)
-        
+
         The keyword expressions are extracted from the primary 
         entity of the query, or the last entity that was the 
         target of a call to :meth:`.Query.join`.
-        
+
         See also:
-        
+
         :meth:`.Query.filter` - filter on SQL expressions.
-        
+
         """
 
         clauses = [_entity_descriptor(self._joinpoint_zero(), key) == value
     def join(self, *props, **kwargs):
         """Create a SQL JOIN against this :class:`.Query` object's criterion
         and apply generatively, returning the newly resulting :class:`.Query`.
-        
+
         **Simple Relationship Joins**
-        
+
         Consider a mapping between two classes ``User`` and ``Address``,
         with a relationship ``User.addresses`` representing a collection 
         of ``Address`` objects associated with each ``User``.   The most common 
         usage of :meth:`~.Query.join` is to create a JOIN along this
         relationship, using the ``User.addresses`` attribute as an indicator
         for how this should occur::
-            
+
             q = session.query(User).join(User.addresses)
-        
+
         Where above, the call to :meth:`~.Query.join` along ``User.addresses`` 
         will result in SQL equivalent to::
-        
+
             SELECT user.* FROM user JOIN address ON user.id = address.user_id
-        
+
         In the above example we refer to ``User.addresses`` as passed to
         :meth:`~.Query.join` as the *on clause*, that is, it indicates
         how the "ON" portion of the JOIN should be constructed.  For a 
         single-entity query such as the one above (i.e. we start by selecting only from
         ``User`` and nothing else), the relationship can also be specified by its 
         string name::
-        
+
             q = session.query(User).join("addresses")
-            
+
         :meth:`~.Query.join` can also accommodate multiple 
         "on clause" arguments to produce a chain of joins, such as below
         where a join across four related entities is constructed::
-        
+
             q = session.query(User).join("orders", "items", "keywords")
-            
+
         The above would be shorthand for three separate calls to :meth:`~.Query.join`,
         each using an explicit attribute to indicate the source entity::
-        
+
             q = session.query(User).\\
                     join(User.orders).\\
                     join(Order.items).\\
                     join(Item.keywords)
-        
+
         **Joins to a Target Entity or Selectable**
-        
+
         A second form of :meth:`~.Query.join` allows any mapped entity
         or core selectable construct as a target.   In this usage, 
         :meth:`~.Query.join` will attempt
         to create a JOIN along the natural foreign key relationship between
         two entities::
-        
+
             q = session.query(User).join(Address)
-            
+
         The above calling form of :meth:`.join` will raise an error if 
         either there are no foreign keys between the two entities, or if 
         there are multiple foreign key linkages between them.   In the
         above calling form, :meth:`~.Query.join` is called upon to 
         create the "on clause" automatically for us.  The target can
         be any mapped entity or selectable, such as a :class:`.Table`::
-        
+
             q = session.query(User).join(addresses_table)
-        
+
         **Joins to a Target with an ON Clause**
-        
+
         The third calling form allows both the target entity as well
         as the ON clause to be passed explicitly.   Suppose for 
         example we wanted to join to ``Address`` twice, using
         to it using the ``target, onclause`` form, so that the 
         alias can be specified explicitly as the target along with
         the relationship to instruct how the ON clause should proceed::
-        
+
             a_alias = aliased(Address)
-            
+
             q = session.query(User).\\
                     join(User.addresses).\\
                     join(a_alias, User.addresses).\\
                     filter(Address.email_address=='ed@foo.com').\\
                     filter(a_alias.email_address=='ed@bar.com')
-        
+
         Where above, the generated SQL would be similar to::
-        
+
             SELECT user.* FROM user 
                 JOIN address ON user.id = address.user_id
                 JOIN address AS address_1 ON user.id=address_1.user_id
                 WHERE address.email_address = :email_address_1
                 AND address_1.email_address = :email_address_2
-        
+
         The two-argument calling form of :meth:`~.Query.join` 
         also allows us to construct arbitrary joins with SQL-oriented
         "on clause" expressions, not relying upon configured relationships
         at all.  Any SQL expression can be passed as the ON clause
         when using the two-argument form, which should refer to the target
         entity in some way as well as an applicable source entity::
-        
+
             q = session.query(User).join(Address, User.id==Address.user_id)
-        
+
         .. note:: 
-        
+
            In SQLAlchemy 0.6 and earlier, the two argument form of 
            :meth:`~.Query.join` requires the usage of a tuple::
-           
+
                query(User).join((Address, User.id==Address.user_id))
-           
+
            This calling form is accepted in 0.7 and further, though
            is not necessary unless multiple join conditions are passed to
            a single :meth:`~.Query.join` call, which itself is also not
            generally necessary as it is now equivalent to multiple
            calls (this wasn't always the case).
-           
+
         **Advanced Join Targeting and Adaption**
 
         There is a lot of flexibility in what the "target" can be when using 
 
             q = session.query(User).\\
                         join(addresses_q, addresses_q.c.user_id==User.id)
-        
+
         :meth:`~.Query.join` also features the ability to *adapt* a 
         :meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target selectable.
         Below we construct a JOIN from ``User`` to a subquery against ``Address``, allowing
         the relationship denoted by ``User.addresses`` to *adapt* itself
         to the altered target::
-        
+
             address_subq = session.query(Address).\\
                                 filter(Address.email_address == 'ed@foo.com').\\
                                 subquery()
 
             q = session.query(User).join(address_subq, User.addresses)
-        
+
         Producing SQL similar to::
-        
+
             SELECT user.* FROM user 
                 JOIN (
                     SELECT address.id AS id, 
                     FROM address 
                     WHERE address.email_address = :email_address_1
                 ) AS anon_1 ON user.id = anon_1.user_id
-        
+
         The above form allows one to fall back onto an explicit ON
         clause at any time::
-        
+
             q = session.query(User).\\
                     join(address_subq, User.id==address_subq.c.user_id)
-        
+
         **Controlling what to Join From**
-        
+
         While :meth:`~.Query.join` exclusively deals with the "right"
         side of the JOIN, we can also control the "left" side, in those
         cases where it's needed, using :meth:`~.Query.select_from`.
         make usage of ``User.addresses`` as our ON clause by instructing
         the :class:`.Query` to select first from the ``User`` 
         entity::
-        
+
             q = session.query(Address).select_from(User).\\
                             join(User.addresses).\\
                             filter(User.name == 'ed')
-        
+
         Which will produce SQL similar to::
-        
+
             SELECT address.* FROM user 
                 JOIN address ON user.id=address.user_id 
                 WHERE user.name = :name_1
-        
+
         **Constructing Aliases Anonymously**
-        
+
         :meth:`~.Query.join` can construct anonymous aliases
         using the ``aliased=True`` flag.  This feature is useful
         when a query is being joined algorithmically, such as
         when querying self-referentially to an arbitrary depth::
-        
+
             q = session.query(Node).\\
                     join("children", "children", aliased=True)
-        
+
         When ``aliased=True`` is used, the actual "alias" construct
         is not explicitly available.  To work with it, methods such as 
         :meth:`.Query.filter` will adapt the incoming entity to 
         the last join point::
-        
+
             q = session.query(Node).\\
                     join("children", "children", aliased=True).\\
                     filter(Node.name == 'grandchild 1')
-        
+
         When using automatic aliasing, the ``from_joinpoint=True``
         argument can allow a multi-node join to be broken into
         multiple calls to :meth:`~.Query.join`, so that
         each path along the way can be further filtered::
-        
+
             q = session.query(Node).\\
                     join("children", aliased=True).\\
                     filter(Node.name='child 1').\\
                     join("children", aliased=True, from_joinpoint=True).\\
                     filter(Node.name == 'grandchild 1')
-        
+
         The filtering aliases above can then be reset back to the
         original ``Node`` entity using :meth:`~.Query.reset_joinpoint`::
-        
+
             q = session.query(Node).\\
                     join("children", "children", aliased=True).\\
                     filter(Node.name == 'grandchild 1').\\
                     reset_joinpoint().\\
                     filter(Node.name == 'parent 1)
-        
+
         For an example of ``aliased=True``, see the distribution 
         example :ref:`examples_xmlpersistence` which illustrates
         an XPath-like query system using algorithmic joins.
-        
+
         :param *props: A collection of one or more join conditions, 
          each consisting of a relationship-bound attribute or string 
          relationship name representing an "on clause", or a single 
          of True here will cause the join to be from the most recent
          joined target, rather than starting back from the original 
          FROM clauses of the query.
-         
+
         See also:
-        
+
             :ref:`ormtutorial_joins` in the ORM tutorial.
 
             :ref:`inheritance_toplevel` for details on how :meth:`~.Query.join`
             is used for inheritance relationships.
-        
+
             :func:`.orm.join` - a standalone ORM-level join function,
             used internally by :meth:`.Query.join`, which in previous 
             SQLAlchemy versions was the primary ORM-level joining interface.
-             
+
         """
         aliased, from_joinpoint = kwargs.pop('aliased', False),\
                                     kwargs.pop('from_joinpoint', False)
     def reset_joinpoint(self):
         """Return a new :class:`.Query`, where the "join point" has
         been reset back to the base FROM entities of the query.
-        
+
         This method is usually used in conjunction with the
         ``aliased=True`` feature of the :meth:`~.Query.join`
         method.  See the example in :meth:`~.Query.join` for how 
 
         Mapped entities or plain :class:`~.Table` or other selectables
         can be sent here which will form the default FROM clause.
-        
+
         See the example in :meth:`~.Query.join` for a typical 
         usage of :meth:`~.Query.select_from`.
 
 
             SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL users.name AS users_name 
             FROM users
-        
+
         New in 0.7.7.
 
         """
         this :class:`.Query` - if these do not correspond, unchecked errors will occur.
 
         The 'load' argument is the same as that of :meth:`.Session.merge`.
-        
+
         For an example of how :meth:`~.Query.merge_result` is used, see
         the source code for the example :ref:`examples_caching`, where
         :meth:`~.Query.merge_result` is used to efficiently restore state
 
     def count(self):
         """Return a count of rows this Query would return.
-        
+
         This generates the SQL for this Query as follows::
-        
+
             SELECT count(1) AS count_1 FROM (
                 SELECT <rest of query follows...>
             ) AS anon_1
 
         Note the above scheme is newly refined in 0.7 
         (as of 0.7b3).
-        
+
         For fine grained control over specific columns 
         to count, to skip the usage of a subquery or
         otherwise control of the FROM clause,
         or to use other aggregate functions,
         use :attr:`~sqlalchemy.sql.expression.func` expressions in conjunction
         with :meth:`~.Session.query`, i.e.::
-        
+
             from sqlalchemy import func
-            
+
             # count User records, without
             # using a subquery.
             session.query(func.count(User.id))
-                        
+
             # return count of user "id" grouped
             # by "name"
             session.query(func.count(User.id)).\\
                     group_by(User.name)
 
             from sqlalchemy import distinct
-            
+
             # count distinct "name" values
             session.query(func.count(distinct(User.name)))
-            
+
         """
         col = sql.func.count(sql.literal_column('*'))
         return self.from_self(col).scalar()
         invokes :meth:`.SessionEvents.after_bulk_delete`.
 
         """
-        #TODO: lots of duplication and ifs - probably needs to be 
-        # refactored to strategies
         #TODO: cascades need handling.
 
-        if synchronize_session not in [False, 'evaluate', 'fetch']:
-            raise sa_exc.ArgumentError(
-                            "Valid strategies for session "
-                            "synchronization are False, 'evaluate' and "
-                            "'fetch'")
-        self._no_select_modifiers("delete")
-
-        self = self.enable_eagerloads(False)
-
-        context = self._compile_context()
-        if len(context.statement.froms) != 1 or \
-                    not isinstance(context.statement.froms[0], schema.Table):
-            raise sa_exc.ArgumentError("Only deletion via a single table "
-                                        "query is currently supported")
-        primary_table = context.statement.froms[0]
-
-        session = self.session
-
-        if self._autoflush:
-            session._autoflush()
-
-        if synchronize_session == 'evaluate':
-            try:
-                evaluator_compiler = evaluator.EvaluatorCompiler()
-                if self.whereclause is not None:
-                    eval_condition = evaluator_compiler.process(
-                                                            self.whereclause)
-                else:
-                    def eval_condition(obj):
-                        return True
-
-            except evaluator.UnevaluatableError:
-                raise sa_exc.InvalidRequestError(
-                    "Could not evaluate current criteria in Python.  "
-                    "Specify 'fetch' or False for the synchronize_session "
-                    "parameter.")
-
-            target_cls = self._mapper_zero().class_
-
-            #TODO: detect when the where clause is a trivial primary key match
-            objs_to_expunge = [
-                                obj for (cls, pk),obj in
-                                session.identity_map.iteritems()
-                                if issubclass(cls, target_cls) and
-                                eval_condition(obj)]
-
-        elif synchronize_session == 'fetch':
-            #TODO: use RETURNING when available
-            select_stmt = context.statement.with_only_columns(
-                                                primary_table.primary_key)
-            matched_rows = session.execute(
-                                        select_stmt,
-                                        params=self._params).fetchall()
-
-        delete_stmt = sql.delete(primary_table, context.whereclause)
-
-        result = session.execute(delete_stmt, params=self._params)
-
-        if synchronize_session == 'evaluate':
-            session._remove_newly_deleted([attributes.instance_state(obj) 
-                                            for obj in objs_to_expunge])
-        elif synchronize_session == 'fetch':
-            target_mapper = self._mapper_zero()
-            for primary_key in matched_rows:
-                # TODO: inline this and call remove_newly_deleted
-                # once
-                identity_key = target_mapper.identity_key_from_primary_key(
-                                                            list(primary_key))
-                if identity_key in session.identity_map:
-                    session._remove_newly_deleted(
-                        [attributes.instance_state(
-                            session.identity_map[identity_key]
-                        )]
-                    )
-
-        session.dispatch.after_bulk_delete(session, self, context, result)
-
-        return result.rowcount
+        delete_op = persistence.BulkDelete.factory(self, synchronize_session)
+        delete_op.exec_()
+        return delete_op.rowcount
 
     def update(self, values, synchronize_session='evaluate'):
         """Perform a bulk update query.
         # fk assignments
         #TODO: cascades need handling.
 
-        if synchronize_session == 'expire':
-            util.warn_deprecated("The 'expire' value as applied to "
-                                    "the synchronize_session argument of "
-                                    "query.update() is now called 'fetch'")
-            synchronize_session = 'fetch'
-
-        if synchronize_session not in [False, 'evaluate', 'fetch']:
-            raise sa_exc.ArgumentError(
-                            "Valid strategies for session synchronization "
-                            "are False, 'evaluate' and 'fetch'")
-        self._no_select_modifiers("update")
-
-        self = self.enable_eagerloads(False)
-
-        context = self._compile_context()
-        if len(context.statement.froms) != 1 or \
-                    not isinstance(context.statement.froms[0], schema.Table):
-            raise sa_exc.ArgumentError(
-                            "Only update via a single table query is "
-                            "currently supported")
-        primary_table = context.statement.froms[0]
-
-        session = self.session
-
-        if self._autoflush:
-            session._autoflush()
-
-        if synchronize_session == 'evaluate':
-            try:
-                evaluator_compiler = evaluator.EvaluatorCompiler()
-                if self.whereclause is not None:
-                    eval_condition = evaluator_compiler.process(
-                                                    self.whereclause)
-                else:
-                    def eval_condition(obj):
-                        return True
-
-                value_evaluators = {}
-                for key,value in values.iteritems():
-                    key = _attr_as_key(key)
-                    value_evaluators[key] = evaluator_compiler.process(
-                                        expression._literal_as_binds(value))
-            except evaluator.UnevaluatableError:
-                raise sa_exc.InvalidRequestError(
-                        "Could not evaluate current criteria in Python. "
-                        "Specify 'fetch' or False for the "
-                        "synchronize_session parameter.")
-            target_cls = self._mapper_zero().class_
-            matched_objects = []
-            for (cls, pk),obj in session.identity_map.iteritems():
-                evaluated_keys = value_evaluators.keys()
-
-                if issubclass(cls, target_cls) and eval_condition(obj):
-                    matched_objects.append(obj)
-
-        elif synchronize_session == 'fetch':
-            select_stmt = context.statement.with_only_columns(
-                                                primary_table.primary_key)
-            matched_rows = session.execute(
-                                        select_stmt,
-                                        params=self._params).fetchall()
-
-        update_stmt = sql.update(primary_table, context.whereclause, values)
-
-        result = session.execute(update_stmt, params=self._params)
-
-        if synchronize_session == 'evaluate':
-            target_cls = self._mapper_zero().class_
-            states = set()
-            for obj in matched_objects:
-                state, dict_ = attributes.instance_state(obj),\
-                                        attributes.instance_dict(obj)
-
-                # only evaluate unmodified attributes
-                to_evaluate = state.unmodified.intersection(
-                                                        evaluated_keys)
-                for key in to_evaluate:
-                    dict_[key] = value_evaluators[key](obj)
-
-                state.commit(dict_, list(to_evaluate))
-
-                # expire attributes with pending changes 
-                # (there was no autoflush, so they are overwritten)
-                state.expire_attributes(dict_,
-                                set(evaluated_keys).
-                                    difference(to_evaluate))
-                states.add(state)
-            session._register_altered(states)
-
-        elif synchronize_session == 'fetch':
-            target_mapper = self._mapper_zero()
-
-            states = set([
-                attributes.instance_state(session.identity_map[identity_key])
-                for identity_key in [
-                    target_mapper.identity_key_from_primary_key(
-                                                            list(primary_key))
-                    for primary_key in matched_rows
-                ]
-            ])
-            attrib = [_attr_as_key(k) for k in values]
-            for state in states:
-                session._expire_state(state, attrib)
-            session._register_altered(states)
-
-        session.dispatch.after_bulk_update(session, self, context, result)
-
-        return result.rowcount
+        update_op = persistence.BulkUpdate.factory(self, synchronize_session, values)
+        update_op.exec_()
+        return update_op.rowcount
+
 
     def _compile_context(self, labels=True):
         context = QueryContext(self)

test/orm/test_update_delete.py

 
         mapper(User, users)
 
+    def test_illegal_eval(self):
+        User = self.classes.User
+        s = Session()
+        assert_raises_message(
+            exc.ArgumentError,
+            "Valid strategies for session synchronization "
+            "are 'evaluate', 'fetch', False",
+            s.query(User).update,
+            {},
+            synchronize_session="fake"
+        )
+
     def test_illegal_operations(self):
         User = self.classes.User