Commits

Bryan O'Sullivan  committed e1cd5c3

Add pipeline and standalone request support.

  • Participants
  • Parent commits 6f5ba66

Comments (0)

Files changed (8)

     Network.Riak
     Network.Riak.Connection
     Network.Riak.Content
+    Network.Riak.Pipeline
+    Network.Riak.Request
     Network.Riak.Simple
     Network.Riak.Types
     Network.Riak.Protocol.ServerInfo

File src/Network/Riak/Content.hs

 
 module Network.Riak.Content
     (
-      unspecified
+      Content(..)
+    , Link.Link(..)
+    , empty
     , binary
+    , link
     ) where
 
 import qualified Data.ByteString.Lazy.Char8 as L
 import qualified Data.Sequence as Seq
 import Network.Riak.Protocol.Content (Content(..))
+import qualified Network.Riak.Protocol.Link as Link
+import Network.Riak.Types.Internal
 
-unspecified :: Content
-unspecified = Content { value = L.empty
-                      , content_type = Nothing
-                      , charset = Nothing
-                      , content_encoding = Nothing
-                      , vtag = Nothing
-                      , links = Seq.empty
-                      , last_mod = Nothing
-                      , last_mod_usecs = Nothing
-                      , usermeta = Seq.empty
-                      }
+link :: Bucket -> Key -> Tag -> Link.Link
+link bucket key tag = Link.Link (Just bucket) (Just key) (Just tag)
+{-# INLINE link #-}
+
+empty :: Content
+empty = Content { value = L.empty
+                , content_type = Nothing
+                , charset = Nothing
+                , content_encoding = Nothing
+                , vtag = Nothing
+                , links = Seq.empty
+                , last_mod = Nothing
+                , last_mod_usecs = Nothing
+                , usermeta = Seq.empty
+                }
 
 binary :: L.ByteString -> Content
-binary bs = unspecified { value = bs
-                        , content_type = Just "application/octet-stream"
-                        }
+binary bs = empty { value = bs
+                  , content_type = Just "application/octet-stream"
+                  }

File src/Network/Riak/Pipeline.hs

+{-# LANGUAGE RecordWildCards #-}
+
+module Network.Riak.Pipeline
+    (
+      pipeline
+    , pipeline_
+    ) where
+
+import Control.Monad
+import Network.Riak.Types.Internal
+import Network.Riak.Connection.Internal
+import Data.Binary.Put (runPut)
+import qualified Network.Socket.ByteString.Lazy as L
+import Control.Concurrent.Chan
+import Control.Concurrent
+
+pipeline :: (Request req, Response resp) => Connection -> [req] -> IO [resp]
+pipeline conn@Connection{..} reqs = do
+  ch <- newChan
+  let numReqs = length reqs
+  _ <- forkIO . replicateM_ numReqs $
+       writeChan ch =<< recvResponse conn
+  L.sendAll connSock . runPut . mapM_ putRequest $ reqs
+  replicateM numReqs $ readChan ch
+
+pipeline_ :: (Request req) => Connection -> [req] -> IO ()
+pipeline_ conn@Connection{..} reqs = do
+  done <- newEmptyMVar
+  _ <- forkIO $ do
+         forM_ reqs (recvResponse_ conn . expectedResponse)
+         putMVar done ()
+  L.sendAll connSock . runPut . mapM_ putRequest $ reqs
+  takeMVar done

File src/Network/Riak/Request.hs

+{-# LANGUAGE OverloadedStrings #-}
+
+module Network.Riak.Request
+    (
+    -- * Connection management
+      PingRequest
+    , ping
+    , GetClientIDRequest
+    , getClientID
+    , GetServerInfoRequest
+    , getServerInfo
+    -- * Data management
+    , Get.GetRequest
+    , get
+    , Put.PutRequest
+    , put
+    , Del.DeleteRequest
+    , delete
+    -- * Metadata
+    , ListBucketsRequest
+    , listBuckets
+    , Keys.ListKeysRequest
+    , listKeys
+    , GetBucket.GetBucketRequest
+    , getBucket
+    , SetBucket.SetBucketRequest
+    , setBucket
+    -- * Map/reduce
+    , MapReduceRequest
+    , mapReduce
+    ) where
+
+import Control.Applicative ((<$>))
+import Network.Riak.Protocol.PingRequest
+import qualified Network.Riak.Protocol.DeleteRequest as Del
+import Network.Riak.Protocol.GetClientIDRequest
+import Network.Riak.Protocol.GetServerInfoRequest
+import Network.Riak.Protocol.ListBucketsRequest
+import qualified Network.Riak.Protocol.ListKeysRequest as Keys
+import qualified Network.Riak.Protocol.PutRequest as Put
+import Network.Riak.Protocol.Content
+import qualified Network.Riak.Protocol.GetRequest as Get
+import qualified Network.Riak.Protocol.GetBucketRequest as GetBucket
+import qualified Network.Riak.Protocol.SetBucketRequest as SetBucket
+import Network.Riak.Protocol.MapReduceRequest
+import Network.Riak.Protocol.BucketProps
+import Network.Riak.Types.Internal hiding (MessageTag(..))
+
+ping :: PingRequest
+ping = PingRequest
+{-# INLINE ping #-}
+
+getClientID :: GetClientIDRequest
+getClientID = GetClientIDRequest
+{-# INLINE getClientID #-}
+
+getServerInfo :: GetServerInfoRequest
+getServerInfo = GetServerInfoRequest
+{-# INLINE getServerInfo #-}
+
+get :: Bucket -> Key -> Maybe R -> Get.GetRequest
+get bucket key r = Get.GetRequest { Get.bucket = bucket
+                                  , Get.key = key
+                                  , Get.r = fromQuorum <$> r }
+{-# INLINE get #-}
+
+put :: Bucket -> Key -> Maybe VClock -> Content -> Maybe W -> Maybe DW
+    -> Bool -> Put.PutRequest
+put bucket key mvclock cont mw mdw returnBody =
+    Put.PutRequest bucket key (fromVClock <$> mvclock) cont
+                   (fromQuorum <$> mw) (fromQuorum <$> mdw) (Just returnBody)
+{-# INLINE put #-}
+
+delete :: Bucket -> Key -> Maybe RW -> Del.DeleteRequest
+delete bucket key rw = Del.DeleteRequest bucket key (fromQuorum <$> rw)
+{-# INLINE delete #-}
+
+listBuckets :: ListBucketsRequest
+listBuckets = ListBucketsRequest
+{-# INLINE listBuckets #-}
+
+listKeys :: Bucket -> Keys.ListKeysRequest
+listKeys = Keys.ListKeysRequest
+{-# INLINE listKeys #-}
+
+getBucket :: Bucket -> GetBucket.GetBucketRequest
+getBucket bucket = GetBucket.GetBucketRequest bucket
+{-# INLINE getBucket #-}
+
+setBucket :: Bucket -> BucketProps -> SetBucket.SetBucketRequest
+setBucket bucket props = SetBucket.SetBucketRequest bucket props
+{-# INLINE setBucket #-}
+
+mapReduce :: Job -> MapReduceRequest
+mapReduce (JSON bs)   = MapReduceRequest bs "application/json"
+mapReduce (Erlang bs) = MapReduceRequest bs "application/x-erlang-binary"

File src/Network/Riak/Simple.hs

 import Network.Riak.Connection.Internal
 import Network.Riak.Protocol.BucketProps
 import Network.Riak.Protocol.Content
-import Network.Riak.Protocol.DeleteRequest
-import Network.Riak.Protocol.GetBucketRequest
 import Network.Riak.Protocol.GetBucketResponse as GetBucketResponse
-import Network.Riak.Protocol.GetClientIDRequest
 import Network.Riak.Protocol.GetClientIDResponse as GetClientIDResponse
-import Network.Riak.Protocol.GetRequest as GetRequest
 import Network.Riak.Protocol.GetResponse
-import Network.Riak.Protocol.GetServerInfoRequest
-import Network.Riak.Protocol.ListBucketsRequest
 import Network.Riak.Protocol.ListBucketsResponse
-import Network.Riak.Protocol.ListKeysRequest
 import Network.Riak.Protocol.ListKeysResponse
 import Network.Riak.Protocol.MapReduce
-import Network.Riak.Protocol.MapReduceRequest
-import Network.Riak.Protocol.PingRequest
-import Network.Riak.Protocol.PutRequest
 import Network.Riak.Protocol.PutResponse
 import Network.Riak.Protocol.ServerInfo
-import Network.Riak.Protocol.SetBucketRequest
 import Network.Riak.Types.Internal hiding (MessageTag(..))
 import qualified Network.Riak.Types.Internal as T
+import qualified Network.Riak.Request as Req
 
 ping :: Connection -> IO ()
 ping conn@Connection{..} = do
-  sendRequest conn PingRequest
+  sendRequest conn Req.ping
   recvResponse_ conn T.PingResponse
 
 getClientID :: Connection -> IO ClientID
 getClientID conn = do
-  sendRequest conn GetClientIDRequest
+  sendRequest conn Req.getClientID
   GetClientIDResponse.client_id <$> recvResponse conn
 
 getServerInfo :: Connection -> IO ServerInfo
 getServerInfo conn = do
-  sendRequest conn GetServerInfoRequest
+  sendRequest conn Req.getServerInfo
   recvResponse conn
 
 get :: Connection -> T.Bucket -> T.Key -> Maybe R
     -> IO (Maybe (Seq Content, Maybe VClock))
 get conn@Connection{..} bucket key r = do
-  sendRequest conn GetRequest { bucket = bucket
-                              , key = key
-                              , r = fromQuorum <$> r }
+  sendRequest conn $ Req.get bucket key r
   maybe Nothing cast <$> recvMaybeResponse conn
  where cast GetResponse{..} = Just (content, VClock <$> vclock)
 
     -> Content -> Maybe W -> Maybe DW -> Bool
     -> IO (Seq Content, Maybe VClock)
 put conn@Connection{..} bucket key mvclock cont mw mdw returnBody = do
-  sendRequest conn $ PutRequest bucket key (fromVClock <$> mvclock) cont
-                     (fromQuorum <$> mw) (fromQuorum <$> mdw) (Just returnBody)
+  sendRequest conn $ Req.put bucket key mvclock cont mw mdw returnBody
   PutResponse{..} <- recvResponse conn
   return (content, VClock <$> vclock)
 
 delete :: Connection -> T.Bucket -> T.Key -> Maybe RW -> IO ()
 delete conn bucket key rw = do
-  sendRequest conn $ DeleteRequest bucket key (fromQuorum <$> rw)
+  sendRequest conn $ Req.delete bucket key rw
   recvResponse_ conn T.DeleteResponse
 
 listBuckets :: Connection -> IO (Seq T.Bucket)
 listBuckets conn = do
-  sendRequest conn $ ListBucketsRequest
+  sendRequest conn Req.listBuckets
   buckets <$> recvResponse conn
 
 listKeys :: Connection -> T.Bucket -> IO (Seq T.Key, Maybe Bool)
 listKeys conn bucket = do
-  sendRequest conn $ ListKeysRequest bucket
+  sendRequest conn $ Req.listKeys bucket
   ListKeysResponse{..} <- recvResponse conn
   return (keys, done)
 
 getBucket :: Connection -> T.Bucket -> IO BucketProps
 getBucket conn bucket = do
-  sendRequest conn $ GetBucketRequest bucket
+  sendRequest conn $ Req.getBucket bucket
   GetBucketResponse.props <$> recvResponse conn
 
 setBucket :: Connection -> T.Bucket -> BucketProps -> IO ()
 setBucket conn bucket props = do
-  sendRequest conn $ SetBucketRequest bucket props
+  sendRequest conn $ Req.setBucket bucket props
   recvResponse_ conn T.SetBucketResponse
 
 mapReduce :: Connection -> Job -> IO MapReduce
 mapReduce conn job = do
-  sendRequest conn $ case job of
-                       JSON bs -> MapReduceRequest bs "application/json"
-                       Erlang bs -> MapReduceRequest bs "application/x-erlang-binary"
+  sendRequest conn $ Req.mapReduce job
   recvResponse conn

File src/Network/Riak/Tag.hs

 module Network.Riak.Tag
     (
-      Tagged(..)
-    , MessageTag
-    , putTag
+      putTag
     , getTag
     ) where
 
     messageTag _ = Types.PingRequest
     {-# INLINE messageTag #-}
 
-instance Request PingRequest
+instance Request PingRequest where
+    expectedResponse _ = Types.PingResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged PingResponse where
     messageTag _ = Types.PingResponse
     messageTag _ = Types.GetClientIDRequest
     {-# INLINE messageTag #-}
 
-instance Request GetClientIDRequest
+instance Request GetClientIDRequest where
+    expectedResponse _ = Types.GetClientIDResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged GetClientIDResponse where
     messageTag _ = Types.GetClientIDResponse
     messageTag _ = Types.SetClientIDRequest
     {-# INLINE messageTag #-}
 
-instance Request SetClientIDRequest
+instance Request SetClientIDRequest where
+    expectedResponse _ = Types.SetClientIDResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged SetClientIDResponse where
     messageTag _ = Types.SetClientIDResponse
     {-# INLINE messageTag #-}
 
-instance Request SetClientIDResponse
+instance Request SetClientIDResponse where
+    expectedResponse _ = Types.SetClientIDResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged GetServerInfoRequest where
     messageTag _ = Types.GetServerInfoRequest
     {-# INLINE messageTag #-}
 
-instance Request GetServerInfoRequest
+instance Request GetServerInfoRequest where
+    expectedResponse _ = Types.GetServerInfoResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged ServerInfo where
     messageTag _ = Types.GetServerInfoResponse
     messageTag _ = Types.GetRequest
     {-# INLINE messageTag #-}
 
-instance Request GetRequest
+instance Request GetRequest where
+    expectedResponse _ = Types.GetResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged GetResponse where
     messageTag _ = Types.GetResponse
     messageTag _ = Types.PutRequest
     {-# INLINE messageTag #-}
 
-instance Request PutRequest
+instance Request PutRequest where
+    expectedResponse _ = Types.PutResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged PutResponse where
     messageTag _ = Types.PutResponse
     messageTag _ = Types.DeleteRequest
     {-# INLINE messageTag #-}
 
-instance Request DeleteRequest
+instance Request DeleteRequest where
+    expectedResponse _ = Types.DeleteResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged ListBucketsRequest where
     messageTag _ = Types.ListBucketsRequest
     {-# INLINE messageTag #-}
 
-instance Request ListBucketsRequest
+instance Request ListBucketsRequest where
+    expectedResponse _ = Types.ListBucketsResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged ListBucketsResponse where
     messageTag _ = Types.ListBucketsResponse
     messageTag _ = Types.ListKeysRequest
     {-# INLINE messageTag #-}
 
-instance Request ListKeysRequest
+instance Request ListKeysRequest where
+    expectedResponse _ = Types.ListKeysResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged ListKeysResponse where
     messageTag _ = Types.ListKeysResponse
     messageTag _ = Types.GetBucketRequest
     {-# INLINE messageTag #-}
 
-instance Request GetBucketRequest
+instance Request GetBucketRequest where
+    expectedResponse _ = Types.GetBucketResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged GetBucketResponse where
     messageTag _ = Types.GetBucketResponse
     messageTag _ = Types.SetBucketRequest
     {-# INLINE messageTag #-}
 
-instance Request SetBucketRequest
+instance Request SetBucketRequest where
+    expectedResponse _ = Types.SetBucketResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged MapReduceRequest where
     messageTag _ = Types.MapReduceRequest
     {-# INLINE messageTag #-}
 
-instance Request MapReduceRequest
+instance Request MapReduceRequest where
+    expectedResponse _ = Types.MapReduceResponse
+    {-# INLINE expectedResponse #-}
 
 instance Tagged MapReduce where
     messageTag _ = Types.MapReduceResponse

File src/Network/Riak/Types.hs

     -- * Data types
     , Bucket
     , Key
+    , Tag
     , VClock(..)
     , Job(..)
     -- * Quorum management

File src/Network/Riak/Types/Internal.hs

     -- * Data types
     , Bucket
     , Key
+    , Tag
     , VClock(..)
     , Job(..)
     -- * Quorum management
     , fromQuorum
     , toQuorum
     -- * Message identification
-    , Request
+    , Request(..)
     , Response
     , MessageTag(..)
     , Tagged(..)
 
 type Key = ByteString
 
+type Tag = ByteString
+
 data Job = JSON ByteString
          | Erlang ByteString
            deriving (Eq, Show)
     messageTag m = m
     {-# INLINE messageTag #-}
 
-class (Tagged msg, ReflectDescriptor msg, Wire msg) => Request msg
+class (Tagged msg, ReflectDescriptor msg, Wire msg) => Request msg where
+    expectedResponse :: msg -> MessageTag
 
 class (Tagged msg, ReflectDescriptor msg, Wire msg) => Response msg