(* OCaml Map Reduce Project: controller *)

open Util
open Worker_manager

(* TODO implement these *)
(** [map kv_pairs shared_data map_filename] runs the mapper on every
    [(key, value)] pair in [kv_pairs] using workers obtained from
    [Worker_manager.initialize_mappers map_filename shared_data], and returns
    the concatenation of all lists the workers produced.

    Pairs still outstanding are resubmitted to the thread pool every 0.1 s
    until each one has completed successfully, so the same pair may be run by
    several workers; only the first successful result per key is kept.

    Keys are tracked in a [Hashtbl] by key alone, so this presumably expects
    the keys in [kv_pairs] to be unique -- TODO confirm against callers. *)
let map kv_pairs shared_data map_filename : (string * string) list =
  let wm = Worker_manager.initialize_mappers map_filename shared_data in
  (* NOTE(review): 200 is presumably the pool's thread count -- confirm
     against Thread_pool. *)
  let p = Thread_pool.create 200 in
  let work = Hashtbl.create (List.length kv_pairs) in
  let results = ref [] in
  List.iter (fun (k, v) -> Hashtbl.add work k v) kv_pairs;

  (* A single mutex guards both [work] and [results]: they are always
     updated together, so one lock keeps "key removed" and "result recorded"
     atomic with respect to the driver loop below. *)
  let work_mutex = Mutex.create () in

  let a_job (k, v) () =
    let w = Worker_manager.pop_worker wm in
    (* NOTE(review): assumes [Worker_manager.map worker key value] returns
       [(string * string) list option] -- confirm against
       worker_manager.mli. *)
    match Worker_manager.map w k v with
    | Some l ->
      Mutex.lock work_mutex;
      (* If the key is already gone, another thread finished this pair
         first; drop the duplicate result. *)
      if Hashtbl.mem work k then begin
        Hashtbl.remove work k;
        results := !results @ l
      end;
      (* Always release the lock, whichever branch ran above. *)
      Mutex.unlock work_mutex;
      Worker_manager.push_worker wm w
    | None ->
      (* There was some error with the task. The worker is not returned to
         the manager and the pair stays in [work], so the next pass of
         [perform_mapping] submits it again. *)
      ()
  in

  (* Snapshot of the outstanding pairs, taken under the lock so the table is
     never traversed while a job is removing entries from it. *)
  let pending () =
    Mutex.lock work_mutex;
    let items = Hashtbl.fold (fun k v acc -> (k, v) :: acc) work [] in
    Mutex.unlock work_mutex;
    items
  in

  let rec perform_mapping () =
    match pending () with
    | [] ->
      Worker_manager.clean_up_workers wm;
      Thread_pool.destroy p;
      print_map_results !results;
      !results
    | items ->
      List.iter (fun kv -> Thread_pool.add_work (a_job kv) p) items;
      Thread.delay 0.1; (* prevent flooding of unnecessary work *)
      perform_mapping ()
  in
  perform_mapping ()
(** [combine kv_pairs] groups the values of [kv_pairs] by key and returns one
    [(key, values)] entry per distinct key. Within an entry, values appear in
    the reverse of their order in [kv_pairs] (each new value is consed onto
    the front). The order of the entries themselves follows hash-table
    traversal order and should not be relied upon.

    The combined list is also passed to [print_combine_results] before being
    returned. *)
let combine kv_pairs : (string * string list) list =
  let combined = Hashtbl.create (List.length kv_pairs) in
  let add (key, value) =
    let existing = try Hashtbl.find combined key with Not_found -> [] in
    (* [replace], not [add]: [Hashtbl.add] only hides the previous binding,
       and hidden bindings are still visited by [Hashtbl.iter]/[fold], which
       would yield one stale partial list per earlier insertion. *)
    Hashtbl.replace combined key (value :: existing)
  in
  List.iter add kv_pairs;
  let c_results = Hashtbl.fold (fun k vs acc -> (k, vs) :: acc) combined [] in
  print_combine_results c_results;
  c_results
(** [reduce kvs_pairs shared_data reduce_filename] is intended to run the
    reducer found at [reduce_filename] over every [(key, values)] pair in
    [kvs_pairs].

    TODO: not implemented yet. The definition previously had no body at all,
    which made the rest of the file unparseable; it now fails loudly instead
    of silently returning an empty result.

    @raise Failure always, until the reducer is implemented. *)
let reduce (kvs_pairs : (string * string list) list) (shared_data : string)
    (reduce_filename : string) : (string * string list) list =
  failwith "reduce: not implemented"
(** [map_reduce app_name mapper reducer filename] runs the full pipeline over
    the documents loaded from [filename]: [map], then [combine], then
    [reduce]. The mapper and reducer sources are looked up as
    ["apps/<app_name>/<mapper>.ml"] and ["apps/<app_name>/<reducer>.ml"].

    Returns [(titles, reduced)], where [titles] is a table from each
    document's id (as a string) to its title, and [reduced] is the output of
    [reduce]. *)
let map_reduce (app_name : string) (mapper : string)
    (reducer : string) (filename : string) =
  let app_dir = Printf.sprintf "apps/%s/" app_name in
  let docs = load_documents filename in
  let titles = Hashtable.create 16 Hashtbl.hash in
  (* Records the document's title under its id and returns the
     [(id, body)] pair that is fed to the mapper. *)
  let add_document (d : document) : (string * string) =
    (* NOTE(review): assumes the [document] record has an int field [id]
       alongside [title] and [body] -- confirm against its declaration. *)
    let id_s = string_of_int d.id in
    Hashtable.add titles id_s d.title;
    (id_s, d.body)
  in
  (* [map] iterates its input as a list, so [add_document] must be applied
     to each document rather than to the collection as a whole. *)
  let kv_pairs = List.map add_document docs in
  let mapped = map kv_pairs "" (app_dir ^ mapper ^ ".ml") in
  let combined = combine mapped in
  let reduced = reduce combined "" (app_dir ^ reducer ^ ".ml") in
  (titles, reduced)
