Commits

angri  committed 0cea7e7

better exceptions on hitting limits during moving nodes

  • Participants
  • Parent commits c151901

Comments (0)

Files changed (3)

File doc/index.rst

   :meth:`~MPClassManager.move_subtree_to_bottom` -- for moving nodes based
   on specified new parent node.
 
+The last four methods raise :exc:`TooManyChildrenError` if new parent node
+already has ``36 ** steplen`` children and can not accept one more child
+node. They also raise :exc:`MovingToDescendantError` if a new parent node
+is one of descendants of moved node.
+
 
 -------
 Support
 .. autoexception:: PathOverflowError
 .. autoexception:: TooManyChildrenError
 .. autoexception:: PathTooDeepError
+.. autoexception:: MovingToDescendantError
 
 .. autoclass:: MPManager(table, parent_id_field=None, path_field='mp_path', depth_field='mp_depth', tree_id_field='mp_tree_id', steplen=3, instance_manager_key='_mp_instance_manager')
     :members: __get__

File sqlamp/__init__.py

     "Maximum depth of nesting limit is exceeded. Raised during flush."
 
 class MovingToDescendantError(RuntimeError):
-    pass
+    """
+    An attempt to move a tree inside of one of its descendants was made.
+
+    See `moving nodes`_.
+    """
 
 
 def inc_path(path, steplen):
 
         See also general notes on `moving nodes`_.
         """
-        self._move_subtree_by_parent('top', session, node_id, new_parent_id)
+        opts = self._mp_opts
+        old_path, old_depth, old_tree_id, \
+            parents_parent_id, parents_path, parents_depth, new_tree_id \
+            = self._prepare_to_move_subtree(session, node_id, new_parent_id)
+        new_depth = parents_depth + 1
+
+        new_path = parents_path + ALPHABET[0] * opts.steplen
+        self._pull_nodes('down', session, new_tree_id, new_path, new_depth)
+        [[old_path]] = session.execute(
+            sqlalchemy.select([opts.path_field], opts.pk_field == node_id)
+        )
+        self._reparent(session, node_id, new_parent_id, new_tree_id, new_path,
+                       new_depth, old_tree_id, old_path, old_depth)
 
     def move_subtree_to_bottom(self, session, node_id, new_parent_id):
         """
         The same as :meth:`move_subtree_before` but makes target tree/subtree
         root the last child of anchor node.
         """
-        self._move_subtree_by_parent('bottom', session, node_id, new_parent_id)
-
-    def _move_subtree_by_parent(self, top_or_bottom, session,
-                                node_id, new_parent_id):
         opts = self._mp_opts
         old_path, old_depth, old_tree_id, \
             parents_parent_id, parents_path, parents_depth, new_tree_id \
             = self._prepare_to_move_subtree(session, node_id, new_parent_id)
-
         new_depth = parents_depth + 1
 
-        if top_or_bottom == 'top':
+        children_filter = opts.filter_children(new_tree_id, parents_path,
+                                               parents_depth)
+        last_child_path = session.execute(
+            sqlalchemy.select([opts.path_field], children_filter) \
+                      .order_by(opts.tree_id_field.desc(),
+                                opts.path_field.desc()) \
+                      .limit(1)
+        ).fetchall()
+        if not last_child_path:
             new_path = parents_path + ALPHABET[0] * opts.steplen
-            self._pull_nodes('down', session, new_tree_id, new_path, new_depth)
-            [[old_path]] = session.execute(
-                sqlalchemy.select([opts.path_field], opts.pk_field == node_id)
-            )
         else:
-            assert top_or_bottom == 'bottom'
-            children_filter = opts.filter_children(new_tree_id, parents_path,
-                                                   parents_depth)
-            last_child_path = session.execute(
-                sqlalchemy.select([opts.path_field], children_filter) \
-                          .order_by(opts.tree_id_field.desc(),
-                                    opts.path_field.desc()) \
-                          .limit(1)
-            ).fetchall()
-            if not last_child_path:
-                new_path = parents_path + ALPHABET[0] * opts.steplen
-            else:
-                [[last_child_path]] = last_child_path
+            [[last_child_path]] = last_child_path
+            try:
                 new_path = inc_path(last_child_path, opts.steplen)
-
+            except PathOverflowError:
+                raise TooManyChildrenError()
         self._reparent(session, node_id, new_parent_id, new_tree_id, new_path,
                        new_depth, old_tree_id, old_path, old_depth)
 
             if not nodes:
                 return
             _, lastnodepath = nodes[0]
-            prev_path = inc_path(lastnodepath, opts.steplen)
+            try:
+                prev_path = inc_path(lastnodepath, opts.steplen)
+            except PathOverflowError:
+                raise TooManyChildrenError()
         else:
             assert up_or_down == 'up'
             prev_path = from_path

File tests/functional-tests.py

         ])
 
 
+class MoveNodesLimitsTestCase(_BaseFunctionalTestCase):
+    def setUp(self):
+        super(MoveNodesLimitsTestCase, self).setUp()
+        self.tbl = sqlalchemy.Table('tbl3', metadata,
+            sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
+            sqlalchemy.Column('pid', sqlalchemy.ForeignKey('tbl3.id'))
+        )
+        class Node(Cls):
+            mp = sqlamp.MPManager(self.tbl, steplen=1)
+        rel = sqlalchemy.orm.relation(Node, remote_side=[self.tbl.c.id])
+        sqlalchemy.orm.mapper(Node, self.tbl, extension=[Node.mp],
+                              properties={'parent': rel})
+        self.Node = Node
+        self.tbl.create()
+
+        # create two trees with roots packed of children up to limit:
+        self.r1 = self.Node()
+        self.r2 = self.Node()
+        for root in (self.r1, self.r2):
+            self.sess.add(root)
+            self.sess.flush()
+            for i in xrange(len(sqlamp.ALPHABET)):
+                self.sess.add(self.Node(pid=root.id))
+        self.sess.commit()
+        self.sess.expire_all()
+
+    def tearDown(self):
+        super(MoveNodesLimitsTestCase, self).tearDown()
+        self.tbl.drop()
+        metadata.remove(self.tbl)
+
+    def test_move_by_sibling(self):
+        query = sqlalchemy.select([self.tbl]).order_by(self.tbl.c.id)
+        data_before = query.execute().fetchall()
+
+        c12 = self.r1.mp.query_children()[1]
+        self.assertRaises(sqlamp.TooManyChildrenError,
+                          self.Node.mp.move_subtree_before,
+                          self.sess, self.r2.id, c12.id)
+        self.assertRaises(sqlamp.TooManyChildrenError,
+                          self.Node.mp.move_subtree_after,
+                          self.sess, self.r2.id, c12.id)
+
+        # nothing should have been changed so far
+        data_after = query.execute().fetchall()
+        self.assertEqual(data_before, data_after)
+
+        # free some space
+        self.Node.mp.delete_subtree(self.sess, c12.id)
+        self.sess.expire_all()
+
+        # now we should be able to insert something
+        c13 = self.r1.mp.query_children()[1]
+        self.Node.mp.move_subtree_after(self.sess, self.r2.id, c13.id)
+
+    def test_move_by_parent(self):
+        query = sqlalchemy.select([self.tbl]).order_by(self.tbl.c.id)
+        data_before = query.execute().fetchall()
+
+        self.assertRaises(sqlamp.TooManyChildrenError,
+                          self.Node.mp.move_subtree_to_top,
+                          self.sess, self.r2.id, self.r1.id)
+        self.assertRaises(sqlamp.TooManyChildrenError,
+                          self.Node.mp.move_subtree_to_bottom,
+                          self.sess, self.r1.id, self.r2.id)
+
+        data_after = query.execute().fetchall()
+        self.assertEqual(data_before, data_after)
+
+        c19 = self.r1.mp.query_children()[8]
+        self.Node.mp.delete_subtree(self.sess, c19.id)
+        self.sess.expire_all()
+
+        self.Node.mp.move_subtree_to_top(self.sess, self.r2.id, self.r1.id)
+
+
 def get_suite():
     loader = unittest.TestLoader()
     suite = unittest.TestSuite()