Commits

Bryan O'Sullivan committed 48e6902

Fix up the handling of vector clock conflict on monoidal PUT

I had incorrectly assumed that a result of length 1 indicated no
conflict. It can mean either a success or a conflict, depending
on whether the result returned is equal to the result stored.
Tricky, eh?

  • Participants
  • Parent commits 778a7d1

Comments (0)

Files changed (3)

src/Network/Riak/JSON/Monoid.hs

 --
 -- The final value to be stored at the end of any conflict resolution
 -- is returned.
-put :: (FromJSON c, ToJSON c, Monoid c) =>
+put :: (Eq c, FromJSON c, ToJSON c, Monoid c) =>
        Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
     -> IO (c, VClock)
 put = M.put J.put
 -- If a conflict arises, a winner will be chosen using 'mconcat', and
 -- the winner will be stored; this will be repeated until no conflict
 -- occurs.
-put_ :: (FromJSON c, ToJSON c, Monoid c) =>
-       Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
-    -> IO ()
+put_ :: (Eq c, FromJSON c, ToJSON c, Monoid c) =>
+        Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
+     -> IO ()
 put_ = M.put_ J.put
 {-# INLINE put_ #-}
 
 --
 -- For each original value to be stored, the final value that was
 -- stored at the end of any conflict resolution is returned.
-putMany :: (FromJSON c, ToJSON c, Monoid c) =>
+putMany :: (Eq c, FromJSON c, ToJSON c, Monoid c) =>
            Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
         -> IO [(c, VClock)]
 putMany = M.putMany J.putMany
 -- If any conflicts arise, a winner will be chosen in each case using
 -- 'mconcat', and the winners will be stored; this will be repeated
 -- until no conflicts occur.
-putMany_ :: (FromJSON c, ToJSON c, Monoid c) =>
+putMany_ :: (Eq c, FromJSON c, ToJSON c, Monoid c) =>
             Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
          -> IO ()
 putMany_ = M.putMany_ J.putMany

src/Network/Riak/Monoid.hs

     , putMany_
     ) where
 
-import Control.Arrow (first, second)
+import Control.Arrow (first)
 import Data.Function (on)
-import Data.List (partition, sortBy)
+import Data.Either (partitionEithers)
+import Data.List (sortBy)
 import Data.Monoid (Monoid(..))
 import Network.Riak.Types.Internal hiding (MessageTag(..))
-import qualified Data.IntMap as M
 
 get :: (Monoid c) =>
        (Connection -> Bucket -> Key -> R -> IO (Maybe ([c], VClock)))
         -> Connection -> Bucket -> [Key] -> R -> IO [Maybe (c, VClock)]
 getMany doGet conn b ks r = map (fmap (first mconcat)) `fmap` doGet conn b ks r
 
-put :: Monoid c => (Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
-                               -> IO ([c], VClock))
+put :: (Eq c, Monoid c) =>
+       (Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
+                   -> IO ([c], VClock))
     -> Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
     -> IO (c, VClock)
 put doPut conn bucket key mvclock0 val0 w dw = do
   let go val mvclock1 = do
         (xs, vclock) <- doPut conn bucket key mvclock1 val w dw
         case xs of
-          [c] -> return (c, vclock)
-          _   -> go (mconcat xs) (Just vclock)
+          []             -> return (val, vclock) -- not observed in the wild
+          [v] | v == val -> return (val, vclock)
+          ys             -> go (mconcat (val:ys)) (Just vclock)
   go val0 mvclock0
 
-put_ :: Monoid c => (Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
-                                -> IO ([c], VClock))
+put_ :: (Eq c, Monoid c) =>
+        (Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
+                    -> IO ([c], VClock))
      -> Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
      -> IO ()
 put_ doPut conn bucket key mvclock0 val0 w dw =
     put doPut conn bucket key mvclock0 val0 w dw >> return ()
 {-# INLINE put_ #-}
 
-putMany :: (Monoid c) =>
+putMany :: (Eq c, Monoid c) =>
            (Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
                        -> IO [([c], VClock)])
         -> Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
   go acc [] = return . map snd . sortBy (compare `on` fst) $ acc
   go acc puts = do
     rs <- doPut conn bucket (map snd puts) w dw
-    let (conflicts, ok) = partition isConflict $ zip (map fst puts) rs
-        isConflict (_,(_:_:_,_)) = True
-        isConflict  _            = False
-    go (map (second (first mconcat)) ok++acc) (map asPut conflicts)
-  asPut (i,(c,v)) = (i,(keys M.! i, Just v, mconcat c))
-  keys = M.fromAscList (zip [(0::Int)..] (map fst3 puts0))
-  fst3 (a,_,_) = a
+    let (conflicts, ok) = partitionEithers $ zipWith mush puts rs
+    go (ok++acc) conflicts
+  mush (i,(k,_,c)) (cs,v) =
+      case cs of
+        []           -> Right (i,(c,v)) -- not observed in the wild
+        [x] | x == c -> Right (i,(c,v))
+        _            -> Left (i,(k,Just v, mconcat (c:cs)))
 
-putMany_ :: (Monoid c) =>
-           (Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
-                       -> IO [([c], VClock)])
-        -> Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
-        -> IO ()
+putMany_ :: (Eq c, Monoid c) =>
+            (Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
+                        -> IO [([c], VClock)])
+         -> Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW -> IO ()
 putMany_ doPut conn bucket puts0 w dw =
     putMany doPut conn bucket puts0 w dw >> return ()
 {-# INLINE putMany_ #-}

src/Network/Riak/Value/Monoid.hs

 --
 -- The final value to be stored at the end of any conflict resolution
 -- is returned.
-put :: (Monoid c, V.IsContent c) =>
+put :: (Eq c, Monoid c, V.IsContent c) =>
        Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
     -> IO (c, VClock)
 put = M.put V.put 
 -- If a conflict arises, a winner will be chosen using 'mconcat', and
 -- the winner will be stored; this will be repeated until no conflict
 -- occurs.
-put_ :: (Monoid c, V.IsContent c) =>
+put_ :: (Eq c, Monoid c, V.IsContent c) =>
         Connection -> Bucket -> Key -> Maybe VClock -> c -> W -> DW
      -> IO ()
 put_ = M.put_ V.put 
 --
 -- For each original value to be stored, the final value that was
 -- stored at the end of any conflict resolution is returned.
-putMany :: (Monoid c, V.IsContent c) =>
+putMany :: (Eq c, Monoid c, V.IsContent c) =>
            Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
         -> IO [(c, VClock)]
 putMany = M.putMany V.putMany
 -- If any conflicts arise, a winner will be chosen in each case using
 -- 'mconcat', and the winners will be stored; this will be repeated
 -- until no conflicts occur.
-putMany_ :: (Monoid c, V.IsContent c) =>
-           Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW
-        -> IO ()
+putMany_ :: (Eq c, Monoid c, V.IsContent c) =>
+            Connection -> Bucket -> [(Key, Maybe VClock, c)] -> W -> DW -> IO ()
 putMany_ = M.putMany_ V.putMany
 {-# INLINE putMany_ #-}