Commits

Anonymous committed 173bff5 Merge

merge

Comments (0)

Files changed (3)

 ~$
 ^_build/
 \.(byte|native)$
+^setup\.log$
                ]
              ) :> ti _)
              in
-             ( ti_add_ser_deser t
+             ( ti_add_marshal_struc ~ser:True ~deser:True t ()
              ; t
              )
         ;
                (fun () ->
                   match r.val with
                   [ None -> failwith "dest: not filled"
-                  | Some a -> ti_ser ti_cr_a a
+                  | Some a -> ti_ser_marshal ti_cr_a a
                   ]
                )
         ;
 
              initializer
              let self = (self :> ti _) in
-             ( uti_add_meth self "ser" &
+             ( uti_add_meth self "ser.marshal" &
                  ubox
                    (ti_abs self ti_string)
                    (fun d ->
                             self#type_name
                       ]
                    )
-             ; uti_add_meth self "deser" &
+             ; uti_add_meth self "deser.marshal" &
                  ubox
                    (ti_abs ti_string self)
                    (fun s ->
                 dest_remote_counter.val
                 ti_a#type_name
                 (fun s ->
-                   r.val := Some (Cd_Ser.ti_deser ti_cr_a s)
+                   r.val := Some (Cd_Ser.ti_deser_marshal ti_cr_a s)
                 )
                 (dest_ubox_get ti_cr_a r)
           }
              let rec get_dests_td td u =
                match td with
                [ Simple _ -> []
-               | Dispatch_method disp -> get_dests_td (disp "ser") u
+               | Dispatch_method disp -> get_dests_td (disp "ser.marshal") u
                | Sum_type destr _constr ->
                    let (_vname, disp) = destr u in
-                   let uarr = disp "ser" in
+                   let uarr = disp "ser.marshal" in
                    get_dests_arr uarr
                | Record_type destr _utis _fields _constr ->
                    let uarrfn = destr u in
                 ti_string
                 ()
         ;
-        value () = ti_add_ser_deser (ti_resp_elem :> ti _)
+        value () = ti_add_marshal_struc
+          ~ser:True ~deser:True (ti_resp_elem :> ti _) ()
         ;
 
         value ti_proto_call_resp : #ti proto_call_resp =
         ;
         value ti_proto_call_resp = (ti_proto_call_resp :> ti _)
         ;
-        value () = ti_add_ser_deser ti_proto_call_resp
+        value () = ti_add_marshal_struc ~ser:True ~deser:True
+          ti_proto_call_resp
+          ()
         ;
 
 
+module U = Unix;
+module LU = Lwt_unix;
+
+value listening_backlog = 5;
+value handshake_buffer_size = 4096;
+
+
 type config_action =
-  [= `Listen of Unix.sockaddr
-  | `Connect of Unix.sockaddr
+  [= `Listen of U.sockaddr
+  |  `Connect of U.sockaddr
   ]
 ;
 
+value get_env_opt name =
+  try Some (Sys.getenv name) with [Not_found -> None]
+;
+
 module Env_config
  :
   sig
     value default_listen_port = 12343
     ;
 
-    value get_env_opt name =
-      try Some (Sys.getenv name) with [Not_found -> None]
-    ;
-
     value failwith fmt = Printf.ksprintf failwith fmt
     ;
 
     value inet_addr_of_string host_txt =
-      try Unix.inet_addr_of_string host_txt
+      try U.inet_addr_of_string host_txt
       with
       [ Failure msg ->
           failwith "error parsing address %S: %s" host_txt msg
     value inet_sockaddr_of_host_port host_txt port_txt =
       let ia = inet_addr_of_string host_txt in
       let p = port_of_txt port_txt in
-      Unix.ADDR_INET ia p
+      U.ADDR_INET ia p
     ;
 
     value role
-    : [= `Master of Unix.sockaddr | `Worker of Unix.sockaddr ]
+    : [= `Master of U.sockaddr | `Worker of U.sockaddr ]
     =
       let g n = get_env_opt ("PARVEL__" ^ n) in
       match (g "ROLE", g "HOST", g "PORT") with
       [ ( (None | Some "master"), host_txt_opt, port_txt_opt ) ->
           let ia =
             match host_txt_opt with
-            [ None -> Unix.inet_addr_any
+            [ None -> U.inet_addr_any
             | Some host_txt -> inet_addr_of_string host_txt
             ]
           in
             | Some port_txt -> port_of_txt port_txt
             ]
           in
-            `Master (Unix.ADDR_INET ia p)
+            `Master (U.ADDR_INET ia p)
 
       | ( Some "worker", host_txt_opt, port_txt_opt ) ->
           match (host_txt_opt, port_txt_opt) with
     value add 
   end
 ;
+*)
 
-module Net
- =
-  struct
 
-    value run_listen bind_addr =
-      .
-    ;
-
-    value run_connect server_addr =
-      .
-    ;
-
-    value run () =
-      match Config_env.action with
-      [ `Listen a -> run_listen a
-      | `Connect a -> run_connect a
-      ]
-    ;
-
-  end
+value host_key =
+  sprintf "%s:%i" (U.gethostname ()) (U.getpid ())
 ;
 
-value () = Net.run ()
+
+type host_id = string;
+
+type host_info =
+  { hi_fd : mutable (option (U.file_descr))
+  }
 ;
-*)
+
+value net_nodes : Hashtbl.t host_id host_info = Hashtbl.create 17
+;
+
+value host_id_opt = ref None;
+
+value get_host_id () =
+  match host_id_opt with
+  [ None -> failwith "host id is not defined yet"
+  | Some h -> h
+  ]
+;
+
+
+module I = Iteratees.Make(Parvel_IO)
+;
+
+open I.Ops
+;
+
+
+module Ds_it =
+  Dumbstreaming_it.Make(I)
+;
+
+
+value enum_socket : enumeratee char 'a =
+  I.enum_readchars
+    ~buffer_size:handshake_buffer_size
+    ~read_func:(fun buf pos len -> LU.recv buf pos len [])
+;
+
+
+value proc_connect client_socket =
+  (enum_socket client_socket)
+  (
+   Ds_it.read
+     ~get_piece_it :
+       ( ~totalsize:int64 ->
+         ~totalcount:int ->
+         ~piecesize:int ->
+         ~piecenumber:int ->
+         I.iteratee char 'i
+       )
+      -> ~combine_it : (I.iteratee 'i 'a)
+      -> I.iteratee char (option 'a)
+  )
+;
+
+
+value run_listener_lwt bind_addr =
+  let listening_socket = LU.socket U.PF_INET U.SOCK_STREAM 0 in
+  let () = LU.bind listening_socket bind_addr in
+  let () = LU.listen listening_socket listening_backlog in
+  loop ()
+  where rec loop () =
+    LU.accept >>% fun (client_socket, _client_sockaddr) ->
+    let () = Lwt.ignore_result (proc_connect client_socket) in
+    loop ()
+;
+
+
+value run_listen bind_addr =
+  ( host_id_opt := some & sprintf "%s-master" host_key
+  ; Lwt.ignore_result (run_listener_lwt bind_addr)
+  )
+;
+
+
+value run_connect server_addr =
+  .
+;
+
+
+value () =
+  match Config_env.action with
+  [ `Listen a -> run_listen a
+  | `Connect a -> run_connect a
+  ]
+;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.