(* OCaml Map Reduce Project / controller / map_reduce.ml *)

open Util
open Worker_manager
open Hashtable

(** [map kv_pairs shared_data map_filename] runs the mapper in [map_filename]
    over every [(key, value)] pair of [kv_pairs] and returns the concatenation
    of the pairs produced by all successful map calls.

    Pending pairs are kept in a table keyed by [key]. A pair leaves the table
    only once some job has produced a result for it, and whatever is still
    pending is resubmitted to the thread pool every 0.1 seconds. The order of
    the returned list is unspecified. Keys identify pending work, so
    [kv_pairs] is expected to carry distinct keys.

    NOTE(review): assumes [Worker_manager.map] yields
    [(string * string) list option] and that [Thread_pool.add_work] takes a
    [unit -> unit] thunk followed by the pool; confirm against their
    interfaces. *)
let map kv_pairs shared_data map_filename : (string * string) list =
  let wm = Worker_manager.initialize_mappers map_filename shared_data in
  let p = Thread_pool.create 200 in
  let work = Hashtbl.create (List.length kv_pairs) in
  let results = ref [] in
  List.iter (fun (k, v) -> Hashtbl.add work k v) kv_pairs;

  (* Guards both [work] and [results]: results are only appended while the
     matching key is being removed from [work]. *)
  let work_mutex = Mutex.create () in

  let a_job k v =
    let w = Worker_manager.pop_worker wm in
    match Worker_manager.map w k v with
    | Some l ->
      Mutex.lock work_mutex;
      (* Only the first job to finish a key contributes its output; the
         result of a duplicate submission of the same key is dropped. *)
      if Hashtbl.mem work k then begin
        Hashtbl.remove work k;
        results := l @ !results
      end;
      Mutex.unlock work_mutex;
      Worker_manager.push_worker wm w
    | None ->
      (* The task failed: the worker is not handed back and the key stays in
         [work], so a later pass submits it again. *)
      ()
  in

  let rec perform_mapping work_table =
    (* Snapshot the pending work under the lock so the table is never
       traversed while a job is removing entries from it. *)
    Mutex.lock work_mutex;
    let pending = Hashtbl.fold (fun k v acc -> (k, v) :: acc) work_table [] in
    Mutex.unlock work_mutex;
    match pending with
    | [] ->
      Worker_manager.clean_up_workers wm;
      Thread_pool.destroy p;
      !results
    | _ :: _ ->
      List.iter
        (fun (k, v) -> Thread_pool.add_work (fun () -> a_job k v) p)
        pending;
      Thread.delay 0.1; (* prevent flooding of unnecessary work *)
      perform_mapping work_table
  in
  perform_mapping work
  
(** [combine kv_pairs] groups the values of [kv_pairs] by key, returning one
    [(key, values)] entry per distinct key.

    Within an entry, values appear in reverse order of their occurrence in
    [kv_pairs]. The order of the entries themselves is unspecified (it is the
    traversal order of the underlying hash table). *)
let combine kv_pairs : (string * string list) list =
  let table = Hashtbl.create 50 in
  let add (key, value) =
    let previous = try Hashtbl.find table key with Not_found -> [] in
    (* [replace], not [add]: [add] would keep the older, shorter binding
       alive and the traversal below would report the key more than once. *)
    Hashtbl.replace table key (value :: previous)
  in
  List.iter add kv_pairs;
  Hashtbl.fold (fun k v acc -> (k, v) :: acc) table []
      
(** [reduce kvs_pairs shared_data reduce_filename] is meant to run the reducer
    in [reduce_filename] over each [(key, values)] entry of [kvs_pairs].

    The original definition had no body. Until the reduce phase is written
    this raises instead of leaving the file unparseable.

    TODO(review): implement; presumably it mirrors the map phase using the
    reducer side of [Worker_manager], but that interface is not visible here.

    @raise Failure always, because the reduce phase is not implemented. *)
let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
  ignore (kvs_pairs, shared_data, reduce_filename);
  failwith "Map_reduce.reduce: not implemented"

(** [map_reduce app_name mapper reducer filename] loads the documents stored
    in [filename], maps them with the script [mapper] and reduces the combined
    output with the script [reducer]; both scripts are looked up as [.ml]
    files under the application directory built from [app_name].

    Returns the table of titles keyed by the stringified document id, paired
    with the output of the reduce phase. *)
let map_reduce (app_name : string) (mapper : string)
    (reducer : string) (filename : string) =
  let base = Printf.sprintf "apps/%s/" app_name in
  let script_of name = base ^ name ^ ".ml" in
  let documents = load_documents filename in
  let titles = Hashtable.create 16 Hashtbl.hash in
  (* Record the title under the stringified id and hand back the (id, body)
     pair that feeds the map phase. *)
  let register (doc : document) : (string * string) =
    let key = string_of_int doc.id in
    Hashtable.add titles key doc.title;
    (key, doc.body)
  in
  let inputs = List.map register documents in
  let mapped_pairs = map inputs "" (script_of mapper) in
  let grouped = combine mapped_pairs in
  let reduced = reduce grouped "" (script_of reducer) in
  (titles, reduced)
(* Code-browser navigation tips captured along with this listing; they are not
   part of the program.
   Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
   Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
   Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
   Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
   Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
   Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
   Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o. *)