Source

OCaml Map Reduce Project / controller / map_reduce.ml

Full commit
open Util
open Worker_manager

(** [map kv_pairs shared_data map_filename] runs the map phase.

    Mappers are initialised from [map_filename] and [shared_data]. Every
    [(key, value)] pair in [kv_pairs] is repeatedly submitted to the thread
    pool until some worker has returned [Some] result for its key; the first
    result recorded for a key wins and later ones are discarded.

    Returns the concatenation of all recorded result lists, in the order in
    which the winning jobs completed. The same list is passed to
    [print_map_results] before being returned. *)
let map kv_pairs shared_data map_filename : (string * string) list =
  let wm = Worker_manager.initialize_mappers map_filename shared_data in
  let p = Thread_pool.create 100 in
  let work = Hashtbl.create (List.length kv_pairs) in
  (* One entry per completed key, newest first.  Batches are flattened once
     at the end instead of appending with [@] on every completion. *)
  let results = ref [] in
  (* Add the pairs to work table *)
  List.iter (fun (k,v) -> Hashtbl.add work k v) kv_pairs;

  let results_mutex = Mutex.create() in
  let work_mutex = Mutex.create() in

  (* Handle one of the pairs - deals with one worker from start to finish *)
  let a_job (k,v) () =
    let w = Worker_manager.pop_worker wm in
    match Worker_manager.map w k v with
    | Some l ->
      Mutex.lock work_mutex;
      (* Only the first finisher for [k] records its output; if the key is
         already gone another thread beat us to it. *)
      if Hashtbl.mem work k then begin
        Hashtbl.remove work k;
        Mutex.lock results_mutex;
        results := l :: !results;
        Mutex.unlock results_mutex
      end;
      Mutex.unlock work_mutex;
      Worker_manager.push_worker wm w
    | None -> () (* Don't pop it back on queue *)
  in

  (* Snapshot of the outstanding pairs, taken under [work_mutex].  The
     controller must never read [work] without the lock: jobs remove keys
     concurrently, and a job removes its key and records its result inside
     one critical section, so an empty snapshot also guarantees that every
     result has already been stored. *)
  let pending () =
    Mutex.lock work_mutex;
    let snapshot = Hashtbl.fold (fun k v acc -> (k, v) :: acc) work [] in
    Mutex.unlock work_mutex;
    snapshot
  in

  (* Recursively runs the work jobs - returns the results when done *)
  let rec perform_mapping () =
    match pending () with
    | [] ->
      (* Clean up time *)
      Thread_pool.destroy p;
      Worker_manager.clean_up_workers wm;
      Mutex.lock results_mutex;
      let final = List.concat (List.rev !results) in
      Mutex.unlock results_mutex;
      print_map_results final;
      final
    | jobs ->
      List.iter (fun kv -> Thread_pool.add_work (a_job kv) p) jobs;
      (* Give the submitted jobs time to finish before resubmitting. *)
      Thread.delay 0.10;
      perform_mapping ()
  in
  perform_mapping ()
  
(** [combine kv_pairs] groups the values of [kv_pairs] by key.

    Each distinct key appears once in the result, paired with all of its
    values; a later value in [kv_pairs] is consed in front of earlier ones.
    The order of keys is whatever [Hashtbl.fold] yields. The result is
    passed to [print_combine_results] before being returned. *)
let combine kv_pairs : (string * string list) list =
  let groups = Hashtbl.create 16 in
  (* Prepend [value] to whatever has been collected for [key] so far. *)
  let record (key, value) =
    let seen = try Hashtbl.find groups key with Not_found -> [] in
    Hashtbl.replace groups key (value :: seen)
  in
  List.iter record kv_pairs;
  (* Flatten the table into an association list. *)
  let results =
    Hashtbl.fold (fun key values acc -> (key, values) :: acc) groups []
  in
  print_combine_results results;
  results
  
(* Still not working - more or less same as map though *)
(** [reduce kvs_pairs shared_data reduce_filename] runs the reduce phase.

    Reducers are initialised from [reduce_filename] and [shared_data]. Every
    [(key, values)] pair in [kvs_pairs] is repeatedly submitted to the thread
    pool until some worker has returned [Some] result for its key; the first
    result recorded for a key wins and later ones are discarded.

    Returns one [(key, result)] entry per key, in the order in which the
    winning jobs completed. The same list is passed to
    [print_reduce_results] before being returned. *)
let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
  let wm = Worker_manager.initialize_reducers reduce_filename shared_data in
  let p = Thread_pool.create 100 in
  let work = Hashtbl.create (List.length kvs_pairs) in
  (* Completed entries, newest first; reversed once at the end instead of
     appending with [@] on every completion. *)
  let results = ref [] in
  List.iter (fun (k,v) -> Hashtbl.add work k v) kvs_pairs;

  let work_mutex = Mutex.create() in
  let results_mutex = Mutex.create() in

  (* One job to reduce *)
  let a_job (k,v) () =
    let w = Worker_manager.pop_worker wm in
    match Worker_manager.reduce w k v with
    | Some l ->
      Mutex.lock work_mutex;
      (* Only the first finisher for [k] records its output; if the key is
         already gone another thread beat us to it. *)
      if Hashtbl.mem work k then begin
        Hashtbl.remove work k;
        Mutex.lock results_mutex;
        results := (k, l) :: !results;
        Mutex.unlock results_mutex
      end;
      Mutex.unlock work_mutex;
      Worker_manager.push_worker wm w
    | None -> () (* There was some error with the worker *)
  in

  (* Snapshot of the outstanding pairs, taken under [work_mutex].  The
     controller must never read [work] without the lock: jobs remove keys
     concurrently, and a job removes its key and records its result inside
     one critical section, so an empty snapshot also guarantees that every
     result has already been stored. *)
  let pending () =
    Mutex.lock work_mutex;
    let snapshot = Hashtbl.fold (fun k v acc -> (k, v) :: acc) work [] in
    Mutex.unlock work_mutex;
    snapshot
  in

  (* Recursively handles sending off work and returns results *)
  let rec perform_reduce () =
    match pending () with
    | [] ->
      (* Clean up time *)
      Thread_pool.destroy p;
      Worker_manager.clean_up_workers wm;
      Mutex.lock results_mutex;
      let final = List.rev !results in
      Mutex.unlock results_mutex;
      print_reduce_results final;
      final
    | jobs ->
      List.iter (fun kv -> Thread_pool.add_work (a_job kv) p) jobs;
      Thread.delay 0.1; (* prevent flooding of unnecessary work *)
      perform_reduce ()
  in
  perform_reduce ()
  
(** [map_reduce app_name mapper reducer filename] runs the whole pipeline
    over the documents loaded from [filename].

    The mapper and reducer sources are looked up as
    ["apps/<app_name>/<mapper>.ml"] and ["apps/<app_name>/<reducer>.ml"].
    Each document contributes a [(string_of_int id, body)] pair to the map
    phase and has its title stored under the same string id.

    Returns [(titles, reduced)]: the id-to-title table and the output of
    [reduce]. *)
let map_reduce (app_name : string) (mapper : string)
    (reducer : string) (filename : string) =
  let app_dir = Printf.sprintf "apps/%s/" app_name in
  (* Path of an application source file inside [app_dir]. *)
  let source_of name = app_dir ^ name ^ ".ml" in
  let docs = load_documents filename in
  let titles = Hashtable.create 16 Hashtbl.hash in
  (* Remember the document's title and produce its map-phase input pair. *)
  let register (d : document) : (string * string) =
    let key = string_of_int d.id in
    Hashtable.add titles key d.title;
    (key, d.body)
  in
  let inputs = List.map register docs in
  let mapped = map inputs "" (source_of mapper) in
  let grouped = combine mapped in
  let reduced = reduce grouped "" (source_of reducer) in
  (titles, reduced)