Commits

Kelvin Jin committed 2433fdb

Made some small changes, see comment for details

  • Participants
  • Parent commits f64a338

Comments (1)

  1. Kelvin Jin author

    Concerning the reduce problem: It seems like a pretty clear indicator of what's wrong is in worker_manager.ml, line 43: the variable is matched with (Some(Mapper(,)),Reduce) which returns an error and thus no workers are added to the worker_manager. We want it to match to somehow (Some(Reducer(,)),Reduce), but I've spent a long time looking at it and can't figure out why. My guess is that the problem is in worker.ml.

    Also I changed the thread pool size because my computer runs out of memory with 200 threads. Also de-commented out reduce. Everything else is trivial.

Files changed (1)

File controller/map_reduce.ml

 
 let map kv_pairs shared_data map_filename : (string * string) list = 
   let wm = Worker_manager.initialize_mappers map_filename shared_data in
-  let p = Thread_pool.create 200 in
+  let p = Thread_pool.create 100 in
   let work = Hashtbl.create (List.length kv_pairs) in
   let results = ref [] in  
   List.iter (fun (k,v) -> Hashtbl.add work k v) kv_pairs;
   perform_mapping work
   
 let combine kv_pairs : (string * string list) list = 
-  let htbl = Hashtbl.create (List.length kv_pairs) in
+  let htbl = Hashtbl.create 16 in
   let add (k,v) = 
     if (Hashtbl.mem htbl k) then
       let ls = (Hashtbl.find htbl k) in 
-      (Hashtbl.remove htbl k); (Hashtbl.add htbl k (v::ls));
+      Hashtbl.replace htbl k (v::ls)
     else
-      (Hashtbl.add htbl k [v]);
+      Hashtbl.add htbl k [v]
   in
-  List.iter (fun (k,v) -> add (k,v)) kv_pairs;
+  List.iter add kv_pairs;
   
   let results = Hashtbl.fold (fun k v acc -> (k, v) :: acc) htbl [] in
   print_combine_results results;
   results
   
 let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
-  (* let wm = Worker_manager.initialize_reducers reduce_filename shared_data in
-    let p = Thread_pool.create 200 in
+  let wm = Worker_manager.initialize_reducers reduce_filename shared_data in
+    let p = Thread_pool.create 100 in
     let work = Hashtbl.create (List.length kvs_pairs) in
     let results = ref [] in  
     List.iter (fun (k,v) -> Hashtbl.add work k v) kvs_pairs;
           perform_reduce work_table
         )
     in
-    perform_reduce work *)
-    failwith "reduce not here"
+    perform_reduce work
   
 let map_reduce (app_name : string) (mapper : string) 
     (reducer : string) (filename : string) =