Commits

Bryan O'Sullivan committed d247f08

Implement put, and proper type safety for quorum operations.

Comments (0)

Files changed (7)

 
   exposed-modules:     
     Network.Riak
+    Network.Riak.Content
+    Network.Riak.Types
 
   other-modules:       
     Network.Riak.Message.Code
+    Network.Riak.Types.Internal
     Network.Riak.Message
     Network.Riak.Socket
-    Network.Riak.Types
     Network.Riakclient
     Network.Riakclient.RpbBucketProps
     Network.Riakclient.RpbContent
     Network.Riakclient.RpbSetClientIdReq
   
   build-depends:       
-    base >= 4,
+    base == 4.*,
     binary,
     bytestring,
     containers,
     network >= 2.3,
     protocol-buffers >= 1.8.0,
     protocol-buffers-descriptor >= 1.8.1,
+    pureMD5,
     random
   
   -- needed for the code generated by hprotoc

src/Network/Riak.hs

     , makeClientID
     , ping
     , get
+    , Network.Riak.put
     ) where
 
 import qualified Data.ByteString.Char8 as B
+import Control.Applicative
 import Data.Binary hiding (get)
 import Data.Binary.Put
+import Control.Monad
 import Network.Socket.ByteString.Lazy as L
 import Network.Socket as Socket
+import Network.Riakclient.RpbContent
+import Network.Riakclient.RpbPutReq
+import Network.Riakclient.RpbPutResp
 import qualified Data.ByteString.Lazy.Char8 as L
 import Numeric (showHex)
 import System.Random
 import Network.Riakclient.RpbSetClientIdReq
 import Network.Riak.Message
 import Network.Riak.Types as T
+import Network.Riak.Types.Internal
 import Text.ProtocolBuffers
 import Data.IORef
 
   _ <- recvResponse conn
   return ()
 
-get :: Connection -> T.Bucket -> T.Key -> Maybe Int -> IO (Maybe RpbGetResp)
+get :: Connection -> T.Bucket -> T.Key -> Maybe R
+    -> IO (Maybe (Seq Content, Maybe VClock))
 get conn@Connection{..} bucket key r = do
-  let req = RpbGetReq { bucket = bucket, key = key, r = fromIntegral `fmap` r }
+  let req = RpbGetReq { bucket = bucket, key = key, r = fromQuorum <$> r }
   sendRequest conn req
   resp <- recvResponse conn
   case resp of
     Left msg | msg == Code.getResp -> return Nothing
-    Right (GetResponse r) -> return (Just r)
-    _             -> fail $  "get: invalid response" ++ show r
+    Right (GetResponse RpbGetResp{..}) -> return . Just $ (content, VClock <$> vclock)
+    bad             -> fail $  "get: invalid response " ++ show bad
+
+put :: Connection -> T.Bucket -> T.Key -> Maybe T.VClock
+    -> Content -> Maybe W -> Maybe DW -> Bool
+    -> IO (Seq Content, Maybe VClock)
+put conn@Connection{..} bucket key vclock content w dw returnBody = do
+  let req = RpbPutReq bucket key (fromVClock <$> vclock) content (fromQuorum <$> w) (fromQuorum <$> dw) (Just returnBody)
+  sendRequest conn req
+  resp <- recvResponse_ conn
+  case resp of
+    PutResponse RpbPutResp{..} -> return (content, VClock <$> vclock)
+    bad ->  fail $ "put: invalid response " ++ show bad
 
 setClientID :: Connection -> ClientID -> IO ()
 setClientID conn id = do
   let req = RpbSetClientIdReq { client_id = id }
   sendRequest conn req
-  _ <- recvResponse conn
-  return ()
+  resp <- recvResponse_ conn
+  unless (resp == SetClientIDResponse) .
+    fail $ "setClientID: invalid response " ++ show resp

src/Network/Riak/Content.hs

+{-# LANGUAGE OverloadedStrings #-}
+
+module Network.Riak.Content
+    (
+      unspecified
+    , binary
+    ) where
+
+import qualified Data.ByteString.Lazy.Char8 as L
+import qualified Data.Sequence as Seq
+import Network.Riakclient.RpbContent
+import Network.Riak.Types
+
+unspecified :: Content
+unspecified = RpbContent { value = L.empty
+                         , content_type = Nothing
+                         , charset = Nothing
+                         , content_encoding = Nothing
+                         , vtag = Nothing
+                         , links = Seq.empty
+                         , last_mod = Nothing
+                         , last_mod_usecs = Nothing
+                         , usermeta = Seq.empty
+                         }
+
+binary :: L.ByteString -> Content
+binary bs = unspecified { value = bs
+                        , content_type = Just "application/octet-stream"
+                        }

src/Network/Riak/Message.hs

 import Text.ProtocolBuffers as PB
 import Text.ProtocolBuffers.Get
 import Network.Riakclient.RpbGetResp
+import Network.Riakclient.RpbPutResp
 import Network.Socket.ByteString.Lazy as L
 
 
               | PingResponse
               | SetClientIDResponse
               | GetResponse RpbGetResp
+              | PutResponse RpbPutResp
                 deriving (Eq, Show)
 
 putPingReq :: Put
 putPingReq = putWord32be 1 >> putCode pingReq
 
-putRequest :: (ReflectDescriptor req, Coded req) => req -> Put
+putRequest :: (Coded req, ReflectDescriptor req, Wire req) => req -> Put
 putRequest req = do
   putWord32be (fromIntegral (1 + messageSize req))
   putCode (messageCode req)
   messagePutM req
 
-sendRequest :: (ReflectDescriptor req, Coded req) => Connection -> req -> IO ()
+sendRequest :: (Coded req, ReflectDescriptor req, Wire req) =>
+               Connection -> req -> IO ()
 sendRequest Connection{..} req = L.sendAll connSock . runPut . putRequest $ req
 
 getterMap :: Map.IntMap (Get Response)
               errorResp -:> return ErrorResponse
             , pingResp -:> return PingResponse
             , getResp -:> (GetResponse `fmap` messageGetM)
+            , putResp -:> (PutResponse `fmap` messageGetM)
             , setClientIdResp -:> return SetClientIDResponse
             ]
   where a -:> b = (messageNumber a, b)

src/Network/Riak/Message/Code.hs

 import Data.Binary.Put
 import Network.Riakclient.RpbSetClientIdReq
 import Network.Riakclient.RpbGetReq
+import Network.Riakclient.RpbPutReq
 import Text.ProtocolBuffers
 import Text.ProtocolBuffers.Get
 
 messageNumber (M m) = fromIntegral m
 {-# INLINE messageNumber #-}
 
-class (Wire msg) => Coded msg where
+class Coded msg where
     messageCode :: msg -> MessageCode
 
+instance Coded MessageCode where
+    messageCode m = m
+    {-# INLINE messageCode #-}
+
 instance Coded RpbSetClientIdReq where
     messageCode _ = setClientIdReq
     {-# INLINE messageCode #-}
     messageCode _ = getReq
     {-# INLINE messageCode #-}
 
+instance Coded RpbPutReq where
+    messageCode _ = putReq
+    {-# INLINE messageCode #-}
+
 putCode :: MessageCode -> Put
 putCode (M m) = putWord8 m
 {-# INLINE putCode #-}

src/Network/Riak/Types.hs

     (
       ClientID
     , Client(..)
+    , Content
     , Connection(..)
     , Bucket
     , Key
+    , Quorum
+    , Q(..)
+    , RW(..)
+    , R(..)
+    , W(..)
+    , DW(..)
+    , VClock
+    , fromQuorum
+    , toQuorum
     ) where
 
 import qualified Data.ByteString as B
 import qualified Data.ByteString.Lazy as L
 import Data.IORef (IORef)
 import Network.Socket
+import Data.Word
+import Network.Riakclient.RpbContent
+import Network.Riak.Types.Internal
     
 type ClientID = L.ByteString
 
 type Bucket = L.ByteString
 
 type Key = L.ByteString
+
+data Q = Default
+       | All
+       | Quorum
+       | One
+         deriving (Eq, Enum, Show)
+
+newtype RW = RW Q deriving (Eq, Show)
+newtype R  = R Q deriving (Eq, Show)
+newtype W  = W Q deriving (Eq, Show)
+newtype DW = DW Q deriving (Eq, Show)
+
+type Content = RpbContent
+
+fromQ :: Q -> Word32
+fromQ Default = 4294967291
+fromQ All     = 4294967292
+fromQ Quorum  = 4294967293
+fromQ One     = 4294967294
+{-# INLINE fromQ #-}
+
+toQ :: Word32 -> Maybe Q
+toQ 4294967291 = Just Default
+toQ 4294967292 = Just All
+toQ 4294967293 = Just Quorum
+toQ 4294967294 = Just One
+toQ _          = Nothing
+{-# INLINE toQ #-}
+
+class Quorum q where
+    fromQuorum :: q -> Word32
+    toQuorum :: Word32 -> Maybe q
+
+instance Quorum Q where
+    fromQuorum = fromQ
+    {-# INLINE fromQuorum #-}
+
+    toQuorum = toQ
+    {-# INLINE toQuorum #-}
+
+instance Quorum R where
+    fromQuorum (R q) = fromQ q
+    {-# INLINE fromQuorum #-}
+
+    toQuorum = fmap R . toQ
+    {-# INLINE toQuorum #-}
+
+instance Quorum W where
+    fromQuorum (W q) = fromQ q
+    {-# INLINE fromQuorum #-}
+
+    toQuorum = fmap W . toQ
+    {-# INLINE toQuorum #-}
+
+instance Quorum RW where
+    fromQuorum (RW q) = fromQ q
+    {-# INLINE fromQuorum #-}
+
+    toQuorum = fmap RW . toQ
+    {-# INLINE toQuorum #-}
+
+instance Quorum DW where
+    fromQuorum (DW q) = fromQ q
+    {-# INLINE fromQuorum #-}
+
+    toQuorum = fmap DW . toQ
+    {-# INLINE toQuorum #-}

src/Network/Riak/Types/Internal.hs

+module Network.Riak.Types.Internal
+    (
+     VClock(..)
+    ) where
+
+import qualified Data.ByteString.Lazy as L
+import Data.Digest.Pure.MD5 (md5)
+
+newtype VClock = VClock {
+      fromVClock :: L.ByteString
+    } deriving (Eq)
+
+instance Show VClock where
+    show (VClock s) = "VClock " ++ show (md5 s)