Commits

Bryan O'Sullivan committed 4d611cf

Many changes!

* listKeys -> foldKeys
* bidirectional fundeps between request and response types
* somewhat sane exception usage
* JSON content
* moar pipeline!
* drop some useless response types
* response type coercion functions

  • Participants
  • Parent commits e1cd5c3

Comments (0)

Files changed (14)

     Network.Riak.Content
     Network.Riak.Pipeline
     Network.Riak.Request
+    Network.Riak.Response
     Network.Riak.Simple
     Network.Riak.Types
     Network.Riak.Protocol.ServerInfo
     Network.Riak.Protocol.SetBucketRequest
     Network.Riak.Protocol.SetClientIDRequest
     Network.Riak.Protocol.PingRequest
-    Network.Riak.Protocol.PingResponse
     Network.Riak.Protocol.GetClientIDRequest
-    Network.Riak.Protocol.SetClientIDResponse
     Network.Riak.Protocol.GetServerInfoRequest
     Network.Riak.Protocol.ListBucketsRequest
 
     Network.Riak.Protocol
   
   build-depends:       
+    aeson,
     base == 4.*,
     binary,
     bytestring,

src/Network/Riak.hs

     , delete
     -- * Metadata
     , listBuckets
-    , listKeys
+    , foldKeys
     , getBucket
     , setBucket
     -- * Map/reduce

src/Network/Riak/Connection.hs

     , makeClientID
     -- * Requests and responses
     -- ** Sending and receiving
-    , sendRequest
-    , recvResponse
-    , recvMaybeResponse
-    , recvResponse_
+    , exchange
+    , exchangeMaybe
+    , exchange_
     -- ** Composing and parsing
     , putRequest
     , getResponse

src/Network/Riak/Connection/Internal.hs

     , recvResponse
     , recvMaybeResponse
     , recvResponse_
+    , exchange
+    , exchangeMaybe
+    , exchange_
     -- ** Composing and parsing requests and responses
     , putRequest
     , getResponse
 import Data.Int (Int64)
 import Network.Riak.Protocol.SetClientIDRequest
 import Network.Riak.Tag (getTag, putTag)
-import Network.Riak.Types.Internal
-    (Client(..), ClientID, Connection(..), Request, Response, Tagged(..))
+import Network.Riak.Types.Internal hiding (MessageTag(..))
 import Network.Socket as Socket
 import qualified Network.Socket.ByteString as B
 import qualified Network.Socket.ByteString.Lazy as L
         else go (bs:acc) (n' - fromIntegral len)
 
 recvExactly :: Connection -> Int64 -> IO L.ByteString
-recvExactly = recvWith (const (fail "short read from network"))
+recvExactly = recvWith $ \_ ->
+              moduleError "recvExactly" "short read from network"
 
 recvGet :: Connection -> Get a -> IO a
 recvGet Connection{..} get = do
         if L.null bs
           then shutdown connSock ShutdownReceive >> return Nothing
           else return (Just bs)
-      step (Failed _ err)    = fail err
+      step (Failed _ err)    = moduleError "recvGet" err
       step (Finished bs _ r) = writeIORef connBuffer bs >> return r
       step (Partial k)       = (step . k) =<< refill
   mbs <- do
       else return (Just buf)
   case mbs of
     Just bs -> step $ runGet get bs
-    Nothing -> fail "socket closed"
+    Nothing -> moduleError "recvGet" "socket closed"
   
 recvGetN :: Connection -> Int64 -> Get a -> IO a
 recvGetN conn n get = do
     Finished bs' _ r -> finish bs' r
     Partial k    -> case k Nothing of
                       Finished bs' _ r -> finish bs' r
-                      Failed _ err -> fail err
-                      Partial _    -> fail "parser wants more input!?"
-    Failed _ err -> fail err
+                      Failed _ err -> moduleError "recvGetN" err
+                      Partial _    -> moduleError "recvGetN"
+                                      "parser wants more input!?"
+    Failed _ err -> moduleError "recvGetN" err
 
 putRequest :: (Request req) => req -> Put
 putRequest req = do
     else return . Left $ "received unexpected response: expected " ++
                          show expected ++ ", received " ++ show tag
 
+exchange :: Exchange req resp => Connection -> req -> IO resp
+exchange conn@Connection{..} req = do
+  sendRequest conn req
+  recvResponse conn
+
+exchangeMaybe :: Exchange req resp => Connection -> req -> IO (Maybe resp)
+exchangeMaybe conn@Connection{..} req = do
+  sendRequest conn req
+  recvMaybeResponse conn
+
+exchange_ :: Request req => Connection -> req -> IO ()
+exchange_ conn req = do
+  sendRequest conn req
+  recvResponse_ conn (expectedResponse req)
+
 sendRequest :: (Request req) => Connection -> req -> IO ()
 sendRequest Connection{..} = L.sendAll connSock . runPut . putRequest
 
     len <- fromIntegral `fmap` recvGet conn getWord32be
     r <- recvGetN conn len (getResponse (messageTag dummy))
     case r of
-      Left err -> fail err
+      Left err  -> moduleError "recvResponse" err
       Right ret -> return ret
 
 recvResponse_ :: Connection -> T.MessageTag -> IO ()
   len <- fromIntegral `fmap` recvGet conn getWord32be
   tag <- recvGet conn getTag
   when (tag /= expected) .
-    fail $ "received unexpected response: expected " ++
-           show expected ++ ", received " ++ show tag
+    moduleError "recvResponse_" $ "received unexpected response: expected " ++
+                                  show expected ++ ", received " ++ show tag
   recvExactly conn (len-1) >> return ()
 
 recvMaybeResponse :: (Response a) => Connection -> IO (Maybe a)
       else do
         r <- recvGetN conn len (getResponse (messageTag dummy))
         case r of
-          Left err -> fail err
+          Left err  -> moduleError "recvMaybeResponse" err
           Right ret -> return (Just ret)
+
+moduleError :: String -> String -> a
+moduleError = riakError "Network.Riak.Connection.Internal"

src/Network/Riak/Content.hs

     , Link.Link(..)
     , empty
     , binary
+    , json
     , link
     ) where
 
 import Network.Riak.Protocol.Content (Content(..))
 import qualified Network.Riak.Protocol.Link as Link
 import Network.Riak.Types.Internal
+import Data.Aeson.Encode
+import Data.Aeson.Types
 
 link :: Bucket -> Key -> Tag -> Link.Link
 link bucket key tag = Link.Link (Just bucket) (Just key) (Just tag)
 binary bs = empty { value = bs
                   , content_type = Just "application/octet-stream"
                   }
+
+json :: ToJSON a => a -> Content
+json j = empty { value = encode j
+               , content_type = Just "application/json"
+               }

src/Network/Riak/Pipeline.hs

 module Network.Riak.Pipeline
     (
       pipeline
+    , pipelineMaybe
     , pipeline_
     ) where
 
 import Control.Concurrent.Chan
 import Control.Concurrent
 
-pipeline :: (Request req, Response resp) => Connection -> [req] -> IO [resp]
-pipeline conn@Connection{..} reqs = do
+pipe :: (Request req) => (Connection -> IO resp) -> Connection -> [req]
+     -> IO [resp]
+pipe recv conn@Connection{..} reqs = do
   ch <- newChan
   let numReqs = length reqs
   _ <- forkIO . replicateM_ numReqs $
-       writeChan ch =<< recvResponse conn
+       writeChan ch =<< recv conn
   L.sendAll connSock . runPut . mapM_ putRequest $ reqs
   replicateM numReqs $ readChan ch
 
+pipeline :: (Exchange req resp) => Connection -> [req] -> IO [resp]
+pipeline = pipe recvResponse
+
+pipelineMaybe :: (Exchange req resp) => Connection -> [req] -> IO [Maybe resp]
+pipelineMaybe = pipe recvMaybeResponse
+
 pipeline_ :: (Request req) => Connection -> [req] -> IO ()
 pipeline_ conn@Connection{..} reqs = do
   done <- newEmptyMVar

src/Network/Riak/Protocol/PingResponse.hs

-{-# LANGUAGE DeriveDataTypeable #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# OPTIONS_GHC -fno-warn-unused-imports #-}
-module Network.Riak.Protocol.PingResponse (PingResponse(..)) where
-import Prelude ((+))
-import qualified Prelude as P'
-import qualified Text.ProtocolBuffers.Header as P'
- 
-data PingResponse = PingResponse{}
-                  deriving (P'.Show, P'.Eq, P'.Ord, P'.Typeable)
- 
-instance P'.Mergeable PingResponse where
-  mergeEmpty = PingResponse
-  mergeAppend (PingResponse) (PingResponse) = PingResponse
- 
-instance P'.Default PingResponse where
-  defaultValue = PingResponse
- 
-instance P'.Wire PingResponse where
-  wireSize ft' self'@(PingResponse)
-   = case ft' of
-       10 -> calc'Size
-       11 -> P'.prependMessageSize calc'Size
-       _ -> P'.wireSizeErr ft' self'
-    where
-        calc'Size = 0
-  wirePut ft' self'@(PingResponse)
-   = case ft' of
-       10 -> put'Fields
-       11 -> do
-               P'.putSize (P'.wireSize 10 self')
-               put'Fields
-       _ -> P'.wirePutErr ft' self'
-    where
-        put'Fields
-         = do
-             P'.return ()
-  wireGet ft'
-   = case ft' of
-       10 -> P'.getBareMessageWith update'Self
-       11 -> P'.getMessageWith update'Self
-       _ -> P'.wireGetErr ft'
-    where
-        update'Self wire'Tag old'Self
-         = case wire'Tag of
-             _ -> let (field'Number, wire'Type) = P'.splitWireTag wire'Tag in P'.unknown field'Number wire'Type old'Self
- 
-instance P'.MessageAPI msg' (msg' -> PingResponse) PingResponse where
-  getVal m' f' = f' m'
- 
-instance P'.GPB PingResponse
- 
-instance P'.ReflectDescriptor PingResponse where
-  getMessageInfo _ = P'.GetMessageInfo (P'.fromDistinctAscList []) (P'.fromDistinctAscList [])
-  reflectDescriptorInfo _
-   = P'.read
-      "DescriptorInfo {descName = ProtoName {protobufName = FIName \".Protocol.PingResponse\", haskellPrefix = [MName \"Network\",MName \"Riak\"], parentModule = [MName \"Protocol\"], baseName = MName \"PingResponse\"}, descFilePath = [\"Network\",\"Riak\",\"Protocol\",\"PingResponse.hs\"], isGroup = False, fields = fromList [], keys = fromList [], extRanges = [], knownKeys = fromList [], storeUnknown = False}"

src/Network/Riak/Protocol/SetClientIDResponse.hs

-{-# LANGUAGE DeriveDataTypeable #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# OPTIONS_GHC -fno-warn-unused-imports #-}
-module Network.Riak.Protocol.SetClientIDResponse (SetClientIDResponse(..)) where
-import Prelude ((+))
-import qualified Prelude as P'
-import qualified Text.ProtocolBuffers.Header as P'
- 
-data SetClientIDResponse = SetClientIDResponse{}
-                         deriving (P'.Show, P'.Eq, P'.Ord, P'.Typeable)
- 
-instance P'.Mergeable SetClientIDResponse where
-  mergeEmpty = SetClientIDResponse
-  mergeAppend (SetClientIDResponse) (SetClientIDResponse) = SetClientIDResponse
- 
-instance P'.Default SetClientIDResponse where
-  defaultValue = SetClientIDResponse
- 
-instance P'.Wire SetClientIDResponse where
-  wireSize ft' self'@(SetClientIDResponse)
-   = case ft' of
-       10 -> calc'Size
-       11 -> P'.prependMessageSize calc'Size
-       _ -> P'.wireSizeErr ft' self'
-    where
-        calc'Size = 0
-  wirePut ft' self'@(SetClientIDResponse)
-   = case ft' of
-       10 -> put'Fields
-       11 -> do
-               P'.putSize (P'.wireSize 10 self')
-               put'Fields
-       _ -> P'.wirePutErr ft' self'
-    where
-        put'Fields
-         = do
-             P'.return ()
-  wireGet ft'
-   = case ft' of
-       10 -> P'.getBareMessageWith update'Self
-       11 -> P'.getMessageWith update'Self
-       _ -> P'.wireGetErr ft'
-    where
-        update'Self wire'Tag old'Self
-         = case wire'Tag of
-             _ -> let (field'Number, wire'Type) = P'.splitWireTag wire'Tag in P'.unknown field'Number wire'Type old'Self
- 
-instance P'.MessageAPI msg' (msg' -> SetClientIDResponse) SetClientIDResponse where
-  getVal m' f' = f' m'
- 
-instance P'.GPB SetClientIDResponse
- 
-instance P'.ReflectDescriptor SetClientIDResponse where
-  getMessageInfo _ = P'.GetMessageInfo (P'.fromDistinctAscList []) (P'.fromDistinctAscList [])
-  reflectDescriptorInfo _
-   = P'.read
-      "DescriptorInfo {descName = ProtoName {protobufName = FIName \".Protocol.SetClientIDResponse\", haskellPrefix = [MName \"Network\",MName \"Riak\"], parentModule = [MName \"Protocol\"], baseName = MName \"SetClientIDResponse\"}, descFilePath = [\"Network\",\"Riak\",\"Protocol\",\"SetClientIDResponse.hs\"], isGroup = False, fields = fromList [], keys = fromList [], extRanges = [], knownKeys = fromList [], storeUnknown = False}"

src/Network/Riak/Response.hs

+{-# LANGUAGE RecordWildCards #-}
+
+module Network.Riak.Response
+    (
+    -- * Connection management
+      getClientID
+    -- * Data management
+    , get
+    , put
+    -- * Metadata
+    , listBuckets
+    , getBucket
+    ) where
+
+import Control.Applicative ((<$>))
+import Network.Riak.Types.Internal hiding (MessageTag(..))
+import qualified Data.Sequence as Seq
+import Network.Riak.Protocol.Content
+import Network.Riak.Protocol.GetResponse
+import Network.Riak.Protocol.PutResponse
+import Network.Riak.Protocol.GetClientIDResponse
+import Network.Riak.Protocol.ListBucketsResponse
+import Network.Riak.Protocol.GetBucketResponse
+import Network.Riak.Protocol.BucketProps
+
+getClientID :: GetClientIDResponse -> ClientID
+getClientID = client_id
+{-# INLINE getClientID #-}
+
+get :: Maybe GetResponse -> (Seq.Seq Content, Maybe VClock)
+get (Just GetResponse{..}) = (content, VClock <$> vclock)
+get _                      = (Seq.empty, Nothing)
+{-# INLINE get #-}
+
+put :: PutResponse -> (Seq.Seq Content, Maybe VClock)
+put PutResponse{..} = (content, VClock <$> vclock)
+{-# INLINE put #-}
+
+listBuckets :: ListBucketsResponse -> (Seq.Seq Bucket)
+listBuckets = buckets
+{-# INLINE listBuckets #-}
+
+getBucket :: GetBucketResponse -> BucketProps
+getBucket = props
+{-# INLINE getBucket #-}

src/Network/Riak/Simple.hs

     , delete
     -- * Metadata
     , listBuckets
-    , listKeys
+    , foldKeys
     , getBucket
     , setBucket
     -- * Map/reduce
     ) where
 
 import Control.Applicative ((<$>))
-import Data.Sequence (Seq)
+import qualified Data.Foldable as F
+import qualified Data.Sequence as Seq
 import Network.Riak.Connection.Internal
 import Network.Riak.Protocol.BucketProps
 import Network.Riak.Protocol.Content
-import Network.Riak.Protocol.GetBucketResponse as GetBucketResponse
-import Network.Riak.Protocol.GetClientIDResponse as GetClientIDResponse
-import Network.Riak.Protocol.GetResponse
-import Network.Riak.Protocol.ListBucketsResponse
 import Network.Riak.Protocol.ListKeysResponse
 import Network.Riak.Protocol.MapReduce
-import Network.Riak.Protocol.PutResponse
 import Network.Riak.Protocol.ServerInfo
 import Network.Riak.Types.Internal hiding (MessageTag(..))
 import qualified Network.Riak.Types.Internal as T
 import qualified Network.Riak.Request as Req
+import qualified Network.Riak.Response as Resp
 
 ping :: Connection -> IO ()
-ping conn@Connection{..} = do
-  sendRequest conn Req.ping
-  recvResponse_ conn T.PingResponse
+ping conn = exchange_ conn Req.ping
 
 getClientID :: Connection -> IO ClientID
-getClientID conn = do
-  sendRequest conn Req.getClientID
-  GetClientIDResponse.client_id <$> recvResponse conn
+getClientID conn = Resp.getClientID <$> exchange conn Req.getClientID
 
 getServerInfo :: Connection -> IO ServerInfo
-getServerInfo conn = do
-  sendRequest conn Req.getServerInfo
-  recvResponse conn
+getServerInfo conn = exchange conn Req.getServerInfo
 
 get :: Connection -> T.Bucket -> T.Key -> Maybe R
-    -> IO (Maybe (Seq Content, Maybe VClock))
-get conn@Connection{..} bucket key r = do
-  sendRequest conn $ Req.get bucket key r
-  maybe Nothing cast <$> recvMaybeResponse conn
- where cast GetResponse{..} = Just (content, VClock <$> vclock)
+    -> IO (Seq.Seq Content, Maybe VClock)
+get conn bucket key r =
+  Resp.get <$> exchangeMaybe conn (Req.get bucket key r)
 
 put :: Connection -> T.Bucket -> T.Key -> Maybe T.VClock
     -> Content -> Maybe W -> Maybe DW -> Bool
-    -> IO (Seq Content, Maybe VClock)
-put conn@Connection{..} bucket key mvclock cont mw mdw returnBody = do
-  sendRequest conn $ Req.put bucket key mvclock cont mw mdw returnBody
-  PutResponse{..} <- recvResponse conn
-  return (content, VClock <$> vclock)
+    -> IO (Seq.Seq Content, Maybe VClock)
+put conn bucket key mvclock cont mw mdw returnBody =
+  Resp.put <$> exchange conn (Req.put bucket key mvclock cont mw mdw returnBody)
 
 delete :: Connection -> T.Bucket -> T.Key -> Maybe RW -> IO ()
-delete conn bucket key rw = do
-  sendRequest conn $ Req.delete bucket key rw
-  recvResponse_ conn T.DeleteResponse
+delete conn bucket key rw = exchange_ conn $ Req.delete bucket key rw
 
-listBuckets :: Connection -> IO (Seq T.Bucket)
-listBuckets conn = do
-  sendRequest conn Req.listBuckets
-  buckets <$> recvResponse conn
+listBuckets :: Connection -> IO (Seq.Seq T.Bucket)
+listBuckets conn = Resp.listBuckets <$> exchange conn Req.listBuckets
 
-listKeys :: Connection -> T.Bucket -> IO (Seq T.Key, Maybe Bool)
-listKeys conn bucket = do
+foldKeys :: Connection -> T.Bucket -> (a -> Key -> IO a) -> a -> IO a
+foldKeys conn bucket f z0 = do
   sendRequest conn $ Req.listKeys bucket
-  ListKeysResponse{..} <- recvResponse conn
-  return (keys, done)
+  let loop z = do
+        ListKeysResponse{..} <- recvResponse conn
+        z1 <- F.foldlM f z keys
+        if maybe False id done
+          then return z1
+          else loop z1
+  loop z0
 
 getBucket :: Connection -> T.Bucket -> IO BucketProps
-getBucket conn bucket = do
-  sendRequest conn $ Req.getBucket bucket
-  GetBucketResponse.props <$> recvResponse conn
+getBucket conn bucket = Resp.getBucket <$> exchange conn (Req.getBucket bucket)
 
 setBucket :: Connection -> T.Bucket -> BucketProps -> IO ()
-setBucket conn bucket props = do
-  sendRequest conn $ Req.setBucket bucket props
-  recvResponse_ conn T.SetBucketResponse
+setBucket conn bucket props = exchange_ conn $ Req.setBucket bucket props
 
 mapReduce :: Connection -> Job -> IO MapReduce
-mapReduce conn job = do
-  sendRequest conn $ Req.mapReduce job
-  recvResponse conn
+mapReduce conn = exchange conn . Req.mapReduce

src/Network/Riak/Tag.hs

+{-# LANGUAGE MultiParamTypeClasses #-}
+
 module Network.Riak.Tag
     (
       putTag
 import Network.Riak.Protocol.MapReduce
 import Network.Riak.Protocol.MapReduceRequest
 import Network.Riak.Protocol.PingRequest
-import Network.Riak.Protocol.PingResponse
 import Network.Riak.Protocol.PutRequest
 import Network.Riak.Protocol.PutResponse
 import Network.Riak.Protocol.ServerInfo
 import Network.Riak.Protocol.SetBucketRequest
 import Network.Riak.Protocol.SetClientIDRequest
-import Network.Riak.Protocol.SetClientIDResponse
 import Network.Riak.Types.Internal as Types
 import Text.ProtocolBuffers.Get (Get, getWord8)
 
     expectedResponse _ = Types.PingResponse
     {-# INLINE expectedResponse #-}
 
-instance Tagged PingResponse where
-    messageTag _ = Types.PingResponse
-    {-# INLINE messageTag #-}
-
-instance Response PingResponse
-
 instance Tagged GetClientIDRequest where
     messageTag _ = Types.GetClientIDRequest
     {-# INLINE messageTag #-}
 
 instance Response GetClientIDResponse
 
+instance Exchange GetClientIDRequest GetClientIDResponse
+
 instance Tagged SetClientIDRequest where
     messageTag _ = Types.SetClientIDRequest
     {-# INLINE messageTag #-}
     expectedResponse _ = Types.SetClientIDResponse
     {-# INLINE expectedResponse #-}
 
-instance Tagged SetClientIDResponse where
-    messageTag _ = Types.SetClientIDResponse
-    {-# INLINE messageTag #-}
-
-instance Request SetClientIDResponse where
-    expectedResponse _ = Types.SetClientIDResponse
-    {-# INLINE expectedResponse #-}
-
 instance Tagged GetServerInfoRequest where
     messageTag _ = Types.GetServerInfoRequest
     {-# INLINE messageTag #-}
 
 instance Response ServerInfo
 
+instance Exchange GetServerInfoRequest ServerInfo
+
 instance Tagged GetRequest where
     messageTag _ = Types.GetRequest
     {-# INLINE messageTag #-}
 
 instance Response GetResponse
 
+instance Exchange GetRequest GetResponse
+
 instance Tagged PutRequest where
     messageTag _ = Types.PutRequest
     {-# INLINE messageTag #-}
 
 instance Response PutResponse
 
+instance Exchange PutRequest PutResponse
+
 instance Tagged DeleteRequest where
     messageTag _ = Types.DeleteRequest
     {-# INLINE messageTag #-}
 
 instance Response ListBucketsResponse
 
+instance Exchange ListBucketsRequest ListBucketsResponse
+
 instance Tagged ListKeysRequest where
     messageTag _ = Types.ListKeysRequest
     {-# INLINE messageTag #-}
 
 instance Response GetBucketResponse
 
+instance Exchange GetBucketRequest GetBucketResponse
+
 instance Tagged SetBucketRequest where
     messageTag _ = Types.SetBucketRequest
     {-# INLINE messageTag #-}
 
 instance Response MapReduce
 
+instance Exchange MapReduceRequest MapReduce
+
 putTag :: MessageTag -> Put
 putTag = putWord8 . fromIntegral . fromEnum
 {-# INLINE putTag #-}
 getTag = do
   n <- getWord8
   if n > 24
-    then fail $ "invalid riak message code: " ++ show n
+    then moduleError "getTag" $ "invalid riak message code: " ++ show n
     else return .  toEnum . fromIntegral $ n
 {-# INLINE getTag #-}
+
+moduleError :: String -> String -> a
+moduleError = riakError "Network.Riak.Tag"

src/Network/Riak/Types.hs

     , Client(..)
     -- * Connection management
     , Connection(connClient)
+    -- * Errors
+    , RiakException(excModule, excFunction, excMessage)
     -- * Data types
     , Bucket
     , Key

src/Network/Riak/Types/Internal.hs

+{-# LANGUAGE DeriveDataTypeable, FunctionalDependencies, MultiParamTypeClasses,
+    RecordWildCards #-}
+
 module Network.Riak.Types.Internal
     (
     -- * Client management
     , Client(..)
     -- * Connection management
     , Connection(..)
+    -- * Errors
+    , RiakException(..)
+    , riakError
     -- * Data types
     , Bucket
     , Key
     -- * Message identification
     , Request(..)
     , Response
+    , Exchange
     , MessageTag(..)
     , Tagged(..)
     ) where
 
+import Control.Exception
+import Data.Typeable (Typeable)
 import Data.ByteString.Lazy (ByteString)
 import Data.Digest.Pure.MD5 (md5)
 import Data.IORef (IORef)
     , connBuffer :: IORef ByteString
     } deriving (Eq)
 
+data RiakException = RiakException {
+      excModule :: String
+    , excFunction :: String
+    , excMessage :: String
+    } deriving (Typeable)
+
+showRiakException :: RiakException -> String
+showRiakException RiakException{..} =
+    "Riak error (" ++ excModule ++ "." ++ excFunction ++ "): " ++ excMessage
+
+instance Show RiakException where
+    show = showRiakException
+
+instance Exception RiakException 
+
+riakError :: String -> String -> String -> a
+riakError modu func msg = throw (RiakException modu func msg)
+
 instance Show Connection where
     show conn = show "Connection " ++ host c ++ ":" ++ port c
         where c = connClient conn
 
 class (Tagged msg, ReflectDescriptor msg, Wire msg) => Response msg
 
+class (Request req, Response resp) => Exchange req resp
+    | req -> resp, resp -> req
+
 instance (Tagged a, Tagged b) => Tagged (Either a b) where
     messageTag (Left l)  = messageTag l
     messageTag (Right r) = messageTag r

src/riakextra.proto

 message RpbPingReq { }
-message RpbPingResp { }
 message RpbGetClientIdReq { }
-message RpbSetClientIdResp { }
 message RpbGetServerInfoReq { }
 message RpbListBucketsReq { }