Mike Bayer avatar Mike Bayer committed 16bcc75

Can set/change the "cascade" attribute on a :func:`.relationship`
construct after it's been constructed already. This is not
a pattern for normal use but we like to change the setting
for demonstration purposes in tutorials.

Comments (0)

Files changed (5)

doc/build/changelog/changelog_08.rst

       * :ref:`metadata_create_drop_tables`
 
     .. change::
+        :tags: feature, orm
+
+      Can set/change the "cascade" attribute on a :func:`.relationship`
+      construct after it's been constructed already.  This is not
+      a pattern for normal use but we like to change the setting
+      for demonstration purposes in tutorials.
+
+    .. change::
         :tags: bug, schema
         :tickets: 2664
 

lib/sqlalchemy/orm/interfaces.py

 
     """
 
-    cascade = ()
+    cascade = frozenset()
     """The set of 'cascade' attribute names.
 
     This collection is checked before the 'cascade_iterator' method is called.

lib/sqlalchemy/orm/properties.py

 
     strategy_wildcard_key = 'relationship:*'
 
+    _dependency_processor = None
+
     def __init__(self, argument,
         secondary=None, primaryjoin=None,
         secondaryjoin=None,
         load_on_pending=False,
         strategy_class=None, _local_remote_pairs=None,
         query_class=None,
-        info=None):
+            info=None):
 
         self.uselist = uselist
         self.argument = argument
 
         self._reverse_property = set()
 
-        if cascade is not False:
-            self.cascade = CascadeOptions(cascade)
-        else:
-            self.cascade = CascadeOptions("save-update, merge")
-
-        if self.passive_deletes == 'all' and \
-                    ("delete" in self.cascade or
-                    "delete-orphan" in self.cascade):
-            raise sa_exc.ArgumentError(
-                            "Can't set passive_deletes='all' in conjunction "
-                            "with 'delete' or 'delete-orphan' cascade")
+        self.cascade = cascade if cascade is not False \
+                            else "save-update, merge"
 
         self.order_by = order_by
 
                 if self.property._use_get:
                     return sql.and_(*[
                         sql.or_(
-                        adapt(x) != state_bindparam(adapt(x), state, y),
-                        adapt(x) == None)
+                            adapt(x) != state_bindparam(adapt(x), state, y),
+                            adapt(x) == None)
                         for (x, y) in self.property.local_remote_pairs])
 
             criterion = sql.and_(*[x == y for (x, y) in
                 if (source_state, r) in _recursive:
                     return
 
-        if not "merge" in self.cascade:
+        if not "merge" in self._cascade:
             return
 
         if self.key not in source_dict:
 
     def cascade_iterator(self, type_, state, dict_,
                          visited_states, halt_on=None):
-        #assert type_ in self.cascade
+        #assert type_ in self._cascade
 
         # only actively lazy load on the 'delete' cascade
         if type_ != 'delete' or self.passive_deletes:
                             passive=passive)
 
         skip_pending = type_ == 'refresh-expire' and 'delete-orphan' \
-            not in self.cascade
+            not in self._cascade
 
         for instance_state, c in tuples:
             if instance_state in visited_states:
                     'does not reference mapper %s' % (key, self, other,
                     self.parent))
         if self.direction in (ONETOMANY, MANYTOONE) and self.direction \
-            == other.direction:
+                        == other.direction:
             raise sa_exc.ArgumentError('%s and back-reference %s are '
                     'both of the same direction %r.  Did you mean to '
                     'set remote_side on the many-to-one side ?'
         self._check_conflicts()
         self._process_dependent_arguments()
         self._setup_join_conditions()
-        self._check_cascade_settings()
+        self._check_cascade_settings(self._cascade)
         self._post_init()
         self._generate_backref()
         super(RelationshipProperty, self).do_init()
         for attr in (
             'order_by', 'primaryjoin', 'secondaryjoin',
             'secondary', '_user_defined_foreign_keys', 'remote_side',
-            ):
+                ):
             attr_value = getattr(self, attr)
             if util.callable(attr_value):
                 setattr(self, attr, attr_value())
 
         self.target = self.mapper.mapped_table
 
-        if self.cascade.delete_orphan:
-            self.mapper.primary_mapper()._delete_orphans.append(
-                            (self.key, self.parent.class_)
-                        )
 
     def _setup_join_conditions(self):
         self._join_condition = jc = relationships.JoinCondition(
         if not self.parent.concrete:
             for inheriting in self.parent.iterate_to_root():
                 if inheriting is not self.parent \
-                    and inheriting.has_property(self.key):
+                        and inheriting.has_property(self.key):
                     util.warn("Warning: relationship '%s' on mapper "
                               "'%s' supersedes the same relationship "
                               "on inherited mapper '%s'; this can "
                               "cause dependency issues during flush"
                               % (self.key, self.parent, inheriting))
 
-    def _check_cascade_settings(self):
-        if self.cascade.delete_orphan and not self.single_parent \
+    @property
+    def cascade(self):
+        """Return the current cascade setting for this
+        :class:`.RelationshipProperty`.
+        """
+        return self._cascade
+
+    @cascade.setter
+    def cascade(self, cascade):
+        cascade = CascadeOptions(cascade)
+        if 'mapper' in self.__dict__:
+            self._check_cascade_settings(cascade)
+        self._cascade = cascade
+
+        if self._dependency_processor:
+            self._dependency_processor.cascade = cascade
+
+    def _check_cascade_settings(self, cascade):
+        if cascade.delete_orphan and not self.single_parent \
             and (self.direction is MANYTOMANY or self.direction
                  is MANYTOONE):
             raise sa_exc.ArgumentError(
                     'On %s, delete-orphan cascade is not supported '
-                      'on a many-to-many or many-to-one relationship '
-                      'when single_parent is not set.   Set '
-                      'single_parent=True on the relationship().'
-                      % self)
+                    'on a many-to-many or many-to-one relationship '
+                    'when single_parent is not set.   Set '
+                    'single_parent=True on the relationship().'
+                    % self)
         if self.direction is MANYTOONE and self.passive_deletes:
             util.warn("On %s, 'passive_deletes' is normally configured "
                       "on one-to-many, one-to-one, many-to-many "
                       "relationships only."
                        % self)
 
+        if self.passive_deletes == 'all' and \
+                    ("delete" in cascade or
+                    "delete-orphan" in cascade):
+            raise sa_exc.ArgumentError(
+                    "On %s, can't set passive_deletes='all' in conjunction "
+                    "with 'delete' or 'delete-orphan' cascade" % self)
+
+        if cascade.delete_orphan:
+            self.mapper.primary_mapper()._delete_orphans.append(
+                            (self.key, self.parent.class_)
+                        )
+
     def _columns_are_mapped(self, *cols):
         """Return True if all columns in the given collection are
         mapped by the tables referenced by this :class:`.Relationship`.
         """
         for c in cols:
             if self.secondary is not None \
-                and self.secondary.c.contains_column(c):
+                    and self.secondary.c.contains_column(c):
                 continue
             if not self.parent.mapped_table.c.contains_column(c) and \
-                not self.target.c.contains_column(c):
+                    not self.target.c.contains_column(c):
                 return False
         return True
 

lib/sqlalchemy/orm/unitofwork.py

 
             prop = state.manager.mapper._props[key]
             item_state = attributes.instance_state(item)
-            if prop.cascade.save_update and \
+            if prop._cascade.save_update and \
                 (prop.cascade_backrefs or key == initiator.key) and \
-                not sess._contains_state(item_state):
+                    not sess._contains_state(item_state):
                 sess._save_or_update_state(item_state)
         return item
 
 
             # expunge pending orphans
             item_state = attributes.instance_state(item)
-            if prop.cascade.delete_orphan and \
+            if prop._cascade.delete_orphan and \
                 item_state in sess._new and \
-                prop.mapper._is_orphan(item_state):
+                    prop.mapper._is_orphan(item_state):
                     sess.expunge(item)
 
     def set_(state, newvalue, oldvalue, initiator):
             prop = state.manager.mapper._props[key]
             if newvalue is not None:
                 newvalue_state = attributes.instance_state(newvalue)
-                if prop.cascade.save_update and \
+                if prop._cascade.save_update and \
                     (prop.cascade_backrefs or key == initiator.key) and \
                     not sess._contains_state(newvalue_state):
                     sess._save_or_update_state(newvalue_state)
 
             if oldvalue is not None and \
                 oldvalue is not attributes.PASSIVE_NO_RESULT and \
-                prop.cascade.delete_orphan:
+                prop._cascade.delete_orphan:
                 # possible to reach here with attributes.NEVER_SET ?
                 oldvalue_state = attributes.instance_state(oldvalue)
 

test/orm/test_cascade.py

         class Address(cls.Basic):
             pass
 
+    def test_delete_with_passive_deletes_all(self):
+        User, Address = self.classes.User, self.classes.Address
+        users, addresses = self.tables.users, self.tables.addresses
+
+        mapper(User, users, properties={
+            'addresses': relationship(Address,
+                    passive_deletes="all", cascade="all, delete-orphan")
+            })
+        mapper(Address, addresses)
+        assert_raises_message(
+            sa_exc.ArgumentError,
+            "On User.addresses, can't set passive_deletes='all' "
+            "in conjunction with 'delete' or 'delete-orphan' cascade",
+            configure_mappers
+        )
+
     def test_delete_orphan_without_delete(self):
         User, Address = self.classes.User, self.classes.Address
         users, addresses = self.tables.users, self.tables.addresses
             orm_util.CascadeOptions("all, delete-orphan"),
             frozenset)
 
+    def test_cascade_assignable(self):
+        User, Address = self.classes.User, self.classes.Address
+        users, addresses = self.tables.users, self.tables.addresses
+
+        rel = relationship(Address)
+        eq_(rel.cascade, set(['save-update', 'merge']))
+        rel.cascade = "save-update, merge, expunge"
+        eq_(rel.cascade, set(['save-update', 'merge', 'expunge']))
+
+        mapper(User, users, properties={
+                'addresses': rel
+            })
+        am = mapper(Address, addresses)
+        configure_mappers()
+
+        eq_(rel.cascade, set(['save-update', 'merge', 'expunge']))
+
+        assert ("addresses", User) not in am._delete_orphans
+        rel.cascade = "all, delete, delete-orphan"
+        assert ("addresses", User) in am._delete_orphans
+
+        eq_(rel.cascade,
+            set(['delete', 'delete-orphan', 'expunge', 'merge',
+                    'refresh-expire', 'save-update'])
+            )
+
+
 class O2MCascadeDeleteOrphanTest(fixtures.MappedTest):
     run_inserts = None
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.