Source

parvel / topoman.ml

Full commit
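(*
Planned configuration settings (presumably environment variables; nothing
in this file reads them yet):
  PARVEL__LISTEN
  PARVEL__CONNECT
Value format: {ip/host}:port, split on ':'
  (or maybe follow the HTTP spec for URIs instead?)
*)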



open Cd_All; open Cdt;


open Unix
;

(* Interface the server listens on: the loopback address from Unix. *)
value listen_interface : inet_addr = inet_addr_loopback;

(* Port number the server listens on. *)
value listen_port : int = 22528;

(* Internet-domain socket address built from the interface and port above;
   this is the address handed to establish_server further down. *)
value listen_addr =
  (ADDR_INET listen_interface listen_port : sockaddr)
;


(* Iteratee library instantiated over the IO_Lwt monad. *)
module I = Iteratees.Make(IO_Lwt);
(* IO module exposed by that instance; used below for IO.catch. *)
module IO = I.It_IO;
(* Presumably the source of the infix operators used below (>>= and >>%);
   they are not defined in this file -- TODO confirm. *)
open I.Ops;

(* Dumbstreaming protocol, reading side: built on the iteratee instance. *)
module Ds_it = Dumbstreaming_it.Make(I);
(* Dumbstreaming protocol, writing side: built directly on IO_Lwt. *)
module Ds_io = Dumbstreaming_io.Make(IO_Lwt);


(* Build the reply for request [rq], a list of strings.
   The reply has two elements: the literal tag echo, followed by every
   request piece passed through String.escaped and joined with
   semicolons. *)
value proc_request rq =
  let escaped_pieces = List.map String.escaped rq in
  ["echo"; String.concat ";" escaped_pieces]
;


(* Largest declared total size accepted for a single request; get_request
   fails on anything strictly greater.  The value is an Int64. *)
value rq_max_totalsize : int64 = 10240L;

(* Server-side failure carrying a human-readable message. *)
exception Srv of string;

(* Iteratee that reads one request through Ds_it.read.

   Each piece is collected with I.gather_to_string and the pieces are
   combined with I.stream2list.  When the declared total size exceeds
   rq_max_totalsize, both sizes are printed to stdout and the iteratee
   fails with Srv.  proc_conn matches the result against None / Some rq. *)
value get_request =
  let piece_it =
    (fun ~totalsize ~totalcount ~piecesize ~piecenumber ->
       (* Only the declared total size is inspected; the other labelled
          arguments are part of the callback shape and are discarded. *)
       let () = ignore (totalcount, piecesize, piecenumber) in
       if totalsize <= rq_max_totalsize
       then
         I.gather_to_string
       else
         let () = Printf.printf "totalsize=%Li rq_max_totalsize=%Li\n%!"
           totalsize rq_max_totalsize in
         I.throw_err (Srv "request too long")
    )
  in
  Ds_it.read
    ~get_piece_it:piece_it
    ~combine_it:I.stream2list
;


(* Write the reply [rs] to [outch] with Ds_io.write, then flush the
   channel so the reply is not left sitting in the output buffer. *)
value send_response outch rs =
  let flush_out () = IO_Lwt.flush outch in
  Ds_io.write outch rs >>% flush_out
;

(* Report the exception [err] to the peer on [outch].

   The reply is a two-element list: the literal tag error, followed by a
   textual description of the exception.  Exceptions wrapped in
   I.Iteratees_err_msg are unwrapped (repeatedly, if nested) so that the
   description names the underlying exception.

   The reply goes through send_response, so error replies are written and
   flushed exactly like normal replies instead of duplicating the write
   call and relying on a later close to push the data out. *)
value send_error outch err =
  let rec describe exn =
    match exn with
    [ I.Iteratees_err_msg wrapped -> describe wrapped
    | other -> Printexc.to_string other
    ]
  in
  send_response outch
    [ "error"
    ; describe err
    ]
;


(* Serve one connection, given as the channel pair (inch, outch).

   Requests are read from [inch] in a loop: each request is answered with
   proc_request and send_response, and the loop ends when get_request
   yields None.  The resulting iteratee is then run; an exception raised
   while running it is reported to the peer with send_error.  Finally the
   output channel is closed.  The whole session is driven to completion
   with Lwt_main.run, so this function returns unit.

   NOTE(review): close_out is chained after the catch, so it looks like it
   is skipped when send_error itself fails, and [inch] is never closed
   here -- confirm whether the server wrapper takes care of that. *)
value proc_conn (inch, outch) : unit =
  let rec serve () =
    get_request >>= fun
    [ Some rq ->
        I.lift (send_response outch (proc_request rq)) >>= fun () ->
        serve ()
    | None -> I.return ()
    ]
  in
  let session =
    I.enum_fd inch (serve ())
    >>% fun it ->
    IO.catch
      (fun () -> I.run it)
      (fun exn -> send_error outch exn)
    >>% fun () ->
    IO_Lwt.close_out outch
  in
  Lwt_main.run session
;


(* The server value: created on listen_addr with a 4096 buffer size and a
   backlog of 5, handing every connection to proc_conn. *)
value server : IO_Lwt.server =
  IO_Lwt.establish_server ~buffer_size:4096 ~backlog:5 listen_addr proc_conn
;

(* Program entry point: run the Lwt main loop on IO_Lwt.wait_server
   (presumably until the server stops -- TODO confirm). *)
value () =
  let until_done = IO_Lwt.wait_server server in
  Lwt_main.run until_done
;