Commits

Armin Rigo  committed 4bdb916

Progress. It should more or less work, except of course it doesn't.

  • Participants
  • Parent commits 827abc6

Comments (0)

Files changed (7)

 #  define Du_TRACK_REFS
 #endif
 
+#ifdef Du_AME
+typedef long owner_version_t;
+#endif
+
 
 typedef struct _DuObject {
     int ob_refcnt;
 #ifdef Du_TRACK_REFS
     struct _DuObject *ob_debug_prev, *ob_debug_next;
 #endif
+#ifdef Du_AME
+    owner_version_t ob_version;
+#endif
 } DuObject;
 
 #ifdef Du_TRACK_REFS
         Du_FatalError("Du_Initialize() must be called first");
     ob->ob_debug_prev = &chainedlist;
     ob->ob_debug_next = chainedlist.ob_debug_next;
+#ifdef Du_AME
+    ob->ob_version = 0;
+#endif
     chainedlist.ob_debug_next = ob;
     ob->ob_debug_next->ob_debug_prev = ob;
 }
 struct tx_descriptor {
     jmp_buf *setjmp_buf;
     int in_transaction;
-    unsigned long start_time;
+    owner_version_t start_time;
+    owner_version_t end_time;
+    owner_version_t my_lock_word;
     struct OrecList reads;
     struct RedoLog redolog;   /* last item, because it's the biggest one */
 };
 
 /* global_timestamp contains in its lowest bit a flag equal to 1
    if there is an inevitable transaction running */
-static volatile unsigned long global_timestamp = 2;
+static volatile owner_version_t global_timestamp = 2;
 static __thread struct tx_descriptor *thread_descriptor;
 
 
   redolog_clear(&d->redolog);
 }
 
-static void tx_spinloop()
+static void tx_spinloop(void)
 {
     spinloop();
 }
 
-static void tx_abort()
-{
-    abort();
-}
+static void tx_abort(void);
 
 static int is_inevitable(struct tx_descriptor *d)
 {
     return 0;
 }
 
+/*** run the redo log to commit a transaction, and release the locks */
+static void tx_redo(struct tx_descriptor *d)
+{
+    owner_version_t newver = d->end_time;
+    wlog_t *item;
+    REDOLOG_LOOP_FORWARD(d->redolog, item)
+    {
+        DuObject *globalobj = item->addr;
+        DuObject *localobj = item->val;
+        long size = localobj->ob_type->dt_size;
+        assert(size >= sizeof(DuObject));
+        memcpy(((char *)globalobj) + sizeof(DuObject),
+               ((char *)localobj) + sizeof(DuObject),
+               size - sizeof(DuObject));
+        CFENCE;
+        globalobj->ob_version = newver;
+    } REDOLOG_LOOP_END;
+}
+
+/*** on abort, release locks and restore the old version number. */
+static void releaseAndRevertLocks(struct tx_descriptor *d)
+{
+  wlog_t *item;
+  REDOLOG_LOOP_FORWARD(d->redolog, item)
+  {
+      if (item->p != -1) {
+          DuObject* o = item->addr;
+          o->ob_version = item->p;
+      }
+  } REDOLOG_LOOP_END;
+}
+
+/*** lock all locations */
+static void acquireLocks(struct tx_descriptor *d)
+{
+    wlog_t *item;
+    // try to lock every location in the write set
+    REDOLOG_LOOP_BACKWARD(d->redolog, item)
+    {
+        DuObject* o = item->addr;
+        owner_version_t ovt;
+
+    retry:
+        ovt = o->ob_version;
+
+        // if object not locked, lock it
+        //
+        // NB: if ovt > start time, we may introduce inconsistent reads.
+        // Since most writes are also reads, we'll just abort under this
+        // condition.  This can introduce false conflicts
+        if (!IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
+            if (!bool_cas(&o->ob_version, ovt, d->my_lock_word)) {
+                CFENCE;
+                goto retry;
+            }
+            // save old version to item->p.  Now we hold the lock.
+            item->p = ovt;
+        }
+        // else if the location is too recent...
+        else if (!IS_LOCKED(ovt))
+            tx_abort();
+        // else it is locked: check it's not by me (no duplicates in redolog)
+        else {
+            assert(ovt != d->my_lock_word);
+            // we can either abort or spinloop.  Because we are at the end of
+            // the transaction we might try to spinloop, even though after the
+            // lock is released the ovt will be very recent, possibly greater
+            // than d->start_time.  It is necessary to spinloop in case we are
+            // inevitable, so use that as a criteria.  Another solution to
+            // avoid deadlocks would be to sort the order in which we take the
+            // locks.
+            if (is_inevitable(d))
+                tx_spinloop();
+            else
+                tx_abort();
+            goto retry;
+        }
+    } REDOLOG_LOOP_END;
+}
+
 /**
  * fast-path validation, assuming that I don't hold locks.
  */
 static void validate_fast(struct tx_descriptor *d)
 {
     int i;
-    unsigned long ovt;
+    owner_version_t ovt;
     assert(!is_inevitable(d));
     for (i=0; i<d->reads.size; i++) {
      retry:
-        ovt = _Du_AME_GET_VERSION(d->reads.items[i]);
+        ovt = d->reads.items[i]->ob_version;
         if (IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
             // If locked, we wait until it becomes unlocked.  The chances are
             // that it will then have a very recent start_time, likely greater
     }
 }
 
+/**
+ * validate the read set by making sure that all orecs that we've read have
+ * timestamps at least as old as our start time, unless we locked those orecs.
+ */
+static void validate(struct tx_descriptor *d)
+{
+    int i;
+    owner_version_t ovt;
+    assert(!is_inevitable(d));
+    for (i=0; i<d->reads.size; i++) {
+        ovt = d->reads.items[i]->ob_version;
+        if (IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
+            if (!IS_LOCKED(ovt)) {
+                // if unlocked and newer than start time, abort
+                tx_abort();
+            }
+            else {
+                // if locked and not by me, abort
+                if (ovt != d->my_lock_word)
+                    tx_abort();
+            }
+        }
+    }
+}
+
+static void tx_abort(void)
+{
+    struct tx_descriptor *d = thread_descriptor;
+    assert(!is_inevitable(d));
+    // release the locks and restore version numbers
+    releaseAndRevertLocks(d);
+    // reset all lists
+    common_cleanup(d);
+
+    tx_spinloop();
+    longjmp(*d->setjmp_buf, 1);
+}
+
 
 DuObject *_Du_AME_read_from_global(DuObject *glob)
 {
 
  not_found:
  retry:;
-    long ovt = _Du_AME_GET_VERSION(glob);
+    owner_version_t ovt = glob->ob_version;
     if (IS_LOCKED_OR_NEWER(ovt, d->start_time)) {
         if (IS_LOCKED(ovt)) {
             tx_spinloop();
             goto retry;
         }
         /* else this location is too new, scale forward */
-        unsigned long newts = global_timestamp & ~1;
+        owner_version_t newts = global_timestamp & ~1;
         validate_fast(d);
         d->start_time = newts;
     }
     assert(thread_descriptor == NULL);
     struct tx_descriptor *d = malloc(sizeof(struct tx_descriptor));
     memset(d, 0, sizeof(struct tx_descriptor));
+
+    /* initialize 'my_lock_word' to be a unique negative number */
+    d->my_lock_word = (owner_version_t)d;
+    if (!IS_LOCKED(d->my_lock_word))
+        d->my_lock_word = ~d->my_lock_word;
+    assert(IS_LOCKED(d->my_lock_word));
+
     common_cleanup(d);
     thread_descriptor = d;
 }
 
     // if I don't have writes, I'm committed
     if (!redolog_any_entry(&d->redolog)) {
+        if (is_inevitable(d))
+            abort();
         common_cleanup(d);
         return;
     }
 
-    abort();
+    // acquire locks
+    acquireLocks(d);
+
+    if (is_inevitable(d)) {
+        abort();
+    }
+    else {
+        while (1) {
+            owner_version_t expected = global_timestamp;
+            if (expected & 1) {
+                abort();
+            }
+            if (bool_cas(&global_timestamp, expected, expected + 2)) {
+                d->end_time = expected + 2;
+                break;
+            }
+        }
+
+        // validate (but skip validation if nobody else committed)
+        if (d->end_time != (d->start_time + 2))
+            validate(d);
+
+        // run the redo log, and release the locks
+        tx_redo(d);
+    }
+
+    common_cleanup(d);
 }
 #include "../duhton.h"
 
 
-#define _Du_AME_GET_VERSION(ob)  ((long)((ob)->ob_debug_next))
-
 DuObject *_Du_AME_read_from_global(DuObject *glob);
 void _Du_AME_oreclist_insert(DuObject *glob);
 DuObject *_Du_AME_writebarrier(DuObject *glob);
 #define Du_AME_READ_START(ob)                                           \
 while (1) {                                                             \
     DuObject *__du_ame_ob = (DuObject *)(ob);                           \
-    long __du_ame_version = 0;                                          \
+    owner_version_t __du_ame_version = 0;                               \
     if (Du_AME_GLOBAL(__du_ame_ob)) {                                   \
         __du_ame_ob = _Du_AME_read_from_global(__du_ame_ob);            \
         (ob) = (typeof(ob))__du_ame_ob;                                 \
-        __du_ame_version = _Du_AME_GET_VERSION(__du_ame_ob);            \
+        __du_ame_version = __du_ame_ob->ob_version;                     \
         CFENCE;                                                         \
     }
 
     __du_ame_ob = (DuObject *)(ob);                                     \
     if (Du_AME_GLOBAL(__du_ame_ob)) {                                   \
         CFENCE;                                                         \
-        if (_Du_AME_GET_VERSION(__du_ame_ob) != __du_ame_version)       \
+        if (__du_ame_ob->ob_version != __du_ame_version)                \
             continue;                                                   \
         _Du_AME_oreclist_insert(__du_ame_ob);                           \
     }                                                                   \

File stm/atomic_ops.h

 #else
 /* x86 (32 bits and 64 bits) */
 static inline _Bool
-bool_cas(volatile unsigned long* ptr, unsigned long old, unsigned long _new)
+bool_cas(volatile long* ptr, long old, long _new)
 {
-    unsigned long prev;
+    long prev;
     asm volatile("lock;"
 #if defined(__amd64__)
                  "cmpxchgq %1, %2;"
 typedef struct {
   DuObject *addr;
   DuObject *val;
-  unsigned long p;   // the previous version number (if locked)
+  owner_version_t p;   // the previous version number (if locked)
 } wlog_t;
 
 typedef struct {
 
 struct OrecList {
   long size, alloc;
-  unsigned long locked;
+  long locked;
   DuObject **items;
 };
 

File stm/thread.c

 #include "fifo.h"
 
 
-#define NUMTHREADS    4
+#define NUMTHREADS    2
 
 
 static int num_waiting_threads;