Commits

Bryan O'Sullivan committed b355c20

Simplify the module hierarchy (goodbye .Pipeline).

Comments (0)

Files changed (4)

     Network.Riak
     Network.Riak.Connection
     Network.Riak.Content
-    Network.Riak.Pipeline
     Network.Riak.Request
     Network.Riak.Response
     Network.Riak.Simple

src/Network/Riak/Connection.hs

     , exchange
     , exchangeMaybe
     , exchange_
-    -- ** Composing and parsing
-    , putRequest
-    , getResponse
+    -- ** Pipelining many requests
+    , pipeline
+    , pipelineMaybe
+    , pipeline_
     ) where
 
 import Network.Riak.Connection.Internal

src/Network/Riak/Connection/Internal.hs

-{-# LANGUAGE OverloadedStrings, RecordWildCards #-}
+{-# LANGUAGE OverloadedStrings, RecordWildCards, ScopedTypeVariables #-}
 
 module Network.Riak.Connection.Internal
     (
     , makeClientID
     -- * Requests and responses
     -- ** Sending and receiving requests and responses
+    , exchange
+    , exchangeMaybe
+    , exchange_
+    -- ** Pipelining many requests
+    , pipeline
+    , pipelineMaybe
+    , pipeline_
+    -- * Low-level protocol operations
+    -- ** Sending and receiving
     , sendRequest
     , recvResponse
     , recvMaybeResponse
     , recvResponse_
-    , exchange
-    , exchangeMaybe
-    , exchange_
     -- ** Composing and parsing requests and responses
     , putRequest
     , getResponse
     , recvGetN
     ) where
 
-import Control.Monad (unless, when)
+import Control.Concurrent
+import Control.Exception (IOException)
+import Control.Monad (forM_, replicateM, replicateM_, unless, when)
 import Data.Binary.Put (Put, putWord32be, runPut)
 import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
 import Data.Int (Int64)
 import Network.Riak.Tag (getTag, putTag)
 import Network.Riak.Types.Internal hiding (MessageTag(..))
 import Network.Socket as Socket
-import qualified Network.Socket.ByteString as B
-import qualified Network.Socket.ByteString.Lazy as L
 import Numeric (showHex)
 import System.Random (randomIO)
 import Text.ProtocolBuffers (messageGetM, messagePutM, messageSize)
 import qualified Data.ByteString as B
 import qualified Data.ByteString.Lazy.Char8 as L
 import qualified Network.Riak.Types.Internal as T
+import qualified Network.Socket.ByteString as B
+import qualified Network.Socket.ByteString.Lazy as L
 
 defaultClient :: Client
 defaultClient = Client {
 connect :: Client -> IO Connection
 connect cli0 = do
   client@Client{..} <- addClientID cli0
-  let hints = defaultHints
-  (ai:_) <- getAddrInfo (Just hints) (Just host) (Just port)
-  sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai)
-  Socket.connect sock (addrAddress ai)
-  buf <- newIORef L.empty
-  let conn = Connection sock client buf
-  setClientID conn clientID
-  return conn
+  let hints = defaultHints {
+                addrFlags = [AI_ADDRCONFIG]
+              , addrSocketType = Stream
+              }
+  ais <- getAddrInfo (Just hints) (Just host) (Just port)
+  let ai = case ais of
+             (a:_) -> a
+             _     -> moduleError "connect" $
+                      "could not look up server " ++ host ++ ":" ++ port
+  onIOException "connect" $ do
+    sock <- socket (addrFamily ai) (addrSocketType ai) (addrProtocol ai)
+    Socket.connect sock (addrAddress ai)
+    buf <- newIORef L.empty
+    let conn = Connection sock client buf
+    setClientID conn clientID
+    return conn
 
 disconnect :: Connection -> IO ()
-disconnect Connection{..} = do
+disconnect Connection{..} = onIOException "disconnect" $ do
   sClose connSock
   writeIORef connBuffer L.empty
 
                          show expected ++ ", received " ++ show tag
 
 exchange :: Exchange req resp => Connection -> req -> IO resp
-exchange conn@Connection{..} req = do
-  sendRequest conn req
-  recvResponse conn
+exchange conn@Connection{..} req =
+  onIOException ("exchange[" ++ show (messageTag req) ++ "]") $ do
+    sendRequest conn req
+    recvResponse conn
 
 exchangeMaybe :: Exchange req resp => Connection -> req -> IO (Maybe resp)
-exchangeMaybe conn@Connection{..} req = do
-  sendRequest conn req
-  recvMaybeResponse conn
+exchangeMaybe conn@Connection{..} req =
+  onIOException ("exchangeMaybe[" ++ show (messageTag req) ++ "]") $ do
+    sendRequest conn req
+    recvMaybeResponse conn
 
 exchange_ :: Request req => Connection -> req -> IO ()
-exchange_ conn req = do
-  sendRequest conn req
-  recvResponse_ conn (expectedResponse req)
+exchange_ conn req =
+  onIOException ("exchange_[" ++ show (messageTag req) ++ "]") $ do
+    sendRequest conn req
+    recvResponse_ conn (expectedResponse req)
 
 sendRequest :: (Request req) => Connection -> req -> IO ()
 sendRequest Connection{..} = L.sendAll connSock . runPut . putRequest
           Left err  -> moduleError "recvMaybeResponse" err
           Right ret -> return (Just ret)
 
+pipe :: (Request req) => (Connection -> IO resp) -> Connection -> [req]
+     -> IO [resp]
+pipe receive conn@Connection{..} reqs = do
+  ch <- newChan
+  let numReqs = length reqs
+  _ <- forkIO . replicateM_ numReqs $ writeChan ch =<< receive conn
+  onIOException ("pipe[" ++ show (messageTag (head reqs)) ++ "]") .
+    L.sendAll connSock . runPut . mapM_ putRequest $ reqs
+  replicateM numReqs $ readChan ch
+
+pipeline :: (Exchange req resp) => Connection -> [req] -> IO [resp]
+pipeline = pipe recvResponse
+
+pipelineMaybe :: (Exchange req resp) => Connection -> [req] -> IO [Maybe resp]
+pipelineMaybe = pipe recvMaybeResponse
+
+pipeline_ :: (Request req) => Connection -> [req] -> IO ()
+pipeline_ conn@Connection{..} reqs = do
+  done <- newEmptyMVar
+  _ <- forkIO $ do
+         forM_ reqs (recvResponse_ conn . expectedResponse)
+         putMVar done ()
+  L.sendAll connSock . runPut . mapM_ putRequest $ reqs
+  takeMVar done
+
+onIOException :: String -> IO a -> IO a
+onIOException func act =
+    act `catch` \(e::IOException) -> moduleError func (show e)
+
 moduleError :: String -> String -> a
 moduleError = riakError "Network.Riak.Connection.Internal"

src/Network/Riak/Pipeline.hs

-{-# LANGUAGE RecordWildCards #-}
-
-module Network.Riak.Pipeline
-    (
-      pipeline
-    , pipelineMaybe
-    , pipeline_
-    ) where
-
-import Control.Monad
-import Network.Riak.Types.Internal
-import Network.Riak.Connection.Internal
-import Data.Binary.Put (runPut)
-import qualified Network.Socket.ByteString.Lazy as L
-import Control.Concurrent.Chan
-import Control.Concurrent
-
-pipe :: (Request req) => (Connection -> IO resp) -> Connection -> [req]
-     -> IO [resp]
-pipe recv conn@Connection{..} reqs = do
-  ch <- newChan
-  let numReqs = length reqs
-  _ <- forkIO . replicateM_ numReqs $
-       writeChan ch =<< recv conn
-  L.sendAll connSock . runPut . mapM_ putRequest $ reqs
-  replicateM numReqs $ readChan ch
-
-pipeline :: (Exchange req resp) => Connection -> [req] -> IO [resp]
-pipeline = pipe recvResponse
-
-pipelineMaybe :: (Exchange req resp) => Connection -> [req] -> IO [Maybe resp]
-pipelineMaybe = pipe recvMaybeResponse
-
-pipeline_ :: (Request req) => Connection -> [req] -> IO ()
-pipeline_ conn@Connection{..} reqs = do
-  done <- newEmptyMVar
-  _ <- forkIO $ do
-         forM_ reqs (recvResponse_ conn . expectedResponse)
-         putMVar done ()
-  L.sendAll connSock . runPut . mapM_ putRequest $ reqs
-  takeMVar done