Commits

Nick Wade  committed 401aeb4

Potentially working map and combine - filler for reduce

  • Participants
  • Parent commits f7cb822

Comments (0)

Files changed (1)

File controller/map_reduce.ml

 open Util
 open Worker_manager
-open Hashtable
 
-let map kv_pairs shared_data map_filename : (string * string) list =
+(* TODO implement these *)
+let map kv_pairs shared_data map_filename : (string * string) list = 
   let wm = Worker_manager.initialize_mappers map_filename shared_data in
   let p = Thread_pool.create 200 in
   let work = Hashtbl.create (List.length kv_pairs) in
   let results = ref [] in  
-  List.iter (fun (k,v) -> Hashtbl.add work k v);
-  
+  List.iter (fun (k,v) -> Hashtbl.add work k v) kv_pairs;
+
   let work_mutex = Mutex.create() in
   let results_mutex = Mutex.create() in
-  
-  let a_job k v = 
+
+  let a_job (k,v) () = 
     let w = Worker_manager.pop_worker wm in
     match (Worker_manager.map w k v) with
     | Some l -> 
         if Hashtbl.mem work k then
           ( Hashtbl.remove work k;
             Mutex.lock(results_mutex);
-            results := !results @ [l];
+            results := !results @ l;
             Mutex.unlock(results_mutex);
             Mutex.unlock(work_mutex);
             Worker_manager.push_worker wm w; )
             Worker_manager.push_worker wm w; ))
     | None -> () (* There was some error with the task *)
   in
-  
+
   let rec perform_mapping work_table =
-    if (Mutex.lock(work_mutex);
-        Hashtbl.length work_table = 0
-        Mutex.unlock(work_mutex;)) then
+    if (Hashtbl.length work_table = 0) then
       ( Worker_manager.clean_up_workers wm;
         Thread_pool.destroy p;
+        print_map_results !results;
         !results )
     else
-      ( Hashtbl.iter (fun k v -> Thread_pool.add_work(a_job(k v)) p) work_table;
+      ( Hashtbl.iter (fun k v -> Thread_pool.add_work(a_job (k,v)) p) work_table;
         Thread.delay(0.1); (* prevent flooding of unnecessary work *)
         perform_mapping work_table
       )
   perform_mapping work
   
 let combine kv_pairs : (string * string list) list = 
-  let combined_list = ref [] in
-  let table = Hashtbl.create 50 in
-    let add (key, value) = 
-      let new_value = if Hashtbl.mem table key then
-        value :: (Hashtbl.find table key) else [v] in
-      Hashtbl.add table key new_value 
-    in List.iter add kv_pairs;
-    Hashtbl.iter (fun k v -> combined_list := (k, v) :: !combined_list) table;
-    !combined_list;
-      
+  let combined = Hashtbl.create (List.length kv_pairs) in
+  let add (key, value) = 
+    Hashtbl.add combined key (if Hashtbl.mem combined key 
+      then value::(Hashtbl.find combined key) 
+      else [value])
+  in List.iter add kv_pairs;
+  
+  let c_results = ref [] in
+  Hashtbl.iter (fun k v -> c_results := (k,v)::!c_results) combined;
+  print_combine_results !c_results;
+  !c_results
+  
 let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
+  [("",[""])]
   
-
 let map_reduce (app_name : string) (mapper : string) 
     (reducer : string) (filename : string) =
   let app_dir = Printf.sprintf "apps/%s/" app_name in