1. Dmitry Grebeniuk
  2. amall

Commits

Dmitry Grebeniuk  committed 41af39d

on websockets closing

  • Participants
  • Parent commits 8e5fa4c
  • Branches default

Comments (0)

Files changed (3)

File src/amall_http_server.ml

View file
  • Ignore whitespace
   (ws_worker : Ws.websocket_service_func_worker_gen unit)
   (enumpart_state : Ws.enumpart_state Byte.t)
  :
-  IO.m unit
+  IO.m [= `Socket_closed | `Socket_not_closed ]
  =
   let mrc = Ws.message_reading_context
     enumpart_state
   in
   loop ()
   where rec loop () =
-    if Ws.is_finished mrc
-    then
-      IO.return ()
-    else
-      Ws.websocket_message mrc ws_worker >>% loop
+    match (Ws.is_finished mrc, Ws.is_close_sent outsock) with
+    [ (True, False) -> IO.return `Socket_not_closed
+    | (False, False) -> Ws.websocket_message mrc ws_worker >>% loop
+    | (_, True) -> IO.return `Socket_closed
+    ]
 ;
 
 
  =
   try
     (let do_close = lazy begin
-       let () = if ad then hsdbg "http_server_func: before close" else () in
        IO.catch
          (fun () ->
+            let () = if ad then hsdbg "http_server_func: before close"
+              else () in
             IO.close_fd fd >>% fun () ->
             let () =
               if ad then hsdbg "http_server_func: after close" else () in
                                  ws_out_socket
                                  worker
                                  (sl_l, cont)
-                               >>% fun () ->
-                               IO.return & `Ok ()
+                               >>% fun socket_closed ->
+                               let r = IO.return & `Ok () in
+                               match socket_closed with
+                               [ `Socket_closed -> r
+                               | `Socket_not_closed ->
+                                   (Lazy.force do_close) >>% fun () -> r
+                               ]
                             )
                             (fun e ->
                                let () = if ad then amdbg

File src/amall_types.ml

View file
  • Ignore whitespace
     value write_fd_from : fd -> string -> int -> int -> m int;
     value close_fd : fd -> m unit;
 
+    value shutdown : fd -> Unix.shutdown_command -> m unit;
+    value flush_socket : fd -> m unit;
+    value close_socket : fd -> m unit;
 
     type server;
 

File src/websocket.ml

View file
  • Ignore whitespace
 
     open I.Ops;
 
+    value rec string_of_exn = fun
+      [ I.Iteratees_err_msg e -> "it/" ^ string_of_exn e
+      | e -> Printexc.to_string e
+      ]
+    ;
+
     (* todo: to iteratees: *)
     type enumpart_state 'el = (Lazy.t (I.sl 'el) * I.enumpart_poly 'el)
     ;
       not (c < 1000 || c >= 5000 || c = 1005 || c = 1006 || c = 1015)
     ;
 
+    (* not closes socket, but shuts it down on send.
+       Actual closing is: for server in Amall_http_server, for client
+       in Websocket.Client.close.
+     *)
     value close sock opt_code_data =
       if sock.close_sent
       then
           ]
         in
         send sock (`Ctrl 0x8) body >>% fun () ->
-        (* TODO: IO.shutdown SD ; recv till end ; then --     *)
-        IO.close_fd sock.fd
+        let () = if ad then amdbg "Ws.send: before flush_socket" else () in
+        IO.flush_socket sock.fd >>% fun () ->
+        IO.return ()
     ;
 
     value it_close sock opt_code : I.iteratee char unit =
     exception Ws_service_error of exn
     ;
 
+    value () = Printexc.register_printer & fun
+      [ Ws_service_error e -> some &
+          "websocket service error: " ^ Printexc.to_string e
+      | _ -> None
+      ]
+    ;
+
 
     (* TODO: переписать с прямым доступом к массиву и _прямо_. *)
     value ws_len64 =
               (match opt_err_msg with
                [ None -> "normal EOF"
                | Some e -> sprintf "error %s"
-                  (Printexc.to_string e)
+                  (string_of_exn e)
                ]
               )
             else ()
        ws_worker
      ->
       let close_to_new_worker code : I.iteratee char 'a =
+        let () = if ad then amdbg "it_message: close %i to new" code else () in
         let () = assert (not mrc.mrc_worker_got_close) in
         let () = mrc.mrc_worker_got_close := True in
         (wrap_feeding & fun () ->
        : [= `Going of I.iteratee char 'a | `Done ] ->
          I.iteratee Byte.t (I.iteratee Byte.t 'a)
        = fun data_state ->
+(*
         I.peek >>= (fun
          [ None ->
              let () = if ad then amdbg "it_message: eof" else () in
              I.return & `Return (close_to_new_worker 1005)  (* EOF *)
          | Some _ ->
+*)
              ws_it_frame_header >>= fun fh ->
              ( let has_masking = String.length fh.masking_key <> 0
                and expected_masking = outsock.is_serverside
                | (`Pong, data_state) -> I.return & `Pong fh data_state
                ]
              )
+(*
          ]
-        ) >>= fun header_parse_res ->
+        )
+*)
+          >>= fun header_parse_res ->
         (
          match header_parse_res with
          [ `Feed fh (it : I.iteratee char 'a) ->
         then
           IO.error Worker_got_close
         else
-          let () = if ad then amdbg "websocket_message: enter" else () in
+          let () = if ad then amdbg
+            "websocket_message: enter, close sent = %B"
+              (is_close_sent mrc.mrc_outsock) else () in
           enumpart_continue enumpart_state (it_message `Done) >>%
           fun (it, sl_l, enp_opt) ->
           let () = if ad then amdbg "websocket_message: got it_message"
             (fun () -> I.run (I.joinI it) >>% fun r -> IO.return & `Ok r)
             (fun e ->
                let () = if ad then amdbg "websocket_message: run error: %s"
-                 (Printexc.to_string e) else () in
+                 (string_of_exn e) else () in
                IO.return & `Error e
             )
           >>% fun res ->
           | `Error e ->
               let () = if ad then amdbg
                 "websocket_message: error from it's res: %s"
-                (Printexc.to_string e) else () in
+                (string_of_exn e) else () in
               IO.error e
           ]
 
 
             (let proc_error e =
                let () = if ad then amdbg "ws: io_loop: error: %s"
-                 (Printexc.to_string &
+                 (string_of_exn &
                   (loop e
                    where rec loop e =
                      match e with
                 else
                   match e with
                   [ Ws_service_error e ->
-                      let msg = Printexc.to_string e in
+                      let msg = string_of_exn e in
                       let msg =
                         if String.length msg > 125
                         then
 value return_unit = Lwt.return ()
 ;
 
+value rec string_of_exn = fun
+  [ I.Iteratees_err_msg e -> "it/" ^ string_of_exn e
+  | e -> Printexc.to_string e
+  ]
+;
 
 value ws_invalid_arg fmt =
   Printf.ksprintf
   ) >>% fun rq ->
   let () = if ad then amdbg "open_connection: rq = %S" rq else () in
   let sock = Lwt_unix.socket socket_domain socket_type 0 in
-  let close_sock_lazy = lazy (Lwt_unix.close sock) in
+  let close_sock_lazy = lazy (
+    let () = if ad then amdbg "Wc.open_connection: before close" else () in
+    Lwt_unix.close sock
+  ) in
   let close_sock () = Lazy.force close_sock_lazy in
   IO.catch
     (fun () ->
     )
     (fun e ->
        let () =
-         if ad then amdbg "wc: open_connection: %s" (Printexc.to_string e)
+         if ad then amdbg "wc: open_connection: %s" (string_of_exn e)
          else () in
        close_sock () >>% fun () ->
        failed e
 
 value close conn optreason =
   let open S in
-  close conn.mrc_outsock optreason
+  close conn.mrc_outsock optreason >>% fun () ->
+  IO.close_fd conn.mrc_outsock.fd
 ;
 
-value it_close conn opt_code = S.(it_close conn.mrc_outsock opt_code);
+value it_close conn opt_code =
+  let open S in
+  it_close conn.mrc_outsock opt_code >>= fun () ->
+  I.lift (IO.close_fd conn.mrc_outsock.fd)
+;
 
 end
 ;