Commits

Kelvin Jin committed c4012ec Merge

Comments (0)

Files changed (2)

 # con -- rebuild controller
 # clean -- remove all objects and executables
 
-UTIL_SOURCES = shared/hashtable.mli shared/hashtable.ml shared/plane.mli shared/plane.ml \
+# removed for testing - shared/hashtable.mli shared/hashtable.ml
+UTIL_SOURCES = shared/plane.mli shared/plane.ml \
 shared/util.mli shared/util.ml shared/simulations.mli shared/simulations.ml
 
 SHARED_SOURCES = shared/thread_pool.mli shared/thread_pool.ml \

worker_server/worker.ml

     else ());
     success
 
+let mappers = Hashtbl.create 100
+let reducers = Hashtbl.create 100
+let m_mutex = Mutex.create()
+let r_mutex = Mutex.create()
+
 let rec handle_request client =
   match Connection.input client with
     Some v ->
       begin
         match v with
-        | InitMapper (source, shared_data) -> 
-          failwith "It's been a long time, old one."
+        | InitMapper (source, shared_data) ->
+            let m_built = Program.build source in
+            begin
+                match m_built with
+                | (Some id, t) ->
+                  ( Program.write_shared_data id shared_data;
+                    Mutex.lock(m_mutex);
+                    Hashtbl.add mappers id t;
+                    Mutex.unlock(m_mutex);
+                    if (send_response client m_built) then handle_request client
+                    else failwith "Connection error!"
+                  )
+                | (None, err) ->
+                  ( if (send_response client (Mapper (None, err))) then
+                      handle_request client
+                    else
+                      failwith "Connection error!"                  
+                  )              
+            end
+              
         | InitReducer source -> 
-          failwith "Young master, I cannot aid one who opposes the Master!"
+            let r_built = Program.build source in
+            begin
+                match r_built with
+                | (Some id, t) ->
+                  ( Mutex.lock(r_mutex);
+                    Hashtbl.add reducers id t;
+                    Mutex.unlock(r_mutex);
+                    if (send_response client r_built) then handle_request client
+                    else failwith "Connection error!"
+                  )
+                | (None, err) ->
+                  ( if (send_response client (Reducer (None, err))) then
+                      handle_request client
+                    else
+                      failwith "Connection error!"                  
+                  )              
+            end
+            
         | MapRequest (id, k, v) -> 
-          failwith "You won't go unrewarded."
+            if (Hashtbl.mem mappers id) then 
+            ( begin
+                match Program.run id (k,v) with
+                | Some v ->
+                  ( if (send_response client (MapResults (id, v))) then
+                    handle_request client
+                  )
+                | None ->
+                  ( if (send_response client (RuntimeError (id, "error"))) then
+                    handle_request client
+                  )              
+              end
+            )            
+            else if (send_response client (InvalidWorker id)) then 
+              handle_request client
+                        
+          
         | ReduceRequest (id, k, v) -> 
-          failwith "Really? In that case, just tell me what you need."
-      end
+            if (Hashtbl.mem reducers id) then 
+            ( begin
+                match Program.run id (k,v) with
+                | Some v ->
+                  ( if (send_response client (ReduceResults (id, v))) then
+                    handle_request client
+                  )
+                | None ->
+                  ( if (send_response client (RuntimeError (id, "error"))) then
+                    handle_request client
+                  )              
+              end
+            )            
+            else if (send_response client (InvalidWorker id)) then 
+              handle_request client        
+    end
   | None ->
       Connection.close client;
-      print_endline "Connection lost while waiting for request."
-
+      print_endline "Connection lost while waiting for request."
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.