Bryan O'Sullivan avatar Bryan O'Sullivan committed 778a7d1

Add support for connection pooling

Comments (0)

Files changed (2)

   exposed-modules:     
     Network.Riak
     Network.Riak.Connection
+    Network.Riak.Connection.Pool
     Network.Riak.Content
     Network.Riak.Debug
     Network.Riak.Request
     binary,
     bytestring,
     containers,
+    hashable,
     network >= 2.3,
     protocol-buffers >= 1.8.0,
     protocol-buffers-descriptor >= 1.8.1,
     pureMD5,
-    random
+    random,
+    stm,
+    time,
+    vector >= 0.7
 
   if flag(debug)
     cpp-options: -DASSERTS -DDEBUG

src/Network/Riak/Connection/Pool.hs

+{-# LANGUAGE DeriveDataTypeable, NamedFieldPuns, RecordWildCards,
+    ScopedTypeVariables #-}
+
+-- |
+-- Module:      Network.Riak.Connection.Pool
+-- Copyright:   (c) 2011 MailRank, Inc.
+-- License:     Apache
+-- Maintainer:  Bryan O'Sullivan <bos@mailrank.com>
+-- Stability:   experimental
+-- Portability: portable
+--
+-- A high-performance striped pooling abstraction for managing
+-- connections to a Riak cluster.
+--
+-- \"Striped\" means that a single 'Pool' consists of several
+-- sub-pools, each managed independently.  A stripe size of 1 is fine
+-- for many applications, and probably what you should choose by
+-- default.  Larger stripe sizes will lead to reduced contention in
+-- high-performance multicore applications, at a trade-off of causing
+-- the maximum number of simultaneous connections to grow.
+module Network.Riak.Connection.Pool
+    (
+      Pool
+    , client
+    , create
+    , idleTime
+    , maxConnections
+    , numStripes
+    , withConnection
+    ) where
+
+import Control.Applicative ((<$>))
+import Control.Concurrent (forkIO, killThread, myThreadId, threadDelay)
+import Control.Concurrent.STM
+import Control.Exception (SomeException, catch, onException)
+import Control.Monad (forM_, forever, join, liftM2, unless, when)
+import Data.Hashable (hash)
+import Data.List (partition)
+import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
+import Data.Typeable (Typeable)
+import Network.Riak.Connection.Internal (connect, disconnect, makeClientID)
+import Network.Riak.Debug (debug)
+import Network.Riak.Types (Client(clientID), Connection)
+import Prelude hiding (catch)
+import System.Mem.Weak (addFinalizer)
+import qualified Data.Vector as V
+
+-- | A single connection pool entry.
+data Entry = Entry {
+      connection :: Connection
+    , lastUse :: UTCTime
+    -- ^ Time of last return.
+    }
+
+-- | A single striped pool.
+data LocalPool = LocalPool {
+      connected :: TVar Int
+    -- ^ Count of open connections (both idle and in use).
+    , entries :: TVar [Entry]
+    -- ^ Idle entries.
+    }
+
+-- | A pool of connections to a Riak server.
+--
+-- This pool is \"striped\", i.e. it consists of several sub-pools
+-- that are managed independently.
+--
+-- The total number of connections that can possibly be open at once
+-- is 'maxConnections' * 'numStripes'.
+data Pool = Pool {
+      client :: Client
+    -- ^ Client specification.  The client ID is ignored, and always
+    -- regenerated automatically for each new connection.
+    , numStripes :: Int
+    -- ^ Stripe count.  The number of distinct sub-pools to maintain.
+    -- The smallest acceptable value is 1.
+    , idleTime :: NominalDiffTime
+    -- ^ Amount of time for which an unused connection is kept open.
+    -- The smallest acceptable value is 0.5 seconds.
+    --
+    -- The elapsed time before closing may be a little longer than
+    -- requested, as the reaper thread wakes at 2-second intervals.
+    , maxConnections :: Int
+    -- ^ Maximum number of connections to keep open per stripe.  The
+    -- smallest acceptable value is 1.
+    -- 
+    -- Requests for connections will block if this limit is reached on
+    -- a single stripe, even if other stripes have idle connections
+    -- available.
+    , localPools :: V.Vector LocalPool
+    -- ^ Per-capability connection pools.
+    } deriving (Typeable)
+
+instance Show Pool where
+    show Pool{..} = "Pool { client = " ++ show client ++ ", " ++
+                    "numStripes = " ++ show numStripes ++ ", " ++
+                    "idleTime = " ++ show idleTime ++ ", " ++
+                    "maxConnections = " ++ show maxConnections ++ "}"
+
+instance Eq Pool where
+    a == b = client a == client b && numStripes a == numStripes b &&
+             idleTime a == idleTime b && maxConnections a == maxConnections b
+
+-- | Create a new connection pool.
+create :: Client
+       -- ^ Client configuration.  The client ID is ignored, and
+       -- always regenerated automatically for each new connection.
+       -> Int
+       -- ^ Stripe count.  The number of distinct sub-pools to
+       -- maintain.  The smallest acceptable value is 1.
+       -> NominalDiffTime
+       -- ^ Amount of time for which an unused connection is kept
+       -- open.  The smallest acceptable value is 0.5 seconds.
+       --
+       -- The elapsed time before closing may be a little longer than
+       -- requested, as the reaper thread wakes at 2-second intervals.
+       -> Int
+       -- ^ Maximum number of connections to keep open per stripe.
+       -- The smallest acceptable value is 1.
+       -- 
+       -- Requests for connections will block if this limit is reached
+       -- on a single stripe, even if other stripes have idle
+       -- connections available.
+       -> IO Pool
+create client numStripes idleTime maxConnections = do
+  when (numStripes < 1) $
+    modError "pool " $ "invalid stripe count " ++ show numStripes
+  when (idleTime < 0.5) $
+    modError "pool " $ "invalid idle time " ++ show idleTime
+  when (maxConnections < 1) $
+    modError "pool " $ "invalid maximum connection count " ++
+                       show maxConnections
+  localPools <- atomically . V.replicateM numStripes $
+                liftM2 LocalPool (newTVar 0) (newTVar [])
+  reaperId <- forkIO $ reaper idleTime localPools
+  let p = Pool {
+            client
+          , numStripes
+          , idleTime
+          , maxConnections
+          , localPools
+          }
+  addFinalizer p $ killThread reaperId
+  return p
+
+-- | Periodically go through all pools, closing any connections that
+-- have been left idle for too long.
+reaper :: NominalDiffTime -> V.Vector LocalPool -> IO ()
+reaper idleTime pools = forever $ do
+  threadDelay (2 * 1000000)
+  now <- getCurrentTime
+  let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
+  V.forM_ pools $ \LocalPool{..} -> do
+    conns <- atomically $ do
+      (stale,fresh) <- partition isStale <$> readTVar entries
+      unless (null stale) $ do
+        writeTVar entries fresh
+        modifyTVar_ connected (subtract (length stale))
+      return (map connection stale)
+    forM_ conns $ \conn -> do
+      debug "reaper" "closing idle connection"
+      disconnect conn `catch` \(_::SomeException) -> return ()
+              
+-- | Temporarily take a connection from a 'Pool', perform an action
+-- with it, and return it to the pool afterwards.
+--
+-- * If the pool has a connection available, it is used
+--   immediately.
+--
+-- * Otherwise, if the maximum number of connections has not been
+--   reached, a new connection is created and used.
+--
+-- * If the maximum number of connections has been reached, this
+--   function blocks until a connection becomes available, then that
+--   connection is used.
+--
+-- If the action throws an exception of any type, the 'Connection' is
+-- destroyed, and not returned to the pool.
+--
+-- It probably goes without saying that you should never call
+-- 'disconnect' on a connection, as doing so will cause a subsequent
+-- user, expecting the connection to be valid, to throw an exception.
+withConnection :: Pool -> (Connection -> IO a) -> IO a
+withConnection Pool{..} act = do
+  i <- ((`mod` numStripes) . hash) <$> myThreadId
+  let LocalPool{..} = localPools V.! i
+  conn <- join . atomically $ do
+    ents <- readTVar entries
+    case ents of
+      (Entry{..}:es) -> writeTVar entries es >> return (return connection)
+      [] -> do
+        inUse <- readTVar connected
+        when (inUse == maxConnections) retry
+        writeTVar connected $! inUse + 1
+        return $ do
+          cid <- makeClientID
+          connect client { clientID = cid }
+            `onException` atomically (modifyTVar_ connected (subtract 1))
+  ret <- act conn `onException` do
+           disconnect conn `catch` \(_::SomeException) -> return ()
+           atomically (modifyTVar_ connected (subtract 1))
+  now <- getCurrentTime
+  atomically $ modifyTVar_ entries (Entry conn now:)
+  return ret
+
+modifyTVar_ :: TVar a -> (a -> a) -> STM ()
+modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a
+
+modError :: String -> String -> a
+modError func msg =
+    error $ "Network.Riak.Connection.Pool." ++ func ++ ": " ++ msg
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.