Commits

Anonymous committed defe4d7

refactoring, small optimizations, comments on Amall_http_service..listener_run about exceptions

Comments (0)

Files changed (3)

src/amall_http_server.ml

       IO.server
     ;
 
+    value get_active_connections : unit -> int
+    ;
+
   end
 =
 struct
 
 value dbg fmt = Printf.ksprintf (Printf.printf "hs: %s\n%!") fmt;
 
+value rec string_of_exn e =
+  match e with
+  [ I.Iteratees_err_msg e -> "it/" ^ string_of_exn e
+  | Ws_service_error e -> "wssrv/" ^ string_of_exn e
+  | _ -> Printexc.to_string e
+  ]
+;
+
+value io_unit = IO.return ()
+;
+
+value io_http_server_func
+  (userfunc : request -> (segpath * H.service_desc))
+  (inch, outch)
+ :
+  IO.m unit
+ =
+  try
+    (let do_close = lazy begin
+       let () = dbg "http_server_func: close 0" in
+       IO.catch
+         (fun () ->
+            let () = dbg "http_server_func: close 1" in
+            IO.close_in inch >>% fun () ->
+            let () = dbg "http_server_func: close 2" in
+            IO.close_out outch >>% fun () ->
+            let () = dbg "http_server_func: close 3" in
+            io_unit
+         )
+         (fun e ->
+           let () = dbg "http_server_func: close error: %s" (string_of_exn e)
+           in
+           io_unit
+         )
+     end
+     in
+     IO.catch
+       (fun () ->
+          let it_req = H.it_http userfunc in
+          I.enumpart_readchars
+            ~buffer_size:4096
+            ~read_func:IO.read_into
+            inch
+            I.Sl.empty
+            it_req
+          >>% loop True
+          where rec loop can_switch_to_ws (it, sl_l, opt_enp) =
+            let () = dbg "hs: entered loop" in
+            match opt_enp with
+            [ I.EP_None -> Lazy.force do_close
+            | I.EP_Some cont ->
+                let () = dbg "got cont" in
+                match it with
+                [ I.IE_cont (Some e) _ ->
+                    let () = dbg "cont: error %S" (Printexc.to_string e) in
+                    Lazy.force do_close
+                | I.IE_cont None _ ->
+                    let () = dbg "cont: cont" in
+                    failwith "cont-cont1111"
+                | I.IE_done http_or_ws_resp ->
+                    let () = dbg "hsf: got req/resp" in
+                    match http_or_ws_resp with
+                    [ (request, `Http it) ->
+                        I.run it >>% fun resp ->
+                        let is_head = (request.rq_method = `HEAD) in
+                        H.output_response ~is_head outch resp >>% fun () ->
+                        let need_to_close =
+                          (List.mem `Close request.rq_headers.connection) ||
+                          (List.exists
+                             (fun (hk, hv) ->
+                                hk = "Connection" && hv = "close")
+                             resp.rs_headers.rs_all
+                          )
+                        in
+                          if need_to_close
+                          then
+                            Lazy.force do_close
+                          else
+                            IO.flush outch >>% fun () ->
+                            let sl = Lazy.force sl_l in
+                            cont.I.enumpart_poly sl it_req >>% loop False
+ 
+                    | (request, `Ws handshake ws_service segpath) ->
+                        if can_switch_to_ws
+                        then
+                          H.output_response ~is_head:False outch
+                            (continue_ws_resp handshake)
+                          >>% fun () ->
+                          let ws_out_socket = H.Ws.of_out_channel outch in
+                          let worker = Partapp3.apply
+                            ws_service segpath request ws_out_socket
+                          in
+                          IO.catch
+                            (fun () ->
+                               do_websocket
+                                 ws_out_socket
+                                 sl_l
+                                 cont.I.enumpart_poly
+                                 worker
+                               >>% fun () ->
+                               IO.return & `Ok ()
+                            )
+                            (fun e -> IO.return & `Error e)
+                          >>% fun res ->
+                          match res with
+                          [ `Ok () -> io_unit
+                          | `Error e -> IO.error e
+                          ]
+                        else
+                          H.output_response
+                            ~is_head:False outch can't_switch_ws_resp
+                          >>% fun () ->
+                          Lazy.force do_close
+                    ]  (* match http_or_ws_resp *)
+                ]  (* match it *)
+            ]  (* match opt_enp *)
+       )
+       (fun e ->
+          ( Printf.eprintf "amall http: exception: %s\n%!" &
+              string_of_exn e
+          ; Lazy.force do_close
+          )
+       )
+    )
+  with
+  [ e ->
+      ( Printf.eprintf "amall http: uncaught exception: %s\n%!" &
+          string_of_exn e
+      ; io_unit
+      )
+  ]
+;
+
+
+
+value connections_active = ref 0
+;
+
+value get_active_connections () = connections_active.val
+;
 
 value http_server_func
- (userfunc : request -> (segpath * H.service_desc))
- (inch, outch)
-: unit
-=
-IO.run_and_ignore_result (
-  let string_of_exn e =
-            (loop e
-             where rec loop e =
-               match e with
-               [ I.Iteratees_err_msg e -> "it/" ^ loop e
-               | Ws_service_error e -> "wssrv/" ^ loop e
-               | _ -> Printexc.to_string e
-               ]
-            )
-  in
-try
-   (let closed = ref False in
-    let do_close () =
-      let () = dbg "http_server_func: close 0" in
-      if closed.val
-      then
-        IO.return ()
-      else
-        IO.catch
-          (fun () ->
-             let () = closed.val := True in
-             let () = dbg "http_server_func: close 1" in
-             IO.close_in inch >>% fun () ->
-             let () = dbg "http_server_func: close 2" in
-             IO.close_out outch >>% fun () ->
-             let () = dbg "http_server_func: close 3" in
-             IO.return ()
-          )
-          (fun e ->
-            let () = dbg "http_server_func: close error: %s"
-              (Printexc.to_string e)
-            in
-              IO.return ()
-          )
-    in
-    IO.catch
-      (fun () ->
-         let it_req = H.it_http userfunc in
-         I.enumpart_readchars
-           ~buffer_size:4096
-           ~read_func:IO.read_into
-           inch
-           I.Sl.empty
-           it_req
-         >>% loop True
-         where rec loop can_switch_to_ws (it, sl_l, opt_enp) =
-           let () = dbg "hs: entered loop" in
-           match opt_enp with
-           [ I.EP_None -> do_close ()
-           | I.EP_Some cont ->
-               let () = dbg "got cont" in
-
-               match it with
-               [ I.IE_cont (Some e) _ ->
-                   let () = dbg "cont: error %S" (Printexc.to_string e) in
-                   do_close ()
-               | I.IE_cont None _ ->
-                   let () = dbg "cont: cont" in
-                   failwith "cont-cont1111"
-               | I.IE_done http_or_ws_resp ->
-                   let () = dbg "hsf: got req/resp" in
-                   match http_or_ws_resp with
-                   [ (request, `Http it) ->
-                       I.run it >>% fun resp ->
-                       let is_head = (request.rq_method = `HEAD) in
-                       H.output_response ~is_head outch resp >>% fun () ->
-
-                       let need_to_close =
-                         (List.mem `Close request.rq_headers.connection) ||
-                         (List.exists
-                            (fun (hk, hv) -> hk = "Connection" && hv = "close")
-                            resp.rs_headers.rs_all
-                         )
-                       in
-                         if need_to_close
-                         then
-                           do_close ()
-                         else
-                           IO.flush outch >>% fun () ->
-                           let sl = Lazy.force sl_l in
-                           cont.I.enumpart_poly sl it_req >>% loop False
-
-                   | (request, `Ws handshake ws_service segpath) ->
-                       if can_switch_to_ws
-                       then
-                         H.output_response ~is_head:False outch
-                           (continue_ws_resp handshake)
-                         >>% fun () ->
-                         let ws_out_socket = H.Ws.of_out_channel outch in
-                         let worker = Partapp3.apply
-                           ws_service segpath request ws_out_socket
-                         in
-                         IO.catch
-                           (fun () ->
-                              do_websocket
-                                ws_out_socket
-                                sl_l
-                                cont.I.enumpart_poly
-                                worker
-                              >>% fun () ->
-                              IO.return & `Ok ()
-                           )
-                           (fun e -> IO.return & `Error e)
-                         >>% fun res ->
-                         match res with
-                         [ `Ok () -> IO.return ()
-                         | `Error e -> IO.error e
-                         ]
-                       else
-                         H.output_response
-                           ~is_head:False outch can't_switch_ws_resp
-                         >>% fun () ->
-                         do_close ()
-                   ]
-               ]
-           ]
-      )
-      (fun e ->
-        ( Printf.eprintf "amall http: exception: %s\n%!" &
-            string_of_exn e
-        ; do_close ()
-        )
-      )
-   )
-with [e -> ( Printf.eprintf "amall http: uncaught exception: %s\n%!" &
-               string_of_exn e
-           ; IO.return ()
-           )
-     ]
-)
+  (userfunc : request -> (segpath * H.service_desc))
+  inch_outch
+ :
+  unit
+ =
+  IO.run_and_ignore_result (
+    let () = incr connections_active in
+    ( io_http_server_func userfunc inch_outch >>% fun () ->
+      ( decr connections_active; io_unit )
+    )
+  )
 ;
 
 

src/amall_http_service.ml

     value io_listener_run : listener -> IO.server
     ;
 
-    (* really runs listening/accepting *)
+    (* really runs listening/accepting.
+       returns [()] on successful shutdown via [listener_stop].
+       raises exception on fatal errors in server loop, for example,
+       when [accept listening_socket] fails.
+     *)
 
     value listener_run : listener -> unit
     ;

tests/test_http_service.ml

 
 open Printf;
 
+value open_file () =
+  try (ignore (open_in_bin "README"); True) with [ Sys_error _ -> False ]
+;
+
 value rec open_files n =
-  if (try (ignore (open_in_bin "README"); True) with [ Sys_error _ -> False ])
+  if open_file ()
   then open_files (n + 1)
   else n
 ;
 
+open I.Ops;
+
 value my_func segpath rq =
-  (* let _files = open_files 0 in *)
+(*
+  I.lift (Lwt_unix.sleep 0.5) >>= fun () ->
+  let _files = open_file () in
+*)
   let txt = sprintf "path: [%s]\nparams string: %s\nparams parsed: %s\n"
     (String.concat " ; " &
      List.map (sprintf "%S") &
 ;
 
 value () =
-  try S.listener_run my_listener
-  with [ e -> failwith "listener_run exn: %s" (Printexc.to_string e) ]
+  try
+    S.listener_run my_listener
+  with
+  [ e ->
+      let conns = S.HS.get_active_connections () in
+      ( Printf.eprintf "listener_run exn: %s, active connections = %i\n%!"
+          (Printexc.to_string e)
+          conns
+      ; if conns = 0
+        then
+          Printf.eprintf "no active connections, exitting.\n%!"
+        else
+          ignore (IO.runIO (loop ()))
+          where rec loop () =
+            let conns = S.HS.get_active_connections () in
+            if conns = 0
+            then
+              ( Printf.eprintf "done, exitting.\n%!"; IO.return () )
+            else
+              ( Printf.eprintf "waiting for %i connection(s) to close...\n%!"
+                  conns
+              ; I.Ops.(Lwt_unix.sleep 0.1 >>% loop)
+              )
+      )
+  ]
 ;