hs-cctools-workqueue / Example.hs

-- -*- indent-tabs-mode: nil -*- --

module Main where

import Control.Distributed.CCTools.WorkQueue

import qualified Data.ByteString.Char8 as BS (pack, putStrLn)
import Control.Monad (forM_)

-- | Creation of Tasks: sleep for max of 10 seconds before echoing the parameter
--   Generate a bash script in-memory that will be materialized on the worker using
--   the 'work_queue_specify_buffer' api.
--   Set the tag for the task to be the string representation of the parameter.
mktask :: Show a => a -> IO Task
mktask v = do

  -- the script to execute: intercalate with '\n' then convert to ByteString
  let script = BS.pack . unlines $ [
              "t=$(echo $RANDOM % 10 | bc)"
            , "sleep $t"
            , "echo " ++ show v

  -- Create the Task.
  -- 'task' :: Command -> IO Task
  -- 'cmd'  :: String -> Command
  t <- task $ cmd "bash script.sh"

  -- specifyBuffer :: Task -> ByteString -> Location Remote -> Cached -> IO ()
  -- 'remote' :: FilePath -> Location Remote
  -- Locations can be either 'Location Local' or 'Location Remote', which have different semantic meanings.
  -- If we used for instance 'specifyFile', it has type Task -> Location Local -> Location Remote -> Cached -> IO ()
  -- This prevents local & remote files from being nigligently swapped. There are appropriate 'smart constructor's for each:
  -- 'remote' and 'local' to create them as needed.
  specifyBuffer t script (remote "script.sh") False

  -- Set the tag for the task.
  -- 'mktask' is polymorphic, but we need a string representation
  -- here, so the type is constrained to implement the Show interface
  -- specifyTag :: Task -> String -> IO ()
  specifyTag    t $ show v

  return t

-- | Intended to run as part of the event loop prior to calling 'work_queue_wait'
--   Used as a callback, it accepts the current WorkQueue and can perform arbitrary actions
--   In this case though, all it does is get the current runtime
--   WorkQueue statistics and print the number of tasks running,
--   waiting, and complete to stdout
printStats :: WorkQueue -> IO ()
printStats q = do
  s <- getStats q

  -- equivalent to print (map (\f -> f s) [tasksRunning, tasksWaiting, tasksComplete])
  -- ...but prettier
  print $ map ($ s) [tasksRunning, tasksWaiting, tasksComplete]

-- | Callback for when a Task is successfully returned.
--   Executed after calling 'work_queue_wait'
--  In this instance, just print the task tag and the task output to stdout
--  We then need to call delete b/c Tasks aren't tracked by the garbage collector
processResult :: WorkQueue -> Task -> IO ()
processResult q r = do
  putStrLn $ "Got: " ++ show (tag r)
  BS.putStrLn . output $ r
  delete r

main = do

  -- Make WorkQueue print out all debugging information.  This is
  -- equivalent to the work_queue.set_debug_flags in the python api.
  -- However, the type of setDebugFlags is [DebugFlag] -> IO (),
  -- allowing compiletime errors when an invalid DebugFlag is tried.
  setDebugFlags [All]

  -- Create the WorkQueue. We override the default parameters to
  -- change the port to 1024, use a catalog server, set the project
  -- name to "hswq", and log the runtime stats to "/tmp/wq.log"
  -- workqueue :: QueueParams -> IO WorkQueue
  q  <- workqueue $ defaultQParams { _qport   = Just $ port 1024
                                   , _mode    = Just Catalog
                                   , _name    = Just "hswq"
                                   , _logfile = Just "/tmp/wq.log"

  -- create 10 tasks
  ts <- mapM mktask [1..10]

  -- submit the tasks to the queue
  -- (equivalent to mapM_ (submit q) ts)
  forM_ ts (submit q)

  -- The eventLoop provides a while loop that waits until all tasks in the queue are complete.
  -- Two callbacks can be provided:
  --   the first is called at the start of the loop with the WorkQueue as the parameter.
  --   the second is called only on successfully returned Tasks
  -- The type is:
  -- eventLoop :: WorkQueue -> Timeout -> (WorkQueue -> IO ()) -> (WorkQueue -> Task -> IO ()) -> IO ()
  -- In pseudocode, it is equivalent to:
  -- eventLoop q timeout update process =
  --    while (q is not empty)
  --      update q
  --      r <- wait q timeout
  --      when r is Just: process q r
  -- In this case, we set the Timeout to 1 second (Timeouts are either
  -- 'Forever' or a positive number of seconds (specified at the
  -- type-level)).
  -- The 'update' callback prints the current runtime statistics,
  -- while the 'process' callback prints the task tag and output to
  -- stdout
  eventLoop q (seconds 1) printStats processResult

  putStrLn "Done!"
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.