(* OCaml Map Reduce Project / controller / map_reduce.ml *)
open Util
open Worker_manager

(** [map kv_pairs shared_data map_filename] sends every [(key, value)] pair
    in [kv_pairs] to a mapper worker obtained from a manager built with
    [Worker_manager.initialize_mappers map_filename shared_data], and returns
    the concatenation of all mapper outputs.

    Jobs are dispatched through a [Thread_pool]. A [None] answer from
    [Worker_manager.map] is treated as a failed attempt: that worker is not
    pushed back and the pair is queued again. The function blocks until
    every pair has produced a [Some] result, and only then cleans up the
    workers and destroys the pool. The order of the returned pairs depends
    on completion order and is unspecified. *)
let map kv_pairs shared_data map_filename : (string * string) list =
  let manager = Worker_manager.initialize_mappers map_filename shared_data in
  let pool = Thread_pool.create 200 in
  (* [mutex] guards [results] and [pending]; [all_done] is signalled when
     [pending] drops to zero. *)
  let mutex = Mutex.create () in
  let all_done = Condition.create () in
  let results = ref [] in
  let pending = ref (List.length kv_pairs) in
  let rec submit (key, value) =
    let do_work () =
      let worker = Worker_manager.pop_worker manager in
      match Worker_manager.map worker key value with
      | None ->
        (* Failed attempt: drop this worker and requeue the pair.
           [pending] is unchanged because the pair is still outstanding. *)
        submit (key, value)
      | Some l ->
        (* Return the worker before reporting completion, so it is back in
           the manager by the time the waiting thread calls
           [clean_up_workers]. *)
        Worker_manager.push_worker manager worker;
        Mutex.lock mutex;
        (* Prepend rather than [!results @ l] to avoid quadratic copying. *)
        results := l @ !results;
        decr pending;
        if !pending = 0 then Condition.signal all_done;
        Mutex.unlock mutex
    in
    Thread_pool.add_work do_work pool
  in
  List.iter submit kv_pairs;
  (* Wait for every pair to finish before tearing anything down; otherwise
     workers could be cleaned up while jobs are still in flight and
     [!results] would be incomplete. *)
  Mutex.lock mutex;
  while !pending > 0 do
    Condition.wait all_done mutex
  done;
  Mutex.unlock mutex;
  Worker_manager.clean_up_workers manager;
  Thread_pool.destroy pool;
  !results
  
(** [combine kv_pairs] groups the values of [kv_pairs] by key.

    Each distinct key appears exactly once in the result, paired with all of
    its values. The values for a key are listed in reverse of their order of
    appearance in [kv_pairs]. The order of keys in the result follows the
    [Hashtbl] iteration order and is unspecified. *)
let combine kv_pairs : (string * string list) list =
  let table = Hashtbl.create 50 in
  let add (key, value) =
    let previous =
      if Hashtbl.mem table key then Hashtbl.find table key else []
    in
    (* [replace], not [add]: [Hashtbl.add] keeps the old binding hidden
       underneath, and iterating the table would then also visit it,
       producing duplicate keys with partial value lists. *)
    Hashtbl.replace table key (value :: previous)
  in
  List.iter add kv_pairs;
  Hashtbl.fold (fun k v acc -> (k, v) :: acc) table []
      
(** [reduce kvs_pairs shared_data reduce_filename] sends every
    [(key, values)] pair in [kvs_pairs] to a reducer worker obtained from a
    manager built with [Worker_manager.initialize_reducers reduce_filename],
    and returns the concatenation of all reducer outputs.

    Jobs are dispatched through a [Thread_pool]. A [None] answer from
    [Worker_manager.reduce] is treated as a failed attempt: that worker is
    not pushed back and the pair is queued again. The function blocks until
    every pair has produced a [Some] result, and only then cleans up the
    workers and destroys the pool. Output order depends on completion order
    and is unspecified.

    NOTE(review): [shared_data] is unused. Unlike [initialize_mappers] in
    [map], it is not passed to [initialize_reducers]; confirm against
    Worker_manager's signature. *)
let reduce kvs_pairs shared_data reduce_filename : (string * string list) list =
  let manager = Worker_manager.initialize_reducers reduce_filename in
  let pool = Thread_pool.create 200 in
  (* [mutex] guards [results] and [pending]; [all_done] is signalled when
     [pending] drops to zero. *)
  let mutex = Mutex.create () in
  let all_done = Condition.create () in
  let results = ref [] in
  let pending = ref (List.length kvs_pairs) in
  let rec submit (key, lst) =
    let do_work () =
      let worker = Worker_manager.pop_worker manager in
      match Worker_manager.reduce worker key lst with
      | None ->
        (* Failed attempt: drop this worker and requeue the pair. *)
        submit (key, lst)
      | Some l ->
        (* Return the worker before reporting completion, so it is back in
           the manager when [clean_up_workers] runs. *)
        Worker_manager.push_worker manager worker;
        Mutex.lock mutex;
        results := l @ !results;
        decr pending;
        if !pending = 0 then Condition.signal all_done;
        (* The original never released the lock here, which deadlocked the
           second job to complete. *)
        Mutex.unlock mutex
    in
    Thread_pool.add_work do_work pool
  in
  List.iter submit kvs_pairs;
  (* Wait for all pairs before tearing down workers and the pool. *)
  Mutex.lock mutex;
  while !pending > 0 do
    Condition.wait all_done mutex
  done;
  Mutex.unlock mutex;
  Worker_manager.clean_up_workers manager;
  Thread_pool.destroy pool;
  !results

(** [map_reduce app_name mapper reducer filename] runs a complete job for
    the application living under [apps/<app_name>/].

    Documents are loaded from [filename] and keyed by their id rendered as a
    string. The pairs [(id, body)] are mapped with [<mapper>.ml], grouped by
    key with [combine], and reduced with [<reducer>.ml]. Both phases receive
    [""] as shared data. Returns the id -> title table filled while keying
    the documents, together with the reduced output. *)
let map_reduce (app_name : string) (mapper : string)
    (reducer : string) (filename : string) =
  let base_dir = Printf.sprintf "apps/%s/" app_name in
  let source_of name = base_dir ^ name ^ ".ml" in
  let documents = load_documents filename in
  let titles = Hashtable.create 16 Hashtbl.hash in
  (* Records each document's title as a side effect while building the
     (id, body) input pairs. *)
  let to_input (doc : document) : string * string =
    let key = string_of_int doc.id in
    Hashtable.add titles key doc.title;
    (key, doc.body)
  in
  let inputs = List.map to_input documents in
  let intermediate = map inputs "" (source_of mapper) in
  let grouped = combine intermediate in
  let output = reduce grouped "" (source_of reducer) in
  (titles, output)