(* OCaml Map Reduce Project: controller/map_reduce.ml *)

open Util
open Worker_manager

(* [map kv_pairs shared_data map_filename] runs the mapper [map_filename]
   over every (key, value) pair in [kv_pairs] using a
   [Worker_manager] mapper pool driven by a 200-thread [Thread_pool].
   It returns the concatenation, in completion order, of the
   (key, value) lists produced by the workers, after printing them with
   [print_map_results].

   Fault tolerance: every 0.10 s, every pair that has not yet produced a
   successful result is (re)submitted to the thread pool. A pair may
   therefore be processed more than once; only the first successful result
   for a key is kept.

   [shared_data] is passed through unchanged to
   [Worker_manager.initialize_mappers]. *)
let map kv_pairs shared_data map_filename : (string * string) list =
  let wm = Worker_manager.initialize_mappers map_filename shared_data in
  let p = Thread_pool.create 200 in
  (* Pairs still waiting for a successful result; guarded by [work_mutex]. *)
  let work = Hashtbl.create (List.length kv_pairs) in
  (* Results are kept in REVERSE order so that recording a chunk costs
     O(chunk length) instead of re-copying everything collected so far;
     reversed once at the end. Guarded by [results_mutex]. *)
  let results = ref [] in
  List.iter (fun (k, v) -> Hashtbl.add work k v) kv_pairs;
  let work_mutex = Mutex.create () in
  let results_mutex = Mutex.create () in

  (* One map task: borrow a worker, run it on (k, v), and record the
     output unless another job for [k] already did. *)
  let a_job (k, v) () =
    let w = Worker_manager.pop_worker wm in
    match Worker_manager.map w k v with
    | Some l ->
        Mutex.lock work_mutex;
        (* A duplicate job for the same key may have finished first. *)
        if Hashtbl.mem work k then begin
          Hashtbl.remove work k;
          Mutex.lock results_mutex;
          results := List.rev_append l !results;
          Mutex.unlock results_mutex
        end;
        Mutex.unlock work_mutex;
        Worker_manager.push_worker wm w
    | None ->
        (* NOTE(review): the worker is not pushed back on failure; this
           presumably relies on Worker_manager dealing with failed workers —
           confirm against its interface. The pair stays in [work] and is
           resubmitted on the next round. *)
        ()
  in

  (* Snapshot of the pending pairs, taken under [work_mutex] because jobs
     remove entries from [work] concurrently and iterating a Hashtbl while
     it is being modified has unspecified behaviour. *)
  let pending () =
    Mutex.lock work_mutex;
    let todo = Hashtbl.fold (fun k v acc -> (k, v) :: acc) work [] in
    Mutex.unlock work_mutex;
    todo
  in

  let rec perform_mapping () =
    match pending () with
    | [] ->
        (* Every result is recorded while [work_mutex] is held, so an empty
           snapshot means [results] is complete. *)
        Worker_manager.clean_up_workers wm;
        Thread_pool.destroy p;
        let mapped = List.rev !results in
        print_map_results mapped;
        mapped
    | todo ->
        List.iter (fun kv -> Thread_pool.add_work (a_job kv) p) todo;
        (* Throttle resubmission of still-pending pairs. *)
        Thread.delay 0.10;
        perform_mapping ()
  in
  perform_mapping ()
  
(* [combine kv_pairs] groups mapper output by key.

   The result holds one (key, values) pair per distinct key in [kv_pairs].
   [values] lists that key's values in the reverse of their order in
   [kv_pairs]. The order of the (key, values) pairs follows [Hashtbl.fold]
   and is unspecified. The grouped result is printed with
   [print_combine_results] before being returned. *)
let combine kv_pairs : (string * string list) list =
  let htbl = Hashtbl.create (List.length kv_pairs) in
  let add (k, v) =
    (* One lookup plus an in-place [replace] keeps exactly one binding per
       key, instead of a mem/find/remove/add sequence. *)
    let existing = try Hashtbl.find htbl k with Not_found -> [] in
    Hashtbl.replace htbl k (v :: existing)
  in
  List.iter add kv_pairs;
  let results = Hashtbl.fold (fun k vs acc -> (k, vs) :: acc) htbl [] in
  print_combine_results results;
  results
  
(* [reduce kvs_pairs shared_data reduce_filename] runs the reducer
   [reduce_filename] on every (key, values) group in [kvs_pairs] using a
   [Worker_manager] reducer pool driven by a 200-thread [Thread_pool].
   It returns one (key, reducer output) pair per successfully reduced
   group, in completion order, after printing them with
   [print_reduce_results].

   Previously this was a stub that always raised [Failure "reduce not here"];
   this restores the implementation that was left commented out.

   Fault tolerance: every 0.1 s, every group that has not yet produced a
   successful result is (re)submitted to the thread pool. A group may
   therefore be reduced more than once; only the first successful result
   for a key is kept.

   [shared_data] is passed through unchanged to
   [Worker_manager.initialize_reducers]. *)
let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
  let wm = Worker_manager.initialize_reducers reduce_filename shared_data in
  let p = Thread_pool.create 200 in
  (* Groups still waiting for a successful result; guarded by [work_mutex]. *)
  let work = Hashtbl.create (List.length kvs_pairs) in
  (* Results kept in REVERSE completion order (O(1) per record), reversed
     once at the end. Guarded by [results_mutex]. *)
  let results = ref [] in
  List.iter (fun (k, vs) -> Hashtbl.add work k vs) kvs_pairs;
  let work_mutex = Mutex.create () in
  let results_mutex = Mutex.create () in

  (* One reduce task: borrow a worker, run it on (k, vs), and record the
     output unless another job for [k] already did. *)
  let a_job (k, vs) () =
    let w = Worker_manager.pop_worker wm in
    match Worker_manager.reduce w k vs with
    | Some l ->
        Mutex.lock work_mutex;
        (* A duplicate job for the same key may have finished first. *)
        if Hashtbl.mem work k then begin
          Hashtbl.remove work k;
          Mutex.lock results_mutex;
          results := (k, l) :: !results;
          Mutex.unlock results_mutex
        end;
        Mutex.unlock work_mutex;
        Worker_manager.push_worker wm w
    | None ->
        (* There was some error with the task. The group stays in [work]
           and is resubmitted on the next round. NOTE(review): the worker
           is not pushed back; presumably Worker_manager handles failed
           workers — confirm. *)
        ()
  in

  (* Snapshot of the pending groups, taken under [work_mutex] because jobs
     remove entries from [work] concurrently and iterating a Hashtbl while
     it is being modified has unspecified behaviour. *)
  let pending () =
    Mutex.lock work_mutex;
    let todo = Hashtbl.fold (fun k vs acc -> (k, vs) :: acc) work [] in
    Mutex.unlock work_mutex;
    todo
  in

  let rec perform_reduce () =
    match pending () with
    | [] ->
        (* Every result is recorded while [work_mutex] is held, so an empty
           snapshot means [results] is complete. *)
        Worker_manager.clean_up_workers wm;
        Thread_pool.destroy p;
        let reduced = List.rev !results in
        print_reduce_results reduced;
        reduced
    | todo ->
        List.iter (fun kvs -> Thread_pool.add_work (a_job kvs) p) todo;
        (* prevent flooding of unnecessary work *)
        Thread.delay 0.1;
        perform_reduce ()
  in
  perform_reduce ()
  
(* [map_reduce app_name mapper reducer filename] runs the whole pipeline
   for the application stored under apps/<app_name>/:

   1. It loads the documents in [filename].
   2. It maps each document's (id, body) pair with
      apps/<app_name>/<mapper>.ml.
   3. It groups the mapped pairs with [combine].
   4. It reduces the groups with apps/<app_name>/<reducer>.ml.

   The result is [(titles, output)]. [titles] is a [Hashtable] from each
   document id (as a string) to that document's title. [output] is what
   [reduce] returns. An empty string is passed as the shared data to both
   [map] and [reduce]. *)
let map_reduce (app_name : string) (mapper : string)
    (reducer : string) (filename : string) =
  let app_dir = Printf.sprintf "apps/%s/" app_name in
  (* Path of an application source file inside [app_dir]. *)
  let source_of name = app_dir ^ name ^ ".ml" in
  let documents = load_documents filename in
  let titles = Hashtable.create 16 Hashtbl.hash in
  (* Remember each title under its stringified id, and hand (id, body)
     on to the mappers. *)
  let to_input (doc : document) : (string * string) =
    let key = string_of_int doc.id in
    Hashtable.add titles key doc.title;
    (key, doc.body)
  in
  let inputs = List.map to_input documents in
  let grouped = combine (map inputs "" (source_of mapper)) in
  let output = reduce grouped "" (source_of reducer) in
  (titles, output)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.