Commits

Stefan Saasen committed e6fd623

Change count to use parallel map reduce

Comments (0)

Files changed (5)

logparser/logparser.cabal

 executable logparser
     main-is:                Main.hs
     hs-source-dirs:         src
-    other-modules:          Stash.Log.Analyser, Stash.Log.Parser, Stash.Log.GitOpsAnalyser, Stash.Log.Common
+    other-modules:          Stash.Log.Analyser, Stash.Log.Parser, Stash.Log.GitOpsAnalyser, Stash.Log.Common, Stash.Log.Parallel, Stash.Log.LineChunks
     build-depends:          base < 5 && >= 3,
                             bytestring >= 0.9,
                             old-locale,
                             data-default,
                             mtl >= 2.0.0.0 && < 3,
                             text,
+                            parallel,
+                            deepseq,
+                            transformers,
                             unordered-containers >= 0.2
     ghc-options:
                             -Wall
                             -fno-warn-unused-do-bind
                             -rtsopts
                             -O2
+                            -- link the threaded RTS so the parallel strategies can use several cores (e.g. run with +RTS -N)
+                            -threaded
                             -- -prof -auto-all -caf-all -fforce-recomp

logparser/src/Main.hs

 import Stash.Log.Parser
 import Stash.Log.Analyser
 import Stash.Log.GitOpsAnalyser
+import Stash.Log.LineChunks
 import Data.Default
+import Control.Parallel.Strategies (NFData)
 import UI.Command
 import Prelude hiding (takeWhile)
 import Text.Printf (printf)
 import Control.Monad (liftM)
 import Control.Monad.Trans (liftIO)
+import Control.Monad.Trans.Reader (ReaderT)
 
 -- =================================================================================
 
 -- | Read the file at @path@, split its contents into lines, apply @f@ to
 -- those lines and print the result.
 parseAndPrint :: (Show a) => (Input -> a) -> FilePath -> IO ()
 parseAndPrint f path = do
     contents <- L.readFile path
     print (f (L.lines contents))
 
-printCountLines :: (Show a) => (L.ByteString -> a) -> FilePath -> IO ()
-printCountLines f path = print . f =<< L.readFile path
+
+
+-- | Run @f@ over the chunks of the file at @path@ (via 'chunkedReadWith')
+-- and print the outcome as @path: result@.
+printCountLines :: (Show a, NFData a) => ([L.ByteString] -> a) -> FilePath -> IO ()
+printCountLines f path = chunkedReadWith f path >>= report
+  where
+    -- Prefix the result with the file name it was computed from.
+    report result = putStrLn (path ++ ": " ++ show result)
 
 -- | Read the file at @path@ and split its contents into lines.
 toLines :: FilePath -> IO [L.ByteString]
 toLines path = L.lines `fmap` L.readFile path

logparser/src/Stash/Log/Analyser.hs

 import qualified Data.HashMap.Strict as M
 import Data.List (foldl', groupBy)
 import Data.Maybe (mapMaybe)
+import Data.Int (Int64)
 import Data.Function (on)
 import Text.Printf (printf)
 import Stash.Log.Parser
 import Stash.Log.Common (logDateEqHour, logDateEqMin)
+import Stash.Log.Parallel
+import Control.Parallel.Strategies
+
 
 data DateValuePair = DateValuePair {
      getLogDate     :: !LogDate
 }
 
 -- | Count the number of lines in the given input file
-countLines :: L.ByteString -> Integer
-countLines input = toInteger $ L.count '\n' input
+-- Each element of the input list is one chunk of the file; the newline
+-- counts of the chunks are computed with 'mapReduce' and then summed.
+countLines :: [L.ByteString] -> Int64
+countLines = mapReduce rdeepseq newlinesIn rdeepseq sum
+  where
+    -- Number of newline characters in a single chunk.
+    newlinesIn = L.count '\n'
 
 -- | Count the number of lines marked as incoming (aka request)
 countRequestLines :: Input -> Integer

logparser/src/Stash/Log/LineChunks.hs

+{-
+Taken from "Real World Haskell" by Bryan O'Sullivan, Don Stewart, and
+John Goerzen, chapter "Concurrent and multicore programming":
+http://book.realworldhaskell.org/read/concurrent-and-multicore-programming.html
+-}
+
+module Stash.Log.LineChunks
+    (
+      chunkedReadWith
+    ) where
+
+import Control.Exception (bracket, bracketOnError, finally)
+import Control.Monad (forM, liftM)
+import Control.Parallel.Strategies (NFData)
+import Control.DeepSeq (rnf)
+import Data.Int (Int64)
+import qualified Data.ByteString.Lazy.Char8 as LB
+import GHC.Conc (numCapabilities)
+import System.IO
+
+-- | Location of one chunk within a file.
+data ChunkSpec = CS {
+      chunkOffset :: !Int64  -- ^ absolute byte offset at which the chunk starts
+    , chunkLength :: !Int64  -- ^ number of bytes belonging to the chunk
+    } deriving (Eq, Show)
+
+-- | Split the file at @path@ into chunks as described by @chunkFunc@, apply
+-- @process@ to the chunk contents and return the fully evaluated result.
+--
+-- The result is forced with 'rnf' before the handles are closed, because the
+-- chunks are read lazily and would otherwise be consumed after the close.
+-- The handles are closed whether or not evaluation throws.
+withChunks :: (NFData a) =>
+              (FilePath -> IO [ChunkSpec])
+           -> ([LB.ByteString] -> a)
+           -> FilePath
+           -> IO a
+withChunks chunkFunc process path = do
+  (chunks, handles) <- chunkedRead chunkFunc path
+  let result   = process chunks
+      closeAll = mapM_ hClose handles
+  (rnf result `seq` return result) `finally` closeAll
+  
+-- | Apply a function to the contents of a file, presented as a list of
+-- line-aligned chunks. The file is split into about four chunks per
+-- capability ('numCapabilities' * 4).
+chunkedReadWith :: (NFData a) =>
+                   ([LB.ByteString] -> a) -> FilePath -> IO a
+chunkedReadWith = withChunks chunker
+  where
+    -- Chunking strategy: line-aligned chunks, four per capability.
+    chunker = lineChunks (numCapabilities * 4)
+
+-- | Open the file at @path@ once per chunk reported by @chunkFunc@ and
+-- return the lazily read contents of every chunk together with the handle
+-- backing it, both in chunk order.
+--
+-- The caller owns the returned handles and must close them once the chunk
+-- contents have been consumed. If opening or seeking fails for any chunk,
+-- every handle opened so far is closed before the exception propagates, so
+-- a failure part-way through does not leak file descriptors.
+chunkedRead :: (FilePath -> IO [ChunkSpec])
+            -> FilePath
+            -> IO ([LB.ByteString], [Handle])
+chunkedRead chunkFunc path = chunkFunc path >>= openAll
+  where
+    openAll [] = return ([], [])
+    openAll (spec:rest) =
+      -- The remaining chunks are opened inside this bracket: an exception
+      -- raised for a later chunk unwinds through it and closes this handle.
+      bracketOnError (openFile path ReadMode) hClose $ \h -> do
+        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
+        -- hGetContents reads lazily; take limits the read to this chunk.
+        chunk <- LB.take (chunkLength spec) `liftM` LB.hGetContents h
+        (chunks, handles) <- openAll rest
+        return (chunk : chunks, h : handles)
+
+-- | Compute line-aligned chunks for the file at @path@, aiming for about
+-- @numChunks@ chunks of equal size.
+--
+-- Starting from a chunk's offset, the handle is moved forward by the nominal
+-- chunk size and the file is scanned in 4096-byte blocks for the next
+-- newline. The chunk ends just after that newline, which is where the next
+-- chunk begins. When end of file is reached first, the current chunk covers
+-- the rest of the file.
+--
+-- A @numChunks@ below 1 is treated as 1, so a zero or negative request gives
+-- a single chunk instead of a division by zero or a negative seek offset.
+lineChunks :: Int -> FilePath -> IO [ChunkSpec]
+lineChunks numChunks path = do
+  bracket (openFile path ReadMode) hClose $ \h -> do
+    totalSize <- fromIntegral `liftM` hFileSize h
+    let chunkSize = totalSize `div` fromIntegral (max 1 numChunks)
+        findChunks offset = do
+          let newOffset = offset + chunkSize
+          hSeek h AbsoluteSeek (fromIntegral newOffset)
+          -- off tracks the absolute position of the block about to be read.
+          let findNewline off = do
+                eof <- hIsEOF h
+                if eof
+                  then return [CS offset (totalSize - offset)]
+                  else do
+                    bytes <- LB.hGet h 4096
+                    case LB.elemIndex '\n' bytes of
+                      Just n -> do
+                        -- findChunks always returns at least the final
+                        -- chunk, so this pattern cannot fail.
+                        chunks@(c:_) <- findChunks (off + n + 1)
+                        let coff = chunkOffset c
+                        return (CS offset (coff - offset):chunks)
+                      Nothing -> findNewline (off + LB.length bytes)
+          findNewline newOffset
+    findChunks 0
+

logparser/src/Stash/Log/Parallel.hs

+{-
+Taken from "Real World Haskell" by Bryan O'Sullivan, Don Stewart, and
+John Goerzen, chapter "Concurrent and multicore programming":
+http://book.realworldhaskell.org/read/concurrent-and-multicore-programming.html
+-}
+module Stash.Log.Parallel
+(mapReduce
+) where
+
+import Control.Parallel (pseq)
+import Control.Parallel.Strategies
+
+-- | Map a function over a list with 'parMap', then reduce the mapped list
+-- to a single value. 'pseq' makes sure evaluation of the mapped list is
+-- started before the reduced value is returned.
+mapReduce
+    :: Strategy b    -- ^ evaluation strategy for mapping
+    -> (a -> b)      -- ^ map function
+    -> Strategy c    -- ^ evaluation strategy for reduction
+    -> ([b] -> c)    -- ^ reduce function
+    -> [a]           -- ^ list to map over
+    -> c
+mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
+    let mapped  = parMap mapStrat mapFunc input
+        reduced = reduceFunc mapped `using` reduceStrat
+    in mapped `pseq` reduced
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.