OCaml Map Reduce Project / worker_server / worker.ml

open Protocol

let send_response client response =
  let success = Connection.output client response in
    (if not success then
      (Connection.close client;
       print_endline "Connection lost before response could be sent.")
    else ());
    success

(* Tables for storing active workers *)
let mappers = Hashtbl.create 100
let reducers = Hashtbl.create 100
let m_mutex = Mutex.create()
let r_mutex = Mutex.create()

(* Must handle both inits and both requests *)
let rec handle_request client =
  match Connection.input client with
    Some v ->
      begin
        match v with
        | InitMapper (source, shared_data) ->
            let m_built = Program.build source in
            begin
                match m_built with
                | (Some id, t) ->
                  ( Program.write_shared_data id shared_data;
                    Mutex.lock(m_mutex);
                    Hashtbl.add mappers id t;
                    Mutex.unlock(m_mutex);
                    if (send_response client m_built) then handle_request client
                    else failwith "Connection error!"
                  )
                | (None, err) ->
                  ( if (send_response client (Mapper (None, "error error!"))) then
                      handle_request client
                    else
                      failwith "Connection error!"                  
                  )              
            end
              
        | InitReducer source -> 
            let r_built = Program.build source in
            begin
                match r_built with
                | (Some id, t) ->
                  ( Mutex.lock(r_mutex);
                    Hashtbl.add reducers id t;
                    Mutex.unlock(r_mutex);
                    if (send_response client r_built) then handle_request client
                    else failwith "Connection error!"
                  )
                | (None, err) ->
                  ( if (send_response client (Reducer (None, "error error!"))) then
                      handle_request client
                    else
                      failwith "Connection error!"                  
                  )              
            end
            
        | MapRequest (id, k, v) -> 
            let m_result =
            (* Check valid worker first *)
              if (Hashtbl.mem mappers id) then
                begin
                  let prog_result = Program.run id (k,v) in
                  match prog_result with
                    | None -> RuntimeError (id, "error")
                    | Some v -> MapResults (id, v)
                end
                (* else wasn't valid! *)
              else InvalidWorker id in
              
              if (send_response client m_result) then handle_request client
              else (); (* Nothing left to do *)
          
        | ReduceRequest (id, k, v) -> 
            let r_result =
            (* Check valid worker first *)
              if (Hashtbl.mem reducers id) then
                begin
                  let prog_result = Program.run id (k,v) in
                  match prog_result with
                    | None -> (RuntimeError (id, "error");)
                    | Some x -> (ReduceResults (id, x))
                end
                (* else wasn't valid worker *)
              else InvalidWorker id in
              
              if (send_response client r_result) then handle_request client
              else (); (* Nothing left to do *)  
    end
  | None ->
      Connection.close client;
      print_endline "Connection lost while waiting for request."
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.