Commits

Nick Wade committed facd5ef

Updated map reduce functions - still untested and probably wrong

  • Participants
  • Parent commits 1967653

Comments (0)

Files changed (1)

controller/map_reduce.ml

 open Util
 open Worker_manager
 
-let table = Hashtbl.create 50 and
-let mutex = Mutex.create and
-let map_results = ref [] and
-let pool = Thread_pool.create 200 in
-
-let map_job pair wm =
-  let worker = Worker_manager.pop_worker wm in
-  let (k,v) = pair in
-  match Worker_manager.map worker k v with
-    | Some l -> map_success worker l wm
-    | None -> map_failure worker wm
-and
-
-let map_success worker lst wm =
-  map_results := !map_results @ lst
-  Mutex.lock mutex
-    Hashtbl.remove table worker
-  Mutex.unlock mutex
-  Worker_manager.push_worker wm worker
-and
-
-let map_failure = worker wm =
-  Mutex.lock mutex
-    let pair = Hashtbl.find table worker and
-    Hashtbl.remove table worker;
-  Mutex.unlock mutex
-  map_job pair wm
-and
-
 let map kv_pairs shared_data map_filename : (string * string) list = 
   let manager = Worker_manager.initialize_mappers map_filename shared_data in
+  let pool = Thread_pool.create 200 in
+  let mutex = Mutex.create() in
+  let results = ref [] in  
+    let rec helper (key, value) =
+      let do_work() =
+        let worker = Worker_manager.pop_worker manager in
+          match Worker_manager.map worker key value with
+            | None -> helper (key, value)
+            | Some l -> ( Mutex.lock mutex; 
+              results := !results @ l;
+              Mutex.unlock mutex;
+              Worker_manager.push_worker manager worker; )
+      in Thread_pool.add_work do_work pool;
+    in List.iter helper kv_pairs;
+    Worker_manager.clean_up_workers manager;
+    Thread_pool.destroy pool;
+    !results;  
   
-  let helper pairs =
-    let done =
-      Worker_manager.clean_up_workers;
-    in
-    match pairs with
-    | [] -> done(); !map_results
-    | (k,v)::tl -> Thread_pool.add_work (map_job (k,v) wm); helper tl
-  in
-  helper kv_pairs
-
 let combine kv_pairs : (string * string list) list = 
-  failwith "You have been doomed ever since you lost the ability to love."
+  let combined_list = ref [] in
+  let table = Hashtbl.create 50 in
+    let add (key, value) = 
+      let new_value = if Hashtbl.mem table key then
+        value :: (Hashtbl.find table key) else [v] in
+      Hashtbl.add table key new_value 
+    in List.iter add kv_pairs;
+    Hashtbl.iter (fun k v -> combined_list := (k, v) :: !combined_list) table;
+    !combined_list;
+      
 let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
-  failwith "The only thing necessary for evil to triumph is for good men to do nothing"
+  let manager = Worker_manager.initialize_reducers reduce_filename in
+  let pool = Thread_pool.create 200 in
+  let mutex = Mutex.create() in
+  let results = ref [] in
+  let rec helper (key, lst) =
+    let do_work() =
+      let worker = Worker_manager.pop_worker manager in
+        match Worker_manager.reduce worker key lst with
+          | None -> helper (key, lst)
+          | Some l -> ( Mutex.lock mutex;
+              results := !results @ l;
+              Worker_manager.push_worker manager worker; )
+      in Thread_pool.add_work do_work pool;
+    in List.iter helper kvs_pairs;
+    Worker_manager.clean_up_workers manager;
+    Thread_pool.destroy pool;
+    !results;
 
 let map_reduce (app_name : string) (mapper : string) 
     (reducer : string) (filename : string) =