Commits

Bryan O'Sullivan committed 597c688

Move connection pooling code out to the pool package.

  • Participants
  • Parent commits 64ca02e

Comments (0)

Files changed (2)

     blaze-builder,
     bytestring,
     containers,
-    hashable >= 1.0.1.2,
     network >= 2.3,
+    pool >= 0.1.0.2,
     protocol-buffers >= 1.8.0,
     pureMD5,
     random,
     riak-protobuf >= 0.14.0.0,
-    stm,
     text >= 0.11.0.6,
-    time,
-    vector >= 0.7
+    time
 
   if flag(debug)
     cpp-options: -DASSERTS -DDEBUG

File src/Network/Riak/Connection/Pool.hs

 -- Portability: portable
 --
 -- A high-performance striped pooling abstraction for managing
--- connections to a Riak cluster.
---
--- \"Striped\" means that a single 'Pool' consists of several
--- sub-pools, each managed independently.  A stripe size of 1 is fine
--- for many applications, and probably what you should choose by
--- default.  Larger stripe sizes will lead to reduced contention in
--- high-performance multicore applications, at a trade-off of causing
--- the maximum number of simultaneous connections to grow.
+-- connections to a Riak cluster. This is a thin wrapper around
+-- 'Data.Pool'.
 module Network.Riak.Connection.Pool
     (
       Pool
     , withConnection
     ) where
 
-import Control.Applicative ((<$>))
-import Control.Concurrent (forkIO, killThread, myThreadId, threadDelay)
-import Control.Concurrent.STM
-import Control.Exception (SomeException, catch, onException)
-import Control.Monad (forM_, forever, join, liftM2, unless, when)
-import Data.Hashable (hash)
-import Data.List (partition)
-import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
+import Data.Time.Clock (NominalDiffTime)
 import Data.Typeable (Typeable)
-import Network.Riak.Connection.Internal (connect, disconnect, makeClientID)
-import Network.Riak.Debug (debug)
-import Network.Riak.Types (Client(clientID), Connection)
-import Prelude hiding (catch)
-import System.Mem.Weak (addFinalizer)
-import qualified Data.Vector as V
-
--- | A single connection pool entry.
-data Entry = Entry {
-      connection :: Connection
-    , lastUse :: UTCTime
-    -- ^ Time of last return.
-    }
-
--- | A single striped pool.
-data LocalPool = LocalPool {
-      connected :: TVar Int
-    -- ^ Count of open connections (both idle and in use).
-    , entries :: TVar [Entry]
-    -- ^ Idle entries.
-    }
+import Network.Riak (Client(clientID), Connection, connect, disconnect)
+import Network.Riak.Connection (makeClientID)
+import qualified Data.Pool as Pool
 
 -- | A pool of connections to a Riak server.
 --
       client :: Client
     -- ^ Client specification.  The client ID is ignored, and always
     -- regenerated automatically for each new connection.
-    , numStripes :: Int
-    -- ^ Stripe count.  The number of distinct sub-pools to maintain.
-    -- The smallest acceptable value is 1.
-    , idleTime :: NominalDiffTime
-    -- ^ Amount of time for which an unused connection is kept open.
-    -- The smallest acceptable value is 0.5 seconds.
-    --
-    -- The elapsed time before closing may be a little longer than
-    -- requested, as the reaper thread wakes at 2-second intervals.
-    , maxConnections :: Int
-    -- ^ Maximum number of connections to keep open per stripe.  The
-    -- smallest acceptable value is 1.
-    -- 
-    -- Requests for connections will block if this limit is reached on
-    -- a single stripe, even if other stripes have idle connections
-    -- available.
-    , localPools :: V.Vector LocalPool
-    -- ^ Per-capability connection pools.
+    , pool :: Pool.Pool Connection
     } deriving (Typeable)
 
 instance Show Pool where
-    show Pool{..} = "Pool { client = " ++ show client ++ ", " ++
-                    "numStripes = " ++ show numStripes ++ ", " ++
-                    "idleTime = " ++ show idleTime ++ ", " ++
-                    "maxConnections = " ++ show maxConnections ++ "}"
+    show p = "Pool { client = " ++ show (client p) ++ ", " ++
+                    "numStripes = " ++ show (numStripes p) ++ ", " ++
+                    "idleTime = " ++ show (idleTime p) ++ ", " ++
+                    "maxConnections = " ++ show (maxConnections p) ++ "}"
 
 instance Eq Pool where
     a == b = client a == client b && numStripes a == numStripes b &&
        -- on a single stripe, even if other stripes have idle
        -- connections available.
        -> IO Pool
-create client numStripes idleTime maxConnections = do
-  when (numStripes < 1) $
-    modError "pool " $ "invalid stripe count " ++ show numStripes
-  when (idleTime < 0.5) $
-    modError "pool " $ "invalid idle time " ++ show idleTime
-  when (maxConnections < 1) $
-    modError "pool " $ "invalid maximum connection count " ++
-                       show maxConnections
-  localPools <- atomically . V.replicateM numStripes $
-                liftM2 LocalPool (newTVar 0) (newTVar [])
-  reaperId <- forkIO $ reaper idleTime localPools
-  let p = Pool {
-            client
-          , numStripes
-          , idleTime
-          , maxConnections
-          , localPools
-          }
-  addFinalizer p $ killThread reaperId
-  return p
+create client ns it mc =
+    Pool client `fmap` Pool.createPool c disconnect ns it mc
+  where c = do
+          cid <- makeClientID
+          connect client { clientID = cid }
 
--- | Periodically go through all pools, closing any connections that
--- have been left idle for too long.
-reaper :: NominalDiffTime -> V.Vector LocalPool -> IO ()
-reaper idleTime pools = forever $ do
-  threadDelay (2 * 1000000)
-  now <- getCurrentTime
-  let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
-  V.forM_ pools $ \LocalPool{..} -> do
-    conns <- atomically $ do
-      (stale,fresh) <- partition isStale <$> readTVar entries
-      unless (null stale) $ do
-        writeTVar entries fresh
-        modifyTVar_ connected (subtract (length stale))
-      return (map connection stale)
-    forM_ conns $ \conn -> do
-      debug "reaper" "closing idle connection"
-      disconnect conn `catch` \(_::SomeException) -> return ()
-              
+-- | Stripe count.  The number of distinct sub-pools to maintain.  The
+-- smallest acceptable value is 1.
+numStripes :: Pool -> Int
+numStripes = Pool.numStripes . pool
+
+-- | Amount of time for which an unused connection is kept open.  The
+-- smallest acceptable value is 0.5 seconds.
+--
+-- The elapsed time before closing may be a little longer than
+-- requested, as the reaper thread wakes at 1-second intervals.
+idleTime :: Pool -> NominalDiffTime
+idleTime = Pool.idleTime . pool
+
+-- | Maximum number of connections to keep open per stripe.  The
+-- smallest acceptable value is 1.
+-- 
+-- Requests for connections will block if this limit is reached on a
+-- single stripe, even if other stripes have idle connections
+-- available.
+maxConnections :: Pool -> Int
+maxConnections = Pool.maxResources . pool
+
 -- | Temporarily take a connection from a 'Pool', perform an action
 -- with it, and return it to the pool afterwards.
 --
 -- 'disconnect' on a connection, as doing so will cause a subsequent
 -- user (who expects the connection to be valid) to throw an exception.
 withConnection :: Pool -> (Connection -> IO a) -> IO a
-withConnection Pool{..} act = do
-  i <- ((`mod` numStripes) . hash) <$> myThreadId
-  let LocalPool{..} = localPools V.! i
-  conn <- join . atomically $ do
-    ents <- readTVar entries
-    case ents of
-      (Entry{..}:es) -> writeTVar entries es >> return (return connection)
-      [] -> do
-        inUse <- readTVar connected
-        when (inUse == maxConnections) retry
-        writeTVar connected $! inUse + 1
-        return $ do
-          cid <- makeClientID
-          connect client { clientID = cid }
-            `onException` atomically (modifyTVar_ connected (subtract 1))
-  ret <- act conn `onException` do
-           disconnect conn `catch` \(_::SomeException) -> return ()
-           atomically (modifyTVar_ connected (subtract 1))
-  now <- getCurrentTime
-  atomically $ modifyTVar_ entries (Entry conn now:)
-  return ret
-
-modifyTVar_ :: TVar a -> (a -> a) -> STM ()
-modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a
-
-modError :: String -> String -> a
-modError func msg =
-    error $ "Network.Riak.Connection.Pool." ++ func ++ ": " ++ msg
+withConnection = Pool.withResource . pool