Commits

Jean-Tiare Le Bigot committed 01b2310

Major transaction engine rewrite.

* added support for transaction operating on multiple targets
* transient flag can now be set on a per instance basis
* transaction status is status is always persisted, even on error if not transient
* transaction now embeds a minimal schema that should be suitable for most applications

Attempts to preserve backward compatibility
All current tests passes
Documentation is up to date

next: write tests specific to features introduced in this revision.

  • Participants
  • Parent commits 4902d6a

Comments (0)

Files changed (2)

dynamodb_mapper/tests/test_transactions.py

             raise InsufficientEnergyError(target.energy, self.energy)
         target.energy = new_energy
 
+
 class TransientUserEnergyTransaction(UserEnergyTransaction):
     """Exactly like UserEnergyTransaction, but transient (never saved to the DB)."""
     transient = True
 
         self.assertEquals(m_save.call_count, 0)
 
+    @mock.patch("dynamodb_mapper.transactions.Transaction.save")
     @mock.patch.object(User, "save")
     @mock.patch("dynamodb_mapper.model.DynamoDBModel.get")
-    def test_target_not_found(self, m_get, m_user_save):
+    def test_target_not_found(self, m_get, m_user_save, m_transaction_save):
         m_get.side_effect = DynamoDBKeyNotFoundError("ONOZ!")
         t = UserEnergyTransaction.from_dict({"user_id": USER_ID, "energy": 10})
 
         self.assertRaises(TargetNotFoundError, t.commit)
 
         self.assertEquals(m_user_save.call_count, 0)
+        m_transaction_save.assert_called()
 
+    @mock.patch("dynamodb_mapper.transactions.Transaction.save")
     @mock.patch.object(User, "save")
     @mock.patch.object(User, "get")
-    def test_insufficient_energy(self, m_user_get, m_user_save):
+    def test_insufficient_energy(self, m_user_get, m_user_save, m_transaction_save):
         m_user_instance = self._get_default_user()
         m_user_get.return_value = m_user_instance
         t = UserEnergyTransaction.from_dict({"user_id": USER_ID, "energy": -ENERGY * 2})
 
         self.assertEquals(m_user_instance.energy, ENERGY)
         self.assertEquals(m_user_save.call_count, 0)
+        m_transaction_save.assert_called()
 
     @mock.patch("dynamodb_mapper.transactions.Transaction.save")
     @mock.patch.object(User, "save")
         m_transaction_save.assert_called()
         self.assertEqual(m_user_save.call_count, failed_tries)
 
+    @mock.patch("dynamodb_mapper.transactions.Transaction.save")
     @mock.patch.object(User, "save")
     @mock.patch.object(User, "get")
-    def test_max_retries_exceeded(self, m_user_get, m_user_save):
+    def test_max_retries_exceeded(self, m_user_get, m_user_save, m_transaction_save):
         # Return a clean user every time -- we will be retrying a lot.
         m_user_get.side_effect = lambda *args, **kw: self._get_default_user()
         m_user_save.side_effect = ExpectedValueError()
 
         self.assertRaises(MaxRetriesExceededError, t.commit)
         self.assertEquals(m_user_save.call_count, Transaction.MAX_RETRIES)
+        m_transaction_save.assert_called()

dynamodb_mapper/transactions.py

 
     This class gracefully handles concurrent modifications and auto-retries but
     embeds no tool to rollback at the moment.
+
+    Transactions status may be persisted for tracability, further analysis...
+    for this purpose, a minimal schema is embedded in this base class. When
+    deriving, you MUST keep
+    * datetime field as rangekey
+    * status field
+    The hash key field may be changed to pick a ore relevant name or change its
+    type. In any case, you are responsible of setting its value. For example, if
+    collecting rewards for a player, you may wish to keep track of related
+    transactions by user_id hence set requester_id to user_id
+
+    Deriving class MUST set field __table__
+
     """
+
+    __hash_key__ = "requester_id"
     __range_key__ = "datetime"
+
+    __schema__ = {
+        "requester_id": int,
+        "datetime": datetime,
+        "status": unicode #IN("pending", "running", "done")
+    }
+
     # Transient transactions (with this flag set to True) are not saved in the
-    # database, and are as a result write-only.
+    # database, and are as a result write-only. This value is defined on the
+    # class level bu may be redefined on a per instance basis.
     transient = False
-
+    # Maximum attempts. Each attempt consumes write credits
     MAX_RETRIES = 100
 
     def _setup(self):
         """
         pass
 
-    def _get_target(self):
-        """Fetch the object on which this transaction is supposed to operate
-        (e.g. a User instance for UserResourceTransactions) from the DB and
-        return it.
+    def _get_transactors(self):
+        """Fetch a list of targets (getter, setter) tuples. The transaction
+        engine will walk the list. For each tuple, the getter and the setter are
+        called successively until this step of the transaction succeed or exhaust
+        the MAX_RETRIES.
 
-        It is important that this method actually connect to the database and
-        retrieve a clean, up-to-date version of the object -- because it will
-        be called repeatedly if conditional updates fail due to the target
-        object having changed.
+        * getter: Fetch the object on which this transaction is supposed to operate
+            (e.g. a User instance for UserResourceTransactions) from the DB and
+            return it.
+            It is important that this method actually connect to the database and
+            retrieve a clean, up-to-date version of the object -- because it will
+            be called repeatedly if conditional updates fail due to the target
+            object having changed.
+            The getter takes no argument and returns a DBModel instance
+
+        * setter: Applyies the transaction to the target, modifying it in-place.
+            Does *not* attempt to save the target or the transaction to the DB.
+            The setter takes a DBModel instance as argument. Its return value is
+            ignored
+
+        The list is walked from 0 to len(transactors)-1. Order may matter.
 
         :raise TargetNotFoundError: If the target doesn't exist in the DB.
         """
+        #FIXME: compat method
+        return [(self._get_target, self._alter_target)]
+
+    def _get_target(self):
+        #FIXME: legacy
         pass
 
     def _alter_target(self, target):
-        """Apply the transaction to the target, modifying it in-place.
-
-        Does *not* attempt to save the target or the transaction to the DB.
-        """
+        #FIXME: legacy
         pass
 
-    def _apply_and_save_target(self):
+    def _apply_and_save_target(self, getter, setter):
         """Apply the Transaction and attempt to save its target (but not
         the Transaction itself). May be called repeatedly until it stops
         raising :exc:`ExpectedValueError`.
 
+        Will succeed iff no attributes of the object returned by getter has been
+        modified before ou save method to prevent accidental overwrites.
+
+        :param getter: getter as defined in :py:meth:`_get_transactors`
+        :param setter: setter as defined in :py:meth:`_get_transactors`
+
         :raise ExpectedValueError: If the target is changed by an external
             source (other than the Transaction) between its retrieval from
             the DB and the save attempt.
         """
-        target = self._get_target()
-
-        # We want to redo the transaction if *anything* in the user
-        # changed, not just the target attribute (no accidental overwrites).
+        # load base object
+        target = getter()
         old_values = target.to_db_dict()
 
-        self._alter_target(target)
+        # edit and attempt to save it
+        setter(target)
         target.save(expected_values=old_values)
 
     def _assign_datetime_and_save(self):
         self.datetime = datetime.now(utc_tz)
         self.save(allow_overwrite=False)
 
-    def _retry(self, fn, exc_class):
-        """Call ``fn`` repeatedly, until it stops raising ``exc_class`` or
-        it has been called ``MAX_RETRIES`` times (in which case
+    def _retry(self, fn, exc_class, *args):
+        """Call ``fn`` repeatedly with ``*args``, until it stops raising
+        ``exc_class`` or it has been called ``MAX_RETRIES`` times (in which case
         :exc:`MaxRetriesExceededError` is raised).
 
         :param fn: The callable to retry calling.
-
         :param exc_class: An exception class (or tuple thereof) that, if raised
             by fn, means it has failed and should be called again.
             *Any other exception will propagate normally, cancelling the
             auto-retry process.*
+        :param *args: Optional arguments to pass to ``fn``
         """
         tries = 0
         while tries < self.MAX_RETRIES:
             tries += 1
             try:
-                fn()
-                # Nothing was raised: we're done!
+                fn(*args)
+                # Nothing was raised: we're done !
                 break
             except exc_class as e:
                 log.debug(
         else:
             raise MaxRetriesExceededError()
 
+    def commit(self):
+        """ Run the transaction and, if needed, store its states to the database
+
+            - set up preconditions and parameters (:meth:`_setup` -- only called
+              once no matter what).
+            - fetch all transaction steps (:meth:`_get_transactors`).
+            - for each transaction :
+                - fetch the target object from the DB.
+                - modify the target object according to the transaction's parameters.
+                - save the (modified) target to the DB
+            - save the transaction to the DB
+
+        Each transation may be retried up to ``MAX_RETRIES`` times automatically.
+        commit uses conditional writes to avoid overwriting data in the case of
+        concurrent transactions on the same target (see :meth:`_retry`).
+        """
+        self.status = "pending"
+
+        self._setup()
+        transactors = self._get_transactors()
+
+        try:
+            self.status = "running"
+            for transactor in transactors:
+                self._retry(self._apply_and_save_target, ExpectedValueError, *transactor)
+            self.status = "done"
+        finally:
+            # Always (attempt to) save transaction status
+            self._retry(self._assign_datetime_and_save, OverwriteError)
+
     def save(self, allow_overwrite=True, expected_values=None):
         """If the transaction is transient (``transient = True``),
         do nothing.
 
         If the transaction is persistent (``transient = False``), save it to
         the DB, as :meth:`DynamoDBModel.save`.
+
+        Note: this method is called automatically from ``commit``. You may but do
+        not need to call it explicitely.
         """
-        cls = type(self)
-        if cls.transient:
+        if self.transient:
             log.debug(
                 "class=%s: Transient transaction class, ignoring save attempt.",
-                cls)
+                type(self))
         else:
             super(Transaction, self).save(
                 allow_overwrite=allow_overwrite, expected_values=expected_values)
 
-    def commit(self):
-        """Commit the transaction:
-
-            - set up preconditions and parameters (:meth:`_setup` -- only called
-              once no matter what).
-            - fetch the target object in the DB (:meth:`_get_target`).
-            - modify the target object according to the transaction's parameters
-              (:meth:`_alter_target`).
-            - save the (modified) target to the DB
-            - save the transaction to the DB
-
-        commit knows how to auto-retry, and uses conditional writes to avoid
-        overwriting data in the case of concurrent transactions on the same
-        target (see :meth:`_retry`).
-        """
-        self._setup()
-
-        self._retry(self._apply_and_save_target, ExpectedValueError)
-        self._retry(self._assign_datetime_and_save, OverwriteError)