Source

hs-cctools-workqueue / Example.hs

Full commit
-- -*- indent-tabs-mode: nil -*- --

module Main where

import Control.Distributed.CCTools.WorkQueue

import qualified Data.ByteString.Char8 as BS (pack, putStrLn)
import Control.Monad (forM_)


-- | Creation of Tasks: sleep for a random 0-9 seconds before echoing the
--   'show' representation of the parameter.
--
--   Generate a bash script in-memory that will be materialized on the worker using
--   the 'work_queue_specify_buffer' api.
--   Set the tag for the task to be the string representation of the parameter.
--
--   The shown value is wrapped in single quotes before it is spliced into the
--   script, so representations containing shell metacharacters (for example
--   the parentheses of a tuple or the brackets of a list) are echoed verbatim
--   instead of being interpreted by bash.
mktask :: Show a => a -> IO Task
mktask v = do

  -- the bash script to execute: sleep 0-9 seconds ($RANDOM % 10), then echo
  -- the (quoted) string representation of the parameter
  let script = BS.pack . unlines $ [
              "t=$(echo $RANDOM % 10 | bc)"
            , "sleep $t"
            , "echo " ++ shellQuote shown
            ]

  -- Create the Task.
  -- cmd  :: String -> Command
  -- task :: Command -> IO Task
  t <- task $ cmd "bash script.sh"

  -- specifyBuffer :: Task -> ByteString -> Location Remote -> Cached -> IO ()
  -- remote :: FilePath -> Location Remote
  -- Locations can be either 'Location Local' or 'Location Remote', which have different semantic meanings:
  -- both are 'FilePath's, but the former is on the Master and the latter the Worker.
  -- If we used for instance 'specifyFile', it has type Task -> Location Local -> Location Remote -> Cached -> IO ()
  -- This prevents local & remote files from being negligently swapped. There are appropriate "smart constructor"s for each:
  -- use 'remote' and 'local' to create them as needed.
  specifyBuffer t script (remote "script.sh") False

  -- Set the tag for the task.
  -- 'mktask' is polymorphic, but we need a string representation
  -- here, so the type is constrained to implement the Show interface
  -- specifyTag :: Task -> String -> IO ()
  specifyTag    t shown

  return t

  where
    -- String representation of the parameter, shared by the script and the tag.
    shown = show v

    -- Wrap a string in single quotes for bash. Inside single quotes every
    -- character is literal, so only an embedded single quote needs handling:
    -- close the quote, emit an escaped quote, and reopen ('\'').
    shellQuote s = "'" ++ concatMap escape s ++ "'"

    escape '\'' = "'\\''"
    escape c    = [c]


-- | Event-loop "update" callback, invoked at the start of every iteration
--   (i.e. before 'work_queue_wait').
--   It receives the current WorkQueue, fetches its runtime statistics, and
--   prints the number of tasks running, waiting, and complete to stdout
--   as a three-element list, in that order.
printStats :: WorkQueue -> IO ()
printStats q = getStats q >>= print . counters
  where
    -- Apply each statistics accessor to the same snapshot.
    counters stats =
      [ tasksRunning  stats
      , tasksWaiting  stats
      , tasksComplete stats
      ]


-- | Event-loop "process" callback, invoked for each Task that is
--   successfully returned (after 'work_queue_wait').
--   Prints the task's tag followed by the task's output to stdout, then
--   deletes the task: Tasks aren't tracked by the garbage collector, so they
--   have to be released explicitly.
--   The WorkQueue argument is part of the callback signature but is not
--   needed here.
processResult :: WorkQueue -> Task -> IO ()
processResult _queue finished =
     putStrLn ("Got: " ++ show (tag finished))
  >> BS.putStrLn (output finished)
  >> delete finished



-- | Entry point: configure debugging, create a WorkQueue, submit ten tasks,
--   and run the event loop until the queue is empty.
main :: IO ()
main = do

  -- Make WorkQueue print out all debugging information.  This is
  -- equivalent to the work_queue.set_debug_flags in the python api.
  -- However, the type of setDebugFlags is [DebugFlag] -> IO (),
  -- allowing compiletime errors when an invalid DebugFlag is tried.
  setDebugFlags [All]

  -- Create the WorkQueue. We override the default parameters to
  -- change the port to 1024, use a catalog server, set the project
  -- name to "hswq", and log the runtime stats to "/tmp/wq.log"
  -- The default parameters only specify port 9123 and the catalog server, project name, and logging are disabled.
  -- workqueue :: QueueParams -> IO WorkQueue
  q  <- workqueue $ defaultQParams { _qport   = Just $ port 1024
                                   , _mode    = Just Catalog
                                   , _name    = Just "hswq"
                                   , _logfile = Just "/tmp/wq.log"
                                   }

  -- create and submit 10 tasks
  -- (the element type is pinned to Int so that the 'Show' instance used by
  -- 'mktask' does not depend on type defaulting)
  forM_ [1 .. 10 :: Int] $ \i -> do
         t <- mktask i
         submit q t

  -- The eventLoop provides a while loop that waits until all tasks in the queue are complete.
  -- Two callbacks can be provided:
  --   - the first is called at the start of the loop with the WorkQueue as the parameter.
  --   - the second is called only on successfully returned Tasks
  --
  -- The type is:
  -- eventLoop :: WorkQueue -> Timeout -> (WorkQueue -> IO ()) -> (WorkQueue -> Task -> IO ()) -> IO ()
  --
  -- In pseudocode, it is equivalent to:
  -- eventLoop q timeout update process =
  --    while (q is not empty)
  --      update q
  --      r <- wait q timeout
  --      when r is Just: process q r
  --
  -- In this case, we set the Timeout to 1 second (Timeouts are either
  -- 'Forever' or a positive number of seconds). The 'printStats' callback
  -- prints the current runtime statistics, while the 'processResult'
  -- callback prints the task tag and output to stdout
  eventLoop q (seconds 1) printStats processResult

  putStrLn "Done!"