1. badi
  2. hs-cctools-workqueue

Commits

badi  committed 1688741

update example with explanatory comments

  • Participants
  • Parent commits 9b3542b
  • Branches master

Comments (0)

Files changed (1)

File Example.hs

View file
  • Ignore whitespace
 
 import Control.Distributed.CCTools.WorkQueue
 
-import Control.Applicative ((<$>))
 import qualified Data.ByteString.Char8 as BS (pack, putStrLn)
-import Foreign.C.String (newCStringLen)
 import Control.Monad (forM_)
 
 
-
+-- | Creation of Tasks: sleep for max of 10 seconds before echoing the parameter
+-- 
+--   Generate a bash script in-memory that will be materialized on the worker using
+--   the 'work_queue_specify_buffer' api.
+--   Set the tag for the task to be the string representation of the parameter.
 mktask :: Show a => a -> IO Task
 mktask v = do
+
+  -- the script to execute: intercalate with '\n' then convert to ByteString
   let script = BS.pack . unlines $ [
               "t=$(echo $RANDOM % 10 | bc)"
             , "sleep $t"
             , "echo " ++ show v
             ]
+
+  -- Create the Task.
+  -- 'task' :: Command -> IO Task
+  -- 'cmd'  :: String -> Command
   t <- task $ cmd "bash script.sh"
+
+  -- specifyBuffer :: Task -> ByteString -> Location Remote -> Cached -> IO ()
+  -- 'remote' :: FilePath -> Location Remote
+  -- Locations can be either 'Location Local' or 'Location Remote', which have different semantic meanings.
+  -- If we used for instance 'specifyFile', it has type Task -> Location Local -> Location Remote -> Cached -> IO ()
+  -- This prevents local & remote files from being nigligently swapped. There are appropriate 'smart constructor's for each:
+  -- 'remote' and 'local' to create them as needed.
   specifyBuffer t script (remote "script.sh") False
+
+  -- Set the tag for the task.
+  -- 'mktask' is polymorphic, but we need a string representation
+  -- here, so the type is constrained to implement the Show interface
+  -- specifyTag :: Task -> String -> IO ()
   specifyTag    t $ show v
+
   return t
 
+-- | Intended to run as part of the event loop prior to calling 'work_queue_wait'
+--   Used as a callback, it accepts the current WorkQueue and can perform arbitrary actions
+--   In this case though, all it does is get the current runtime
+--   WorkQueue statistics and print the number of tasks running,
+--   waiting, and complete to stdout
 printStats :: WorkQueue -> IO ()
 printStats q = do
   s <- getStats q
+
+  -- equivalent to print (map (\f -> f s) [tasksRunning, tasksWaiting, tasksComplete])
+  -- ...but prettier
   print $ map ($ s) [tasksRunning, tasksWaiting, tasksComplete]
 
+-- | Callback for when a Task is successfully returned.
+--   Executed after calling 'work_queue_wait'
+--  In this instance, just print the task tag and the task output to stdout
+--  We then need to call delete b/c Tasks aren't tracked by the garbage collector
 processResult :: WorkQueue -> Task -> IO ()
 processResult q r = do
   putStrLn $ "Got: " ++ show (tag r)
   delete r
 
 main = do
+
+  -- Make WorkQueue print out all debugging information.  This is
+  -- equivalent to the work_queue.set_debug_flags in the python api.
+  -- However, the type of setDebugFlags is [DebugFlag] -> IO (),
+  -- allowing compiletime errors when an invalid DebugFlag is tried.
   setDebugFlags [All]
+
+  -- Create the WorkQueue. We override the default parameters to
+  -- change the port to 1024, use a catalog server, set the project
+  -- name to "hswq", and log the runtime stats to "/tmp/wq.log"
+  -- workqueue :: QueueParams -> IO WorkQueue
   q  <- workqueue $ defaultQParams { _qport   = Just $ port 1024
                                    , _mode    = Just Catalog
                                    , _name    = Just "hswq"
                                    , _logfile = Just "/tmp/wq.log"
                                    }
+
+  
+  -- create 10 tasks
   ts <- mapM mktask [1..10]
+
+  -- submit the tasks to the queue
+  -- (equivalent to mapM_ (submit q) ts)
   forM_ ts (submit q)
 
+  -- The eventLoop provides a while loop that waits until all tasks in the queue are complete.
+  -- Two callbacks can be provided:
+  --   the first is called at the start of the loop with the WorkQueue as the parameter.
+  --   the second is called only on successfully returned Tasks
+  --
+  -- The type is:
+  -- eventLoop :: WorkQueue -> Timeout -> (WorkQueue -> IO ()) -> (WorkQueue -> Task -> IO ()) -> IO ()
+  --
+  -- In pseudocode, it is equivalent to:
+  -- eventLoop q timeout update process =
+  --    while (q is not empty)
+  --      update q
+  --      r <- wait q timeout
+  --      when r is Just: process q r
+  --
+  -- In this case, we set the Timeout to 1 second (Timeouts are either
+  -- 'Forever' or a positive number of seconds (specified at the
+  -- type-level)).
+  -- The 'update' callback prints the current runtime statistics,
+  -- while the 'process' callback prints the task tag and output to
+  -- stdout
   eventLoop q (seconds 1) printStats processResult
 
   putStrLn "Done!"