Source

parvel / parvel_net.ml

Full commit
module U = Unix;
module LU = Lwt_unix;

value listening_backlog = 5;
value handshake_buffer_size = 4096;


type config_action =
  { ca_listen : option U.sockaddr
  ; ca_connect : option U.sockaddr
  }
;

module Env_config
 :
  sig
    value action : config_action;
  end
 =
  struct

    value get_env_opt name =
      try Some (Sys.getenv name) with [Not_found -> None]
    ;

    value inet_addr_of_string host_txt =
      try U.inet_addr_of_string host_txt
      with
      [ Failure msg ->
          failwith "error parsing address %S: %s" host_txt msg
      ]
    ;

    value port_of_txt port_txt =
      try
        int_of_string port_txt
      with
      [ Failure msg ->
          failwith "error parsing port %S: %s" port_txt msg
      ]
    ;

    value inet_sockaddr_of_host_port host_txt port_txt =
      let ia = inet_addr_of_string host_txt in
      let p = port_of_txt port_txt in
      U.ADDR_INET ia p
    ;

    value env_addr varname =
      let g n = get_env_opt ("PARVEL__" ^ n) in
      match (g "HOST", g "PORT") with
      [ ( (host_txt_opt, port_txt_opt ) ->
          let ia =
            match host_txt_opt with
            [ None -> U.inet_addr_any
            | Some host_txt -> inet_addr_of_string host_txt
            ]
          in
          let p =
            match port_txt_opt with
            [ None -> default_listen_port
            | Some port_txt -> port_of_txt port_txt
            ]
          in
            `Master (U.ADDR_INET ia p)

      | ( host_txt_opt, port_txt_opt ) ->
          match (host_txt_opt, port_txt_opt) with
          [ (Some host_txt, Some port_txt) ->
             `Worker (inet_sockaddr_of_host_port host_txt port_txt)
          | _ ->
              failwith "for 'worker' mode you should specify \
                        both host and port in environment"
          ]

      | (Some n, _, _) ->
          failwith "bad role: %S" n

      ]
    ;

    value action
    : config_action
    =
      match role with
      [ `Master a -> `Listen a
      | `Worker a -> `Connect a
      ]
    ;

  end
;


(*
module NameService
 :
  sig
    type ns_req;
    value req : string -> ns_req;
    type ns_resp = list gpid;
    value server : server ns_req ns_resp;
  end
 =
  struct
    value add 
  end
;
*)


value host_key =
  sprintf "%s:%i" (U.gethostname ()) (U.getpid ())
;


type host_id = string;

type host_info =
  { hi_fd : mutable (option (U.file_descr))
  }
;

value net_nodes : Hashtbl.t host_id host_info = Hashtbl.create 17
;

value host_id_opt = ref None;

value get_host_id () =
  match host_id_opt with
  [ None -> failwith "host id is not defined yet"
  | Some h -> h
  ]
;


module I = Iteratees.Make(Parvel_IO)
;

open I.Ops
;


module Ds_it =
  Dumbstreaming_it.Make(I)
;


value enum_socket : enumeratee char 'a =
  I.enum_readchars
    ~buffer_size:handshake_buffer_size
    ~read_func:(fun buf pos len -> LU.recv buf pos len [])
;


value proc_connect client_socket =
  (enum_socket client_socket)
  (
   Ds_it.read
     ~get_piece_it :
       ( ~totalsize:int64 ->
         ~totalcount:int ->
         ~piecesize:int ->
         ~piecenumber:int ->
         I.iteratee char 'i
       )
      -> ~combine_it : (I.iteratee 'i 'a)
      -> I.iteratee char (option 'a)
  )
;


value run_listener_lwt bind_addr =
  let listening_socket = LU.socket U.PF_INET U.SOCK_STREAM 0 in
  let () = LU.bind listening_socket bind_addr in
  let () = LU.listen listening_socket listening_backlog in
  loop ()
  where rec loop () =
    LU.accept >>% fun (client_socket, _client_sockaddr) ->
    let () = Lwt.ignore_result (proc_connect client_socket) in
    loop ()
;


value run_listen bind_addr =
  ( host_id_opt := some & sprintf "%s-master" host_key
  ; Lwt.ignore_result (run_listener_lwt bind_addr)
  )
;


value run_connect server_addr =
  .
;


value () =
  match Config_env.action with
  [ `Listen a -> run_listen a
  | `Connect a -> run_connect a
  ]
;