Commits

Anonymous committed d5a7fcf

websocket client; massive code move

Comments (0)

Files changed (14)

 module Filename = Filename_new;
 include Am_Common;
 include Printf;
+
+value ad =
+  try Sys.getenv "AMALL_DEBUG" = "1"
+  with [ Not_found -> False ]
+;

src/amall_http.ml

 value max_headers_size = 10240;
 value default_uri_scheme = "http";
 
-value ad =
-  try Sys.getenv "AMALL_DEBUG" = "1"
-  with [ Not_found -> False ]
-;
-
 (**********)
 
 type request_method = [= `GET | `POST | `HEAD ]
 ;
 
 type response =
-  { rs_status_code : int
+  { rs_http_ver : (int * int)
+  ; rs_status_code : int
   ; rs_reason_phrase : string
   ; rs_headers : rs_headers
   ; rs_body : rs_body
 ;
 
 
-type ws_opcode_in_out =
-  [= `Text
-  |  `Binary
-  |  `Ctrl of int
-  |  `Nonctrl of int
-  ]
-;
-
-type ws_opcode_in =
-  [= ws_opcode_in_out
-  |  `Close of option int (* status code, when body is present *)
-  ]
-;
-
-type ws_opcode =
-  [= ws_opcode_in
-  |  `Cont
-  |  `Ping
-  |  `Pong
-  ]
-;
-
-type ws_opcode_out =
-  [= ws_opcode_in_out
-  |  `Ping
-  |  `Pong
-  ]
-;
-
-type ws_frame_header =
-  { fin : bool
-  ; opcode : ws_opcode
-  ; len : int64
-  ; masking_key : string  (* of length 4 *)
-  }
-;
-
-
 (*****************************)
 
 
 
 module Make
   (IO : Amall_types.IO_Type)
-  (I : It_type.IT with type It_IO.m 'a = IO.m 'a)
+  (I : It_type.IT with type It_IO.m 'a = IO.m 'a
+         and type It_IO.input_channel = IO.input_channel)
 =
 struct
 
 open I.Ops;
 
 
-type http_server_func = request -> I.iteratee char response
-;
-
-type http_service_func =
-  segpath ->
-  request ->
-  I.iteratee char response
-;
-
-
-module Ws
- :
-  sig
-
-    type ws_out_socket;
-
-    value of_fd : IO.fd -> ws_out_socket;
-
-    exception Output_closed;
-
-    exception Close_received of int;
-
-    value send : ws_out_socket -> ws_opcode_out -> string -> IO.m unit;
-    (* + send buffered + flush, maybe *)
-
-    value close : ws_out_socket -> option (int * string) -> IO.m unit;
-
-    (* useful for iteratees processing [`Close code] *)
-    value it_close : ws_out_socket ->
-                     option int (* code *) ->
-                     I.iteratee char unit
-    ;
-
-    value is_close_sent : ws_out_socket -> bool;
-
-  end
- =
-  struct
-
-    type ws_out_socket =
-      { fd : IO.fd
-      ; close_sent : mutable bool
-      }
-    ;
-
-    value of_fd fd = { fd = fd; close_sent = False };
-
-    value is_close_sent sock = sock.close_sent;
-
-    value masking_key_random_state = Random.State.make_self_init ();
-
-    exception Output_closed;
-
-    exception Close_received of int;
-
-    (* note: [`Close _] body must be prepared here in [body] *)
-    value send_gen sock fh body =
-      let () = assert fh.fin in
-      let opcode_int =
-        match fh.opcode with
-        [ `Cont -> 0x0
-        | `Text -> 0x1
-        | `Binary -> 0x2
-        | `Nonctrl o ->
-            let () = assert (0x3 <= o && o <= 0x7) in
-            o
-        | `Ctrl o ->
-            let () = assert (0xB <= o && o <= 0xF) in
-            o
-        | `Close _ -> 0x8
-        | `Ping -> 0x9
-        | `Pong -> 0xA
-        ]
-      in
-      let len = String.length body in
-      let () = assert (fh.len = Int64.of_int len) in
-      let hdr_len = 2 +
-        (if len <= 125
-         then 0
-         else if len < 0x10000
-         then 2
-         else 8
-        )
-      in
-      let module String = Bytes in
-      let msg = String.make (hdr_len + len) '\x00' in
-      let () = msg.[0] := Char.chr (0x80 lor opcode_int) in
-      let () =
-        if len <= 125
-        then
-          ( msg.[1] := Char.chr len
-          )
-        else if len < 0x10000
-        then
-          ( msg.[1] := Char.chr 126
-          ; msg.[2] := Char.chr (len lsr 8)
-          ; msg.[3] := Char.chr (len land 0xFF)
-          )
-        else
-          ( msg.[1] := Char.chr 127
-          ; msg.[2] := Char.chr  (len lsr 56)
-          ; msg.[3] := Char.chr ((len lsr 48) land 0xFF)
-          ; msg.[4] := Char.chr ((len lsr 40) land 0xFF)
-          ; msg.[5] := Char.chr ((len lsr 32) land 0xFF)
-          ; msg.[6] := Char.chr ((len lsr 24) land 0xFF)
-          ; msg.[7] := Char.chr ((len lsr 16) land 0xFF)
-          ; msg.[8] := Char.chr ((len lsr  8) land 0xFF)
-          ; msg.[9] := Char.chr ( len         land 0xFF)
-          )
-      in
-      let () = String.blit body 0 msg hdr_len len in
-      (* IO.write (in lwt and in direct io) is atomic *)
-      let () =
-        if ad then amdbg "Ws.send: %S" (Bytes.hexdump ~style:`Line msg) else ()
-      in
-      IO.write_fd sock.fd msg
-    ;
-
-    value send sock (opc : ws_opcode_out) body =
-      if sock.close_sent
-      then IO.error Output_closed
-      else
-        let masking_key =
-          Int64.to_int32
-            (Random.State.int64
-               masking_key_random_state
-               0x1_0000_0000L
-            )
-        in
-        let opc =
-          match opc with
-          [ `Ctrl 0x8 ->
-              ( sock.close_sent := True
-              ; `Close None (* won't be used in send *)
-              )
-          | `Ctrl _ | `Nonctrl _ | `Text | `Binary | `Ping | `Pong as x -> x
-          ]
-        in
-        let fh =
-          { fin = True
-          ; opcode = (opc :> ws_opcode)
-          ; len = Int64.of_int (String.length body)
-          ; masking_key =
-              let module String = Bytes in
-              let r = Bytes.make 4 '\x00' in
-              let m n =
-                r.[3 - n] := Char.chr
-                  (     Int32.to_int
-                          (Int32.shift_right_logical masking_key (8 * n))
-                   land 0xFF
-                  )
-              in
-              let () = ( m 0; m 1; m 2; m 3 ) in
-              r
-          }
-        in
-        send_gen sock fh body
-    ;
-
-    value close sock opt_code_data =
-      if sock.close_sent
-      then
-        IO.error Output_closed
-      else
-        let body =
-          match opt_code_data with
-          [ None -> ""
-          | Some (c, d) ->
-              if c < 1000 || c >= 5000 || c = 1005 || c = 1006 || c = 1015
-              then invalid_arg "Ws.close: code must be in [1000 .. 5000) \
-                                except 1005, 1006, 1015 (code = %i)" c
-              else
-                let dlen = String.length d in
-                if dlen > (125 - 2)
-                then
-                  invalid_arg "Ws.close: All control frames MUST have a \
-                               payload length of 125 bytes or less \
-                               (including 2-byte status code)."
-                else
-                  let module String = Bytes in
-                  let b = String.make (dlen + 2) '\x00' in
-                  ( Bytes.blit d 0 b 2 dlen
-                  ; b.[0] := Char.chr (c lsr 8)
-                  ; b.[1] := Char.chr (c land 0xFF)
-                  ; b
-                  )
-          ]
-        in
-        send sock (`Ctrl 0x8) body >>% fun () ->
-        IO.close_fd sock.fd
-    ;
-
-    value it_close sock opt_code : I.iteratee char unit =
-      match opt_code with
-      [ None -> I.lift (close sock None)
-      | Some c ->
-          I.gather_to_string >>= fun body ->
-          I.lift (close sock (Some (c, body)))
-      ]
-    ;
-
-  end
-;
-
-open Ws;
-
-
-type websocket_service_func_worker =
-  ws_opcode_in ->
-  I.iteratee char unit
-;
-
-type websocket_service_func =
-  Partapp3.partapp3
-    segpath
-    request
-    ws_out_socket
-    websocket_service_func_worker
-;
-
-type service_desc =
-  [= `Service_http of http_service_func
-  |  `Service_ws of websocket_service_func
-  ]
-;
 
 
 
   let reas = rs.rs_reason_phrase in
   let () = check_reason_phrase reas (String.length reas) 0 in
   let processed_headers = response_headers rs rs.rs_headers.rs_all in
-  "HTTP/1.1 " ^ (string_of_int code) ^ " " ^ reas ^ "\r\n" ^
+  "HTTP/" ^ string_of_int (fst rs.rs_http_ver) ^
+    "." ^ string_of_int (snd rs.rs_http_ver) ^ " " ^
+    (string_of_int code) ^ " " ^ reas ^ "\r\n" ^
   (String.concat "" &
    (* todo: buffer instead of list of strings *)
    List.map string_of_header &
 ;
 
 
+value read_component ~mkerr ~limit ~name ~break_pred =
+  I.break_limit ~pred:break_pred ~limit >>= fun (status, s) ->
+  match status with
+  [ `Hit_limit -> I.throw_err & mkerr & sprintf "bad %s (too big)" name
+  | `Hit_eof -> I.throw_err & mkerr & sprintf "eof reading %s" name
+  | `Found -> I.return & I.Subarray.to_string s
+  ]
+;
+
+value read_the_char ~mkerr c =
+  (I.catch
+    (fun () -> I.mapI some I.head)
+    (fun [ End_of_file -> I.return None
+         | x -> I.throw_err x])
+  )
+  >>= fun
+  [ None -> I.throw_err & mkerr & sprintf "expected %C, got eof" c
+  | Some c' when c = c' -> I.return ()
+  | Some c' -> I.throw_err & mkerr & sprintf "expected %C, got %C" c c'
+  ]
+;
+
+value read_uint ~mkerr name =
+  let max_digits = 9 in
+  let not_digit c = (c > '9' || c < '0') in
+  (* ignore_zeroes >>= fun () -> *)
+  read_component
+    ~limit:(max_digits + 1)
+    ~name
+    ~break_pred:not_digit
+    ~mkerr
+  >>= fun uint_txt ->
+  try I.return & int_of_string uint_txt
+  with [ _ -> I.throw_err & mkerr "integer is too big" ]
+;
+
+value read_http_version ~mkerr ~bad_http_err =
+  read_the_string "HTTP/" bad_http_err >>= fun () ->
+  read_uint ~mkerr "http major version" >>= fun ver_maj ->
+  read_the_char ~mkerr '.' >>= fun () ->
+  read_uint ~mkerr "http minor version" >>= fun ver_min ->
+  I.return (ver_maj, ver_min)
+;
+
+value read_line_terminators =
+  ( I.heads ['\r'; '\n'] >>= fun n ->
+    if n = 0
+    then I.heads ['\n']
+    else I.return n
+  ) >>= fun n ->
+  I.return (n <> 0)
+;
+
+value read_header_lines ~mkerr =
+  let read_headers_rev =
+    let rec read_headers acc_rev =
+      read_component ~limit:max_header_len ~name:"header"
+        ~break_pred:is_line_term ~mkerr >>= fun header_line ->
+      read_line_terminators >>= fun t ->
+      if t
+      then
+        if String.length header_line = 0
+        then
+          I.return acc_rev
+        else
+          (* process the header *)
+          if is_whitespace header_line.[0]
+          then
+            match acc_rev with
+            [ [] -> I.throw_err & mkerr "first header starts with whitespace"
+            | [last :: others] ->
+                read_headers [(last ^ header_line) :: others]
+            ]
+          else
+            read_headers [header_line :: acc_rev]
+      else
+        I.throw_err & mkerr "premature end of headers"
+    in
+      read_headers []
+  in
+  I.limit max_headers_size read_headers_rev >>= fun rh_it ->
+  match rh_it with
+  [ I.IE_cont (Some e) _k -> I.throw_err e
+  | I.IE_cont None _ ->
+      it_eof_ignore rh_it >>= fun () ->
+      I.throw_err & mkerr & sprintf "headers too large (max %i bytes allowed)"
+        max_headers_size
+  | I.IE_done headers_rev -> I.return headers_rev
+  ] >>= fun header_lines_rev ->
+  (try
+     I.return &
+     List.rev_map
+       (fun line ->
+          let (hname, sep, hval_sp) =
+            String.split_by_first ( (=) ':' ) line in
+          if String.length sep = 0
+          then
+            raise (mkerr "header without ':'")
+          else
+            let hval = String.trim is_whitespace hval_sp in
+            (hname, hval)
+       )
+       header_lines_rev
+   with [ e -> I.throw_err e ]
+  )
+;
+
+value read_eol ~mkerr =
+  read_line_terminators >>= fun t ->
+  if t
+  then I.return ()
+  else I.throw_err & mkerr "end-of-line not found"
+;
+
 (************************************************************)
 
+
 value (it_http :
-  (request -> (segpath * service_desc)) ->
+  (request ->
+   ( segpath
+   * [= `Service_http of segpath -> request -> I.iteratee char 'a
+     | `Service_ws of _
+     ]
+   )
+  ) ->
   I.iteratee char
     ( request
     * [= `Http of I.iteratee char 'a
-      |  `Ws of (string (* handshake *) * websocket_service_func * segpath)
+      |  `Ws of (string (* handshake *) * _ * segpath)
       ]
     )
 )
 process_request =
-  let fail fmt = Printf.ksprintf (fun s -> I.throw_err & Bad_request s) fmt in
-  let read_the_char c =
-    (I.catch
-      (fun () -> I.mapI some I.head)
-      (fun [ End_of_file -> I.return None
-           | x -> I.throw_err x])
-    )
-    >>= fun
-    [ None -> fail "expected %C, got eof" c
-    | Some c' when c = c' -> I.return ()
-    | Some c' -> fail "expected %C, got %C" c c'
-    ]
-  in
-  let read_component ~limit ~name ~break_pred =
-    I.break_limit ~pred:break_pred ~limit >>= fun (status, s) ->
-    match status with
-    [ `Hit_limit -> fail "bad %s (too big)" name
-    | `Hit_eof -> fail "eof reading %s" name
-    | `Found -> I.return & I.Subarray.to_string s
-    ]
-  in
-  let read_line_terminators =
-    ( I.heads ['\r'; '\n'] >>= fun n ->
-      if n = 0
-      then I.heads ['\n']
-      else I.return n
-    ) >>= fun n ->
-    I.return (n <> 0)
-  in
+  let mkerr s = Bad_request s in
+  let fail fmt = Printf.ksprintf (fun s -> I.throw_err & mkerr s) fmt in
+  let read_the_char = read_the_char ~mkerr in
+  let read_component = read_component ~mkerr in
   let read_method =
     read_component
       ~name:"method"
         meth_txt
         request_method_texts
     with
-    [ None -> fail "method not supported"
+    [ None -> fail "method not supported: %S" meth_txt
     | Some meth -> I.return meth
     ]
   and read_uri =
     [ None -> fail "bad uri"
     | Some u -> I.return u
     ]
-  and read_version =
-    let read_uint name =
-      let max_digits = 9 in
-      let not_digit c = (c > '9' || c < '0') in
-      (* ignore_zeroes >>= fun () -> *)
-      read_component
-        ~limit:(max_digits + 1)
-        ~name
-        ~break_pred:not_digit
-      >>= fun uint_txt ->
-      try I.return & int_of_string uint_txt
-      with [ _ -> fail "internal error" ]
-    in
-    read_the_string "HTTP/" (Bad_request "expected \"HTTP/\" string")
-    >>= fun () ->
-    read_uint "http major version" >>= fun ver_maj ->
-    read_the_char '.' >>= fun () ->
-    read_uint "http minor version" >>= fun ver_min ->
-    I.return (ver_maj, ver_min)
-  and read_eol =
-    read_line_terminators >>= fun t ->
-    if t
-    then I.return ()
-    else fail "end-of-line not found"
-  and read_headers_rev =
-    let rec read_headers acc_rev =
-      read_component ~limit:max_header_len ~name:"header"
-        ~break_pred:is_line_term >>= fun header_line ->
-      read_line_terminators >>= fun t ->
-      if t
-      then
-        if String.length header_line = 0
-        then
-          I.return acc_rev
-        else
-          (* process the header *)
-          if is_whitespace header_line.[0]
-          then
-            match acc_rev with
-            [ [] -> fail "first header starts with whitespace"
-            | [last :: others] ->
-                read_headers [(last ^ header_line) :: others]
-            ]
-          else
-            read_headers [header_line :: acc_rev]
-      else
-        fail "premature end of headers"
-    in
-      read_headers []
+  and read_version = read_http_version
+    ~bad_http_err:(Bad_request "expected \"HTTP/\" string")
+    ~mkerr
+  and read_eol = read_eol ~mkerr
   in
     read_method >>= fun meth ->
     read_the_char '\x20' >>= fun () ->
     read_the_char '\x20' >>= fun () ->
     read_version >>= fun version ->
     read_eol >>= fun () ->
-    I.limit max_headers_size read_headers_rev >>= fun rh_it ->
-    match rh_it with
-    [ I.IE_cont (Some e) _k -> I.throw_err e
-    | I.IE_cont None _ ->
-        it_eof_ignore rh_it >>= fun () ->
-        fail "headers too large (max %i bytes allowed)"
-          max_headers_size
-    | I.IE_done headers_rev -> I.return headers_rev
-    ] >>= fun header_lines_rev ->
-    (try
-       I.return &
-       List.rev_map
-         (fun line ->
-            let (hname, sep, hval_sp) =
-              String.split_by_first ( (=) ':' ) line in
-            if String.length sep = 0
-            then
-              raise (Bad_request "header without ':'")  (* inlined 'fail' *)
-            else
-              let hval = String.trim is_whitespace hval_sp in
-              (hname, hval)
-         )
-         header_lines_rev
-     with [ e -> I.throw_err e ]
-    ) >>= fun header_lines ->
+    read_header_lines ~mkerr >>= fun header_lines ->
     let rq_headers = make_headers header_lines in
     let rq_uri =
       match (request_uri__.authority, rq_headers.rqh_host) with
         , `Http it
         )
       in
-      let (segpath, (user_service_desc : service_desc)) =
+      let (segpath, user_service_desc) =
         process_request request
       in
       match user_service_desc with
                 }
             }
           in
-          let (segpath, (user_service_desc : service_desc)) =
+          let (segpath, user_service_desc) =
             process_request request
           in
           match user_service_desc with
     [ e ->
         let rs_error =
           { rs_status_code = 500
+          ; rs_http_ver = (1, 1)
           ; rs_reason_phrase = "Internal server error"
           ; rs_headers = { rs_all = [] }
           ; rs_body = Body_string (Printexc.to_string e)

src/amall_http_client_common.ml

+(* some functions common to http client code *)
+
+module Make
+  (IO : Amall_types.IO_Type)
+  (I : It_type.IT with type It_IO.m 'a = IO.m 'a
+                   and type It_IO.input_channel = IO.input_channel)
+=
+struct
+
+open I.Ops;
+
+open Amall_http
+;
+
+module H = Amall_http.Make(IO)(I)
+;
+
+exception Bad_response of string
+;
+
+open Cd_All
+;
+
+value mkerr s = Bad_response s
+;
+
+value fail fmt = Printf.ksprintf (fun s -> I.throw_err & mkerr s) fmt
+;
+
+
+value read_component =
+  H.read_component ~mkerr
+;
+
+value read_version =
+  H.read_http_version
+    ~mkerr
+    ~bad_http_err:(Bad_response  "expected \"HTTP/\" string")
+;
+
+value read_the_char = H.read_the_char ~mkerr
+;
+
+value read_uint = H.read_uint ~mkerr
+;
+
+
+value it_http_response_no_body =
+  read_version >>= fun ver ->
+  read_the_char '\x20' >>= fun () ->
+  read_uint "status code" >>= fun status_code ->
+  if status_code < 100 || status_code >= 1000
+  then fail "status code must be 3-digit" else
+  read_the_char '\x20' >>= fun () ->
+  read_component ~limit:max_header_len ~name:"reason phrase"
+    ~break_pred:is_line_term >>= fun reason_phrase ->
+  H.read_eol ~mkerr >>= fun () ->
+  H.read_header_lines ~mkerr >>= fun header_lines ->
+  I.return
+    { rs_http_ver = ver
+    ; rs_status_code = status_code
+    ; rs_reason_phrase = reason_phrase
+    ; rs_headers = { rs_all = header_lines }
+    ; rs_body = No_body
+    }
+;
+
+
+end
+;

src/amall_http_server.ml

 open Printf;
 open Amall_http;
 open Cd_All;
-
-type port = int;
-type addr_string = string;
-
-type listen_addr =
-  [= `Inet_any of port
-  |  `Inet_loopback of port
-  |  `Inet_str of (addr_string * port)
-  |  `Inet_addr of (Unix.inet_addr * port)
-  |  `Unix_socket of string
-  ]
-;
+open Strings.Latin1;
+open Am_All;
 
 module Http_server
   (IO : IO_Type)
  :
   sig
 
+    type http_server_func = request -> I.iteratee char response
+    ;
+
     value post_form : request ->
       I.iteratee char (request * list (string * string))
     ;
 
     value run :
       listen_addr ->
-      (request -> (segpath * (Amall_http.Make(IO)(I)).service_desc)) ->
+      (request ->
+        ( segpath
+        * [= `Service_http of
+                Amall_http.segpath ->
+                Amall_http.request ->
+                I.iteratee char Amall_http.response
+          |  `Service_ws of
+                Partapp3.partapp3
+                  Amall_http.segpath
+                  Amall_http.request
+                  (Websocket.Server(IO)(I)).ws_out_socket
+                  ( (Websocket.Server(IO)(I))
+                       .websocket_service_func_worker_gen unit)
+          ]
+        )
+      ) ->
       IO.server
     ;
 
 
 open I.Ops;
 
+type http_server_func = request -> I.iteratee char response
+;
 
 value (runA : IO.m (I.iteratee 'el 'a) -> It_Types.res 'a) i =
   IO.runIO (i >>% I.run)
   let fn = "post-req" in
   match
     runA & I.enum_file fn &
-      ((H.it_http & post_form)
+      ((H.it_h'ttp & post_form)
        >>= fun r -> dump_chars_chunks "after body" >>= fun () ->
        I.return r
       )
 
 value can't_switch_ws_resp =
   { rs_status_code = 400
+  ; rs_http_ver = (1, 1)
   ; rs_reason_phrase = "Bad request"
   ; rs_headers = { rs_all = [] }
   ; rs_body = Body_string "Can't switch to Websockets after HTTP requests"
 ;
 
 
-value ws_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
-;
-
 open Am_Common
 ;
 
-value (continue_ws_resp : string -> response) =
-  let accept handshake =
-    let open Cryptokit in
-    transform_string
-      (Base64.encode_compact_pad ())
-      (hash_string
-         (Hash.sha1 ())
-         (handshake ^ ws_guid)
-      )
-  and other_headers =
-    [ ("Upgrade", "websocket")
-    ; ("Connection", "Upgrade")
-    ]
-  in
-    fun handshake ->
-      { rs_status_code = 101
-      ; rs_reason_phrase = "Switching Protocols"
-      ; rs_body = No_body
-      ; rs_headers =
-          { rs_all =
-              [    ("Sec-WebSocket-Accept", accept handshake)
-                :: other_headers
-              ]
-          }
-      }
-;
-
-
-(* TODO: переписать с прямым доступом к массиву. *)
-value it_uint16_as_64_netord =
-  I.head >>= fun b0 ->
-  I.head >>= fun b1 ->
-  I.return & Int64.of_int (((Char.code b0) lsl 8) lor (Char.code b1))
-;
-
-(* TODO: переписать с прямым доступом к массиву. *)
-value ws_it_close_code ~masking_key =
-  I.head >>= fun b0 ->
-  I.head >>= fun b1 ->
-  I.return
-    (
-         (((Char.code b0) lxor (Char.code masking_key.[0])) lsl 8)
-     lor ((Char.code b1) lxor (Char.code masking_key.[1]))
-    )
-;
-
-
-exception Ws_error of string
-;
-
-value ws_error msg = I.throw_err (Ws_error msg)
-;
-
-exception Ws_service_error of exn
-;
-
-
-(* TODO: переписать с прямым доступом к массиву и _прямо_. *)
-value ws_len64 =
-  let h = I.head >>= fun c -> I.return & Int64.of_int (Char.code c) in
-  h >>= fun i0 ->
-  let open Int64 in
-  let ( lor ) = Int64.logor
-  and ( land ) = Int64.logand
-  and ( lsl ) = Int64.shift_left in
-  if (i0 land 0x80L) == 0x80L
-  then ws_error "ws_len64: 'the most significant bit MUST be 0' [rfc6455]"
-  else
-    h >>= fun i1 ->
-    h >>= fun i2 ->
-    h >>= fun i3 ->
-    h >>= fun i4 ->
-    h >>= fun i5 ->
-    h >>= fun i6 ->
-    h >>= fun i7 ->
-    I.return
-      (      i7
-        lor (i6 lsl  8)
-        lor (i5 lsl 16)
-        lor (i4 lsl 24)
-        lor (i3 lsl 32)
-        lor (i2 lsl 40)
-        lor (i1 lsl 48)
-        lor (i0 lsl 56)
-      )
-;
-
-value ws_it_masking_key () =
-  I.head >>= fun a0 ->
-  I.head >>= fun a1 ->
-  I.head >>= fun a2 ->
-  I.head >>= fun a3 ->
-  let module String = Bytes in
-  let k = String.make 4 '\x00' in
-  ( k.[0] := a0
-  ; k.[1] := a1
-  ; k.[2] := a2
-  ; k.[3] := a3
-  ; I.return k
-  )
-;
-
-
-value ws_it_frame_header =
-  let no_masking_key = Bytes.make 4 '\x00' in
-  I.head >>= fun b0 ->
-  I.head >>= fun b1 ->
-  let fin = ((Char.code b0) land 0x80) == 0x80 in
-  let opcode_int = (Char.code b0) land 0x0F in
-  let mask = ((Char.code b1) land 0x80) == 0x80 in
-  let len1 = ((Char.code b1) land 0x7F) in
-  (if len1 = 126
-   then it_uint16_as_64_netord
-   else if len1 = 127
-   then ws_len64
-   else I.return (Int64.of_int len1)
-  ) >>= fun len ->
-  (if mask
-  then
-    ws_it_masking_key ()
-  else
-    I.return no_masking_key
-  ) >>= fun masking_key ->
-  (if opcode_int >= 0x8
-   then
-     if not fin
-     then
-       ws_error "All control frames MUST NOT be fragmented."
-     else if len > 125L
-     then
-       ws_error "All control frames MUST have a payload length \
-                 of 125 bytes or less."
-     else
-       match opcode_int with
-       [ 0x8 ->
-           if len = 0L
-           then I.return & `Close None
-           else if len = 1L
-           then ws_error "Close: 'If there is a body, the first two bytes of \
-                          the body MUST be a 2-byte unsigned integer', \
-                          but body has only 1 byte."
-           else
-             ws_it_close_code ~masking_key >>= fun status_code ->
-             let () =
-               if ad
-               then amdbg "ws_it_frame_header: Close: status_code = %i"
-                 status_code
-               else ()
-             in
-             let () =
-               let m0 = masking_key.[0]
-               and m1 = masking_key.[1]
-               and m2 = masking_key.[2]
-               and m3 = masking_key.[3]
-               in
-                 ( masking_key.[0] := m2
-                 ; masking_key.[1] := m3
-                 ; masking_key.[2] := m0
-                 ; masking_key.[3] := m1
-                 )
-             in
-             I.return & `Close (Some status_code)
-       | 0x9 -> I.return `Ping
-       | 0xA -> I.return `Pong
-       | x -> I.return & `Ctrl x
-       ]
-   else
-     I.return (match opcode_int with
-     [ 0x0 -> `Cont
-     | 0x1 -> `Text
-     | 0x2 -> `Binary
-     | x -> `Nonctrl x
-     ])
-  )
-  >>= fun opcode ->
-  let len =
-    match opcode with
-    [ `Close (Some _) -> Int64.sub len 2L
-    | _ -> len
-    ]
-  in
-  let open H in
-  I.return &
-    { fin = fin
-    ; opcode = opcode
-    ; len = len
-    ; masking_key = masking_key
-    }
-;
-
-type ws_it_feed_resp =
-  [= `Frame_going of
-       (  I.iteratee char unit
-       * [= `Finished
-         |  `Continues
-         ]
-       )
-  |  `Closed of int
-  ]
-;
-
-type ws_it_loop_data_state =
-  [= `Frame_going of I.iteratee char unit
-  |  `Frame_done
-  ]
-;
-
-type ws_it_loop_resp =
-  [= ws_it_feed_resp
-  |  `Closed of int (* status code *)
-  |  `Ping of (I.iteratee char string (* body *) * ws_it_loop_data_state)
-  |  `Proto_error of (int * string)  (* (code, reason) *)
-  |  `Frame_done
-  ]
-;
-
-
-value enee_unmask ~masking_key ~growing_array
-  : I.enumeratee Byte.t Byte.t 'a
-  = fun it ->
-      let mlen = String.length masking_key in
-      if mlen = 0
-      then
-        I.throw_err & Invalid_argument "enum_unmask: masking_key is empty"
-      else
-        let rec unmask ~mi it =
-          match it with
-          [ I.IE_done _ | I.IE_cont (Some _) _ -> I.return it
-          | I.IE_cont None k -> I.ie_cont (step ~mi it k)
-          ]
-        and step ~mi it k s =
-          match s with
-          [ I.EOF _ -> I.ie_doneM it s
-          | I.Chunk c ->
-              let open I.S.C in
-              let clen = c.len in
-              let tarr = Array.GrowingArray.request_array clen growing_array in
-              let carr = c.arr
-              and cofs = c.ofs in
-              let cmaxi = cofs + clen in
-              let rec loop ~mi ~ci ~ti =
-                if ci = cmaxi
-                then mi
-                else
-                  ( tarr.(ti) := Char.chr
-                      (     (Char.code carr.(ci))
-                       lxor (Char.code masking_key.[mi])
-                      )
-                  ; loop ~mi:((mi + 1) mod mlen) ~ci:(ci + 1) ~ti:(ti + 1)
-                  )
-              in
-              let mi = loop ~mi ~ci:cofs ~ti:0 in
-              let new_chunk = mk ~arr:tarr ~ofs:0 ~len:clen in
-              (k (I.Chunk new_chunk)) >>% fun (it, _s) ->
-              IO.return (unmask ~mi it, I.Sl.empty)
-          ]
-        in
-          unmask ~mi:0 it
-;
-
-
-
-value wrap_feeding it_f =
-  I.catch
-    it_f
-    (fun e ->
-       match e with
-       [ Ws_service_error e -> I.throw_err e
-       | e -> I.throw_err & Ws_service_error e
-       ]
-    )
-;
-
-
-value feed_going_it ~growing_array ~fh it
- =
-  let len_int = Int64.to_int fh.len in
-  let () = assert (fh.len = Int64.of_int len_int) in
-  (wrap_feeding
-    (fun () ->
-       (I.take_exact
-          len_int
-          (I.joinI & enee_unmask ~growing_array ~masking_key:fh.masking_key it)
-       )
-    )
-  )
-  >>= fun (ex, it) ->
-  match ex with
-  [ `Done ->
-      I.return &
-      `Frame_going it (if fh.fin then `Finished else `Continues)
-  | `Eof (has_read, opt_err_msg) ->
-      let () =
-        if ad
-        then amdbg
-          "ws: it_loop: feed_going_it: got %i bytes \
-           instead of %i because of %s; closing."
-          (len_int - has_read)
-          len_int
-          (match opt_err_msg with
-           [ None -> "normal EOF"
-           | Some e -> sprintf "error %s"
-              (Printexc.to_string e)
-           ]
-          )
-        else ()
-      in
-      (wrap_feeding (fun () ->
-        I.feed_it it (I.EOF (Some (H.Ws.Close_received 1006)))
-      )) >>= fun _it ->
-      I.return &
-      `Closed 1006
-  ]
-;        
-
+module Ws = Websocket.Server(IO)(I);
 
 value do_websocket
-  outsock
-  (sl_l : Lazy.t (I.sl char))
-  enum_part
-  (ws_worker : H.websocket_service_func_worker)
+  (outsock : Ws.ws_out_socket)
+  (ws_worker : Ws.websocket_service_func_worker_gen unit)
+  (enumpart_state : Ws.enumpart_state Byte.t)
  :
   IO.m unit
  =
-  let worker_got_close = ref False
-  and growing_array = Array.GrowingArray.make 100 '\x00' in
-  (
-    let rec it_loop (data_state : ws_it_loop_data_state)
-     : I.iteratee char ws_it_loop_resp
-     =
-        let () = if ad then amdbg "ws: it_loop" else () in
-
-        (I.ie_cont & fun s ->
-           match s with
-           [ I.EOF _ ->
-               let () = if ad then amdbg "it_loop: stream: EOF" else () in
-               I.ie_doneM () s
-           | I.Chunk c ->
-               let str = I.S.to_string c in
-               let () = if ad then amdbg "it_loop: stream: Chunk \"%s\""
-                 (Bytes.hexdump ~style:`Line str)
-                 else ()
-               in
-               I.ie_doneM () s
-           ]
-        ) >>= fun () ->
-        let () = if ad then amdbg "ws: it_loop: after dump" else () in
-
-        I.peek >>= fun p ->
-        let () = if ad then amdbg "ws: it_loop: peek = %s"
-          (match p with [None -> "None" | Some c -> sprintf "Some %C" c ])
-          else () in
-
-        ws_it_frame_header >>= fun fh ->
-        let () = if ad then amdbg "ws: it_loop: fh" else () in
-
-        let rec ret_closed ~opc ~code =
-          let () = worker_got_close.val := True in
-          (wrap_feeding (fun () -> feed_new_worker ~opc))
-          >>= fun () ->
-          I.return & `Closed code
-        
-        and feed : ~data_state : ws_it_loop_data_state -> ~opc : _ ->
-                   I.iteratee char ws_it_feed_resp
-         = fun ~data_state ~opc ->
-          let it =
-            match data_state with
-            [ `Frame_done ->
-                wrap_feeding (fun () -> ws_worker opc)
-            | `Frame_going it ->
-                it
-            ]
-          in
-            feed_going_it ~growing_array ~fh it
-        
-        and feed_new_worker ~opc : I.iteratee char unit =
-          feed ~data_state:`Frame_done ~opc >>= fun fr ->
-          match fr with
-          [ `Frame_going it _ ->
-              wrap_feeding (fun () -> I.feed_it it (I.EOF None))
-              >>= fun _it ->
-              I.return ()
-          | `Closed _code ->
-              I.return ()
-          ]
-
-        and ret_data_state () =
-          I.return &
-          match data_state with
-          [ `Frame_done as x -> x
-          | `Frame_going it -> `Frame_going it `Continues
-          ]
-        in
-
-        match fh.opcode with
-        [ (`Close opt_code) as opc ->
-            let code =
-              match opt_code with
-              [ Some x -> x
-              | None -> 1005
-              ]
-            in
-            match data_state with
-            [ `Frame_done ->
-                ret_closed ~opc ~code
-            | `Frame_going it ->
-                (wrap_feeding (fun () ->
-                  I.feed_it it (I.EOF (Some (H.Ws.Close_received code)))
-                )) >>= fun _it ->
-                (* итерат, получив EOF, должен перейти в
-                   "готовое состояние", а оно неинтересно. *)
-                ret_closed ~opc ~code
-            ]
-        | `Ping ->
-            feed_going_it ~growing_array ~fh I.gather_to_string >>= fun fr ->
-            match fr with
-            [ (`Closed _) as c -> I.return c
-            | `Frame_going gath_it _cont_fin -> I.return &
-                `Ping gath_it data_state
-            ]
-        | `Pong ->
-            ret_data_state ()
-        | (`Ctrl _) as opc ->
-            feed_new_worker ~opc >>= fun () ->
-            ret_data_state ()
-        | (`Text | `Binary | `Nonctrl _) as opc ->
-            let () =
-              if ad then amdbg "ws: it_loop: text/binary/nonctrl" else () in
-            match data_state with
-            [ `Frame_going it ->
-                let msg = "data frame without previous fin" in
-                (wrap_feeding (fun () ->
-                  I.feed_it it (I.EOF (Some (Ws_error msg)))
-                ))
-                (* ошибка, так как при фрейме с fin=true состояние станет
-                   _done, а при исходном _going должен быть fin=true перед
-                   другими фреймами. *)
-                >>= fun _it ->
-                I.return & `Proto_error 1002 msg
-            | `Frame_done ->
-                ( (feed ~data_state ~opc)
-                  :> I.iteratee char ws_it_loop_resp
-                )
-            ]
-        | `Cont ->
-            match data_state with
-            [ `Frame_done ->
-                I.return & `Proto_error 1003 "continuation frame without \
-                                              previous beginning frame"
-                (* продолжение без предыдущего начала *)
-            | `Frame_going it ->
-                ( (feed_going_it it ~growing_array ~fh)
-                  :> I.iteratee char ws_it_loop_resp
-                )
-            ]
-        ]
-
-    and io_loop (enum_part : I.enumpart char ws_it_loop_resp) sl_l data_state =
-      let () = if ad then amdbg "ws: io_loop" else () in
-      if H.Ws.is_close_sent outsock
-      then
-        IO.return ()
-      else
-        let sl = Lazy.force sl_l in
-        enum_part sl (it_loop data_state)
-        >>% fun (loop_it, sl_l, enp_opt) ->
-        let () = if ad then amdbg "ws: io_loop: enum_part returned" else () in
-
-        (let proc_error e =
-           let () = if ad then amdbg "ws: io_loop: error: %s"
-             (Printexc.to_string &
-              (loop e
-               where rec loop e =
-                 match e with
-                 [ Ws_service_error e | I.Iteratees_err_msg e -> loop e
-                 | _ -> e
-                 ]
-              )
-             )
-             else ()
-           in
-           let () =
-             if ad then amdbg "ws: io_loop: proc_error: 1: is_close_sent = %b"
-               (H.Ws.is_close_sent outsock)
-             else ()
-           in
-           (if H.Ws.is_close_sent outsock
-            then
-              IO.return ()
-            else
-              match e with
-              [ Ws_service_error e ->
-                  let msg = Printexc.to_string e in
-                  let msg =
-                    if String.length msg > 125
-                    then
-                      (String.sub msg 0 122) ^ "..."
-                    else
-                      msg
-                  in
-                    let () =
-                      if ad then amdbg "ws: io_loop: service error: %S" msg
-                      else () in
-                    H.Ws.close outsock (Some (1011, msg))
-              | _ ->
-                  H.Ws.close outsock None
-              ]
-           ) >>% fun () ->
-           let () =
-             if ad then amdbg "ws: it_loop: proc_error: 2: is_close_sent = %b"
-               (H.Ws.is_close_sent outsock)
-             else ()
-           in
-           IO.return & `Closed 1005
-         in
-         IO.catch
-           (fun () ->
-              I.run loop_it
-           )
-           (fun e ->
-              proc_error e
-           )
-        )
-        >>% fun loop_resp ->
-        let () = if ad then amdbg "io_loop: analyzing loop_resp" else () in
-        (match loop_resp with
-         [ `Frame_going it `Finished ->
-             I.run it >>% fun () ->
-             let () =
-               if ad then amdbg "ws: io_loop: finished service iteratee"
-               else () in
-             IO.return `Frame_done
-         | `Frame_going it `Continues ->
-             IO.return & `Frame_going it
-         | `Frame_done ->
-             IO.return `Frame_done
-         | `Closed code ->
-             (if not worker_got_close.val
-              then
-                let () = worker_got_close.val := True in
-                I.run (ws_worker (`Close (Some code)))
-              else
-                IO.return ()
-             )
-             >>% fun () ->
-             if H.Ws.is_close_sent outsock
-             then
-               IO.return `Frame_done
-             else
-               H.Ws.close outsock (Some (code, "")) >>% fun () ->
-               IO.return `Frame_done
-               (* will exit after entering io_loop *)
-         | `Ping it_body old_state ->
-             I.run it_body >>% fun body ->
-             H.Ws.send outsock `Pong body >>% fun () ->
-             IO.return old_state
-         | `Proto_error (code : int) (msg : string) ->
-             let () = worker_got_close.val := True in
-             I.run (ws_worker (`Close (Some code)))
-             >>% fun () ->
-             if H.Ws.is_close_sent outsock
-             then
-               IO.return `Frame_done
-             else
-               H.Ws.close outsock (Some (code, msg)) >>% fun () ->
-               IO.return `Frame_done
-         ]
-        ) >>% fun data_state ->
-        (match enp_opt with
-         [ I.EP_None ->
-             let () = if ad then amdbg "ws: io_loop: ep_none" else () in
-             (match data_state with
-              [ `Frame_done -> IO.return ()
-              | `Frame_going it ->
-                  I.run &
-                  I.feed_it it (I.EOF (Some (H.Ws.Close_received 1006)))
-              ]
-             ) >>% fun () ->
-             let () =
-               if ad then amdbg "ws: io_loop: ep_none: worker_got_close = %b"
-                 worker_got_close.val
-               else ()
-             in
-             if not worker_got_close.val
-             then
-               let () = worker_got_close.val := True in
-               I.run (ws_worker (`Close (Some 1006)))
-             else
-               IO.return ()  (* just exit *)
-         | I.EP_Some enum_part ->
-             let () = if ad then amdbg "ws: io_loop: ep_some" else () in
-             io_loop enum_part.I.enumpart_poly sl_l data_state
-         ]
-        )
-
-    in
-    io_loop enum_part sl_l `Frame_done
-  )
+  let mrc = Ws.message_reading_context
+    enumpart_state
+    outsock
+  in
+  loop ()
+  where rec loop () =
+    if Ws.is_finished mrc
+    then
+      IO.return ()
+    else
+      Ws.websocket_message mrc ws_worker >>% loop
 ;
 
 
 value rec string_of_exn e =
   match e with
   [ I.Iteratees_err_msg e -> "it/" ^ string_of_exn e
-  | Ws_service_error e -> "wssrv/" ^ string_of_exn e
+  | Ws.Ws_service_error e -> "wssrv/" ^ string_of_exn e
   | _ -> Printexc.to_string e
   ]
 ;
 ;
 
 value io_http_server_func
-  (userfunc : request -> (segpath * H.service_desc))
+  (userfunc : request -> (segpath * _))
   fd
  :
   IO.m unit
                           (List.mem `Close request.rq_headers.connection) ||
                           (List.exists
                              (fun (hk, hv) ->
-                                hk = "Connection" && hv = "close")
+                                String.eq_nocase_latin1 hk "Connection" &&
+                                String.eq_nocase_latin1 hv "close"
+                             )
                              resp.rs_headers.rs_all
                           )
                         in
                         if can_switch_to_ws
                         then
                           H.output_response ~is_head:False fd
-                            (continue_ws_resp handshake)
+                            (Ws.continue_ws_resp handshake)
                           >>% fun () ->
-                          let ws_out_socket = H.Ws.of_fd fd in
+                          let ws_out_socket = Ws.of_fd ~is_serverside:True fd
+                          in
                           let worker = Partapp3.apply
                             ws_service segpath request ws_out_socket
                           in
                             (fun () ->
                                do_websocket
                                  ws_out_socket
-                                 sl_l
-                                 cont.I.enumpart_poly
                                  worker
+                                 (sl_l, cont)
                                >>% fun () ->
                                IO.return & `Ok ()
                             )
 ;
 
 value http_server_func
-  (userfunc : request -> (segpath * H.service_desc))
+  (userfunc : request -> (segpath * _))
   fd
  :
   unit
 ;
 
 
-value run_addr listen_addr (userfunc : request -> (segpath * H.service_desc)) =
+value run_addr listen_addr (userfunc : request -> (segpath * _)) =
   IO.establish_server_fd listen_addr (http_server_func userfunc)
 ;
 
 
 value run listen_addr userfunc =
-  let a =
-    match listen_addr with
-    [ `Inet_any port -> Unix.ADDR_INET (Unix.inet_addr_any, port)
-    | `Inet_loopback port -> Unix.ADDR_INET (Unix.inet_addr_loopback, port)
-    | `Inet_str str port -> Unix.ADDR_INET (Unix.inet_addr_of_string str, port)
-    | `Inet_addr a p -> Unix.ADDR_INET (a, p)
-    | `Unix_socket name -> Unix.ADDR_UNIX name
-    ]
-  in
-    run_addr a userfunc
+  let a = sockaddr_of_listen_addr listen_addr in
+  run_addr a userfunc
 ;
 
 

src/amall_http_service.ml

 ;
 
 
+module Types
+  (IO : IO_Type)
+  (I : It_type.IT with
+         type It_IO.m 'a = IO.m 'a
+     and type It_IO.input_channel = IO.input_channel
+  )
+ =
+  struct
+
+    open Amall_http;
+
+    type websocket_service_func_worker =
+      (Websocket.Server(IO)(I)).websocket_service_func_worker_gen unit
+    ;
+
+    type websocket_service_func =
+      Partapp3.partapp3
+        segpath
+        request
+        (Websocket.Server(IO)(I)).ws_out_socket
+        websocket_service_func_worker
+    ;
+
+    type http_service_func =
+      segpath ->
+      request ->
+       I.iteratee char response
+    ;
+
+    type service_desc =
+      [= `Service_http of http_service_func
+      |  `Service_ws of websocket_service_func
+      ]
+    ;
+
+  end
+;
+
+
 module Service
   (IO : IO_Type)
   (I : It_type.IT with
 
     module H : module type of Amall_http.Make(IO)(I);
     module HS : module type of Http_server(IO)(I);
+    module T : module type of Types(IO)(I);
+    open T;
 
     (* Service receives path components relative to "mount point".
        For example, when service is registered on
     (* returns listener and mount point for its root *)
 
     value listener_create
-      : Amall_http_server.listen_addr ->
+      : listen_addr ->
         ( listener
-        * mount_point H.http_service_func
-        * mount_point H.websocket_service_func
+        * mount_point http_service_func
+        * mount_point websocket_service_func
         )
     ;
 
     (* creates a destination mount point in the specified segpath
        below the source mount point. *)
     value mount_rel_http :
-      mount_point H.http_service_func ->
+      mount_point http_service_func ->
       segpath ->
-      mount_point H.http_service_func
+      mount_point http_service_func
     ;
     value mount_rel_ws :
-      mount_point H.websocket_service_func ->
+      mount_point websocket_service_func ->
       segpath ->
-      mount_point H.websocket_service_func
+      mount_point websocket_service_func
     ;
 
     value mount_http :
-      endpoint H.http_service_func ->
-      H.http_service_func ->
+      endpoint http_service_func ->
+      http_service_func ->
       unit
     ;
 
 
     (* receives websocket messages *)
     type websocket_service_func_worker =
-      H.websocket_service_func_worker
+      T.websocket_service_func_worker
     ;
 
     (* returns websocket_service_func_worker, run once per connection *)
     type websocket_service_func =
-      H.websocket_service_func
+      T.websocket_service_func
     ;
 
     value mount_websocket :
-      endpoint H.websocket_service_func ->
+      endpoint websocket_service_func ->
       websocket_service_func ->
       unit
     ;
 
     module H = Amall_http.Make(IO)(I);
     module HS = Http_server(IO)(I);
+    module T = Types(IO)(I);
+    open T;
 
-    type websocket_service_func = H.websocket_service_func;
-    type websocket_service_func_worker = H.websocket_service_func_worker;
+    type websocket_service_func = T.websocket_service_func;
+    type websocket_service_func_worker = T.websocket_service_func_worker;
 
     value it_post_vars = HS.it_post_vars
     ;
       in
       I.return
         { rs_status_code = 404
+        ; rs_http_ver = (1, 1)
         ; rs_reason_phrase = "Not found"
         ; rs_headers = { rs_all = [] }
         ; rs_body = Body_string
 
     type listener =
       { io_server_lazy : Lazy.t ( IO.server )
-      ; root_http_disp_level : disp_level H.http_service_func
+      ; root_http_disp_level : disp_level http_service_func
           (* contains just '"" => root' binding when there exists
              at least one service, empty otherwise; and
              fallback with error "no such proto://host:port".
     value find_handler (type s)
       (disp_level : disp_level s)
       segpath
-      (wrap : s -> H.service_desc)
+      (wrap : s -> service_desc)
      :
-      (segpath * H.service_desc)
+      (segpath * service_desc)
      =
       match try_find_handler disp_level segpath with
       [ `Ok (p, f) -> (p, wrap f)
       )
     ;
 
+    value dump_segpath s =
+      List.dump (Printf.sprintf "%S") s
+    ;
+
+    value dump_endpoint (_mpoint, loc) =
+      match loc with
+      [ `Service (p, s) ->
+          "`Service " ^ dump_segpath p ^ " " ^ Printf.sprintf "%S" s
+      | `Fallback p ->
+          "`Fallback " ^ dump_segpath p
+      ]
+    ;
+
+
 
     value mount_rel ~fb mount_point segpath =
       go_or_create_bigstep
     (**********)
 
     value mount_http endpoint f =
-      let () = if ad then amdbg "S.mount" else () in
+      let () = if ad then amdbg "S.mount_http" else () in
       install_handler ~fb:default_http_fallback ~endpoint ~what:f
     ;
 
     value mount_websocket endpoint f =
-      let () = if ad then amdbg "S.mount" else () in
+      let () =
+        if ad then
+          amdbg "S.mount_websocket on %s" & dump_endpoint endpoint
+        else () in
       install_handler ~fb:default_ws_fallback ~endpoint ~what:f
     ;
 
 
 
     value (server_func :
-     disp_level H.http_service_func ->
-     disp_level H.websocket_service_func ->
+     disp_level http_service_func ->
+     disp_level websocket_service_func ->
      request ->
-     (segpath * H.service_desc)
+     (segpath * service_desc)
     )
       root_http_disp_level
       root_ws_disp_level

src/amall_types.ml

 type res 'a = [= `Ok of 'a | `Error of exn ]
 ;
 
+type port = int;
+type addr_string = string;
+
+type connect_addr =
+  [= `Inet_loopback of port
+  |  `Inet_str of (addr_string * port)
+  |  `Inet_addr of (Unix.inet_addr * port)
+  |  `Unix_socket of string
+  ]
+;
+
+type listen_addr =
+  [= connect_addr
+  |  `Inet_any of port
+  ]
+;
+
+value sockaddr_of_connect_addr (connect_addr : connect_addr) =
+  let open Unix in
+  match connect_addr with
+  [ `Inet_loopback port -> ADDR_INET (inet_addr_loopback, port)
+  | `Inet_str str port -> ADDR_INET (inet_addr_of_string str, port)
+  | `Inet_addr a p -> ADDR_INET (a, p)
+  | `Unix_socket name -> ADDR_UNIX name
+  ]
+;
+
+value sockaddr_of_listen_addr (listen_addr : listen_addr) =
+  match listen_addr with
+  [ #connect_addr as ca -> sockaddr_of_connect_addr ca
+  | `Inet_any port -> Unix.(ADDR_INET (inet_addr_any, port))
+  ]
+;
+
+value socket_domain_of_connect_addr (connect_addr : connect_addr) =
+  let open Unix in
+  match connect_addr with
+  [ `Unix_socket _ -> PF_UNIX
+  | `Inet_loopback _ | `Inet_str _ _ | `Inet_addr _ _ -> PF_INET
+  ]
+;
+
+value socket_domain_of_listen_addr (listen_addr : listen_addr) =
+  match listen_addr with
+  [ #connect_addr as ca -> socket_domain_of_connect_addr ca
+  | `Inet_any _ -> Unix.PF_INET
+  ]
+;
+
+
 module type IO_Type =
   sig
     type m +'a;
       iteratee 'el (it_exact * iteratee 'el 'a)
     ;
 
+    value it_ignore_or_fail : iteratee 'el unit
+    ;
+
   end
 ;
   | '\x5D' .. '\x7E' -> true
 | _ -> false
 
+let is_ctl = function
+| '\x00' .. '\x1F' | '\x7F' -> true
+| _ -> false
+
+let is_text_octet c = not (is_ctl c)
+
+let is_separator = function
+|   '(' | ')' | '<' | '>' | '@'
+  | ',' | ';' | ':' | '\\' | '\"'
+  | '/' | '[' | ']' | '?' | '='
+  | '{' | '}' | '\x20' | '\x09' -> true
+| _ -> false
+
+let is_token s =
+  if String.length s = 0
+  then false
+  else
+    let rec loop i =
+      if i = String.length s
+      then true
+      else
+        let c = s.[i] in
+        if is_ctl c || is_separator c
+        then false
+        else loop (i + 1)
+    in
+      loop 0
 }
 
 (* common part from rfc2616 *)
+open Cd_All;
+open Strings.Latin1;
+open Uri_type;
+open Am_All;
+
+exception Output_closed;
+
+exception Close_received of int;
+
+exception Worker_got_close;
+
+exception Proto_error of string;
+
+
+value ws_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+;
+
+
+value random_bytes state len =
+  let r = String.make len '\x00' in
+  ( for i = 0 to len - 1
+    do
+      Bytes.set r i (Char.chr (Random.State.int state 256))
+    done
+  ; r
+  )
+;
+
+
+open Ws_types;
+
+module Server
+  (IO : Amall_types.IO_Type)
+  (I : It_type.IT with
+        type It_IO.m 'a = IO.m 'a
+    and type It_IO.input_channel = IO.input_channel
+  )
+ =
+  struct
+
+    open I.Ops;
+
+    (* todo: to iteratees: *)
+    type enumpart_state 'el = (Lazy.t (I.sl 'el) * I.enumpart_poly 'el)
+    ;
+
+    open Amall_http;
+
+    type ws_out_socket =
+      { fd : IO.fd
+      ; close_sent : mutable bool
+      ; is_serverside : bool
+      }
+    ;
+
+    value of_fd ~is_serverside fd =
+      { fd = fd
+      ; close_sent = False
+      ; is_serverside = is_serverside
+      }
+    ;
+
+    value outsock_is_serverside s = s.is_serverside
+    ;
+
+    value is_close_sent sock = sock.close_sent;
+
+    value masking_key_random_state = Random.State.make_self_init ();
+
+    exception Output_closed;
+
+    exception Close_received of int;
+
+    (* note: [`Close _] body must be prepared here in [body] *)
+    value send_gen sock fh body =
+      let () = assert fh.fin in
+      let mkey = fh.masking_key in
+      let masking = String.length mkey <> 0 in
+      let opcode_int =
+        match fh.opcode with
+        [ `Cont -> 0x0
+        | `Text -> 0x1
+        | `Binary -> 0x2
+        | `Nonctrl o ->
+            let () = assert (0x3 <= o && o <= 0x7) in
+            o
+        | `Ctrl o ->
+            let () = assert (0xB <= o && o <= 0xF) in
+            o
+        | `Close _ -> 0x8
+        | `Ping -> 0x9
+        | `Pong -> 0xA
+        ]
+      in
+      let len = String.length body in
+      let () = assert (fh.len = Int64.of_int len) in
+      let hdr_len_without_mask = 2 +
+        (if len <= 125
+         then 0
+         else if len < 0x10000
+         then 2
+         else 8
+        )
+      in
+      let hdr_len = hdr_len_without_mask + (if masking then 4 else 0)
+      in
+      let module String = Bytes in
+      let msg = String.make (hdr_len + len) '\x00' in
+      let () = msg.[0] := Char.chr (0x80 lor opcode_int) in
+      let () =
+        let masking_bit n = if masking then 0x80 lor n else n in
+        if len <= 125
+        then
+          ( msg.[1] := Char.chr (masking_bit len)
+          )
+        else if len < 0x10000
+        then
+          ( msg.[1] := Char.chr (masking_bit 126)
+          ; msg.[2] := Char.chr (len lsr 8)
+          ; msg.[3] := Char.chr (len land 0xFF)
+          )
+        else
+          ( msg.[1] := Char.chr (masking_bit 127)
+          ; msg.[2] := Char.chr  (len lsr 56)
+          ; msg.[3] := Char.chr ((len lsr 48) land 0xFF)
+          ; msg.[4] := Char.chr ((len lsr 40) land 0xFF)
+          ; msg.[5] := Char.chr ((len lsr 32) land 0xFF)
+          ; msg.[6] := Char.chr ((len lsr 24) land 0xFF)
+          ; msg.[7] := Char.chr ((len lsr 16) land 0xFF)
+          ; msg.[8] := Char.chr ((len lsr  8) land 0xFF)
+          ; msg.[9] := Char.chr ( len         land 0xFF)
+          )
+      in
+      let () =
+        if masking
+        then
+          let rec mask_body src_i dst_i =
+            if src_i = len
+            then ()
+            else
+              ( msg.[dst_i] := Char.chr
+                 (     (Char.code body.[src_i])
+                  lxor (Char.code (mkey.[src_i land 0b11]))
+                 )
+              ; mask_body (src_i + 1) (dst_i + 1)
+              )
+          in
+            ( String.blit mkey 0 msg hdr_len_without_mask 4
+            ; mask_body 0 hdr_len
+            )
+        else
+          String.blit body 0 msg hdr_len len
+      in
+      (* IO.write (in lwt and in direct io) is atomic *)
+      let () =
+        if ad then amdbg "Ws.send: %S" (Bytes.hexdump ~style:`Line msg) else ()
+      in
+      IO.write_fd sock.fd msg
+    ;
+
+    value send sock (opc : ws_opcode_out) body =
+      if sock.close_sent
+      then IO.error Output_closed
+      else
+        let opc =
+          match opc with
+          [ `Ctrl 0x8 ->
+              ( sock.close_sent := True
+              ; `Close None (* won't be used in send *)
+              )
+          | `Ctrl _ | `Nonctrl _ | `Text | `Binary | `Ping | `Pong as x -> x
+          ]
+        in
+        let fh =
+          { fin = True
+          ; opcode = (opc :> ws_opcode)
+          ; len = Int64.of_int (String.length body)
+          ; masking_key =
+              if sock.is_serverside
+              then ""
+              else random_bytes masking_key_random_state 4
+          }
+        in
+        send_gen sock fh body
+    ;
+
+    value is_valid_close_code_to_send c =
+      not (c < 1000 || c >= 5000 || c = 1005 || c = 1006 || c = 1015)
+    ;
+
+    value close sock opt_code_data =
+      if sock.close_sent
+      then
+        IO.error Output_closed
+      else
+        let body =
+          match opt_code_data with
+          [ None -> ""
+          | Some (c, d) ->
+              if not (is_valid_close_code_to_send c)
+              then invalid_arg "Ws.close: code must be in [1000 .. 5000) \
+                                except 1005, 1006, 1015 (code = %i)" c
+              else
+                let dlen = String.length d in
+                if dlen > (125 - 2)
+                then
+                  invalid_arg "Ws.close: All control frames MUST have a \
+                               payload length of 125 bytes or less \
+                               (including 2-byte status code)."
+                else
+                  let module String = Bytes in
+                  let b = String.make (dlen + 2) '\x00' in
+                  ( Bytes.blit d 0 b 2 dlen
+                  ; b.[0] := Char.chr (c lsr 8)
+                  ; b.[1] := Char.chr (c land 0xFF)
+                  ; b
+                  )
+          ]
+        in
+        send sock (`Ctrl 0x8) body >>% fun () ->
+        IO.close_fd sock.fd
+    ;
+
+    value it_close sock opt_code : I.iteratee char unit =
+      if sock.close_sent
+      then I.return ()
+      else
+        match opt_code with
+        [ Some c when is_valid_close_code_to_send c ->
+            I.gather_to_string >>= fun body ->
+            I.lift (close sock (Some (c, body)))
+        | None | Some _ ->
+            (* "Some" for cases when there is an artificial code sent to
+               service iteratee.
+             *)
+            I.lift (close sock None)
+        ]
+    ;
+
+    (********************************************************)
+
+    value (continue_ws_resp : string -> response) =
+      let accept handshake =
+        let open Cryptokit in
+        transform_string
+          (Base64.encode_compact_pad ())
+          (hash_string
+             (Hash.sha1 ())
+             (handshake ^ ws_guid)
+          )
+      and other_headers =
+        [ ("Upgrade", "websocket")
+        ; ("Connection", "Upgrade")
+        ]
+      in
+        fun handshake ->
+          { rs_status_code = 101
+          ; rs_http_ver = (1, 1)
+          ; rs_reason_phrase = "Switching Protocols"
+          ; rs_body = No_body
+          ; rs_headers =
+              { rs_all =
+                  [    ("Sec-WebSocket-Accept", accept handshake)
+                    :: other_headers
+                  ]
+              }
+          }
+    ;
+
+
+    (* TODO: переписать с прямым доступом к массиву. *)
+    value it_uint16_as_64_netord =
+      I.head >>= fun b0 ->
+      I.head >>= fun b1 ->
+      I.return & Int64.of_int (((Char.code b0) lsl 8) lor (Char.code b1))
+    ;
+
+    (* TODO: переписать с прямым доступом к массиву. *)
+    value ws_it_close_code ~masking_key =
+      I.head >>= fun b0 ->
+      I.head >>= fun b1 ->
+      I.return
+        (if String.length masking_key = 0
+         then
+               ((Char.code b0) lsl 8)
+           lor (Char.code b1)
+         else
+               (((Char.code b0) lxor (Char.code masking_key.[0])) lsl 8)
+           lor ((Char.code b1) lxor (Char.code masking_key.[1]))
+        )
+    ;
+
+
+    exception Ws_error of string
+    ;
+
+    value ws_error msg = I.throw_err (Ws_error msg)
+    ;
+
+    exception Ws_service_error of exn
+    ;
+
+
+    (* TODO: переписать с прямым доступом к массиву и _прямо_. *)
+    value ws_len64 =
+      let h = I.head >>= fun c -> I.return & Int64.of_int (Char.code c) in
+      h >>= fun i0 ->
+      let open Int64 in
+      let ( lor ) = Int64.logor
+      and ( land ) = Int64.logand
+      and ( lsl ) = Int64.shift_left in
+      if (i0 land 0x80L) == 0x80L
+      then ws_error "ws_len64: 'the most significant bit MUST be 0' [rfc6455]"
+      else
+        h >>= fun i1 ->
+        h >>= fun i2 ->
+        h >>= fun i3 ->
+        h >>= fun i4 ->
+        h >>= fun i5 ->
+        h >>= fun i6 ->
+        h >>= fun i7 ->
+        I.return
+          (      i7
+            lor (i6 lsl  8)
+            lor (i5 lsl 16)
+            lor (i4 lsl 24)
+            lor (i3 lsl 32)
+            lor (i2 lsl 40)
+            lor (i1 lsl 48)
+            lor (i0 lsl 56)
+          )
+    ;
+
+    value ws_it_masking_key () =
+      I.head >>= fun a0 ->
+      I.head >>= fun a1 ->
+      I.head >>= fun a2 ->
+      I.head >>= fun a3 ->
+      let module String = Bytes in
+      let k = String.make 4 '\x00' in
+      ( k.[0] := a0
+      ; k.[1] := a1
+      ; k.[2] := a2
+      ; k.[3] := a3
+      ; I.return k
+      )
+    ;
+
+
+    value it_empty_string = I.return ""
+    ;
+
+    value ws_it_frame_header =
+      I.head >>= fun b0 ->
+      I.head >>= fun b1 ->
+      let fin = ((Char.code b0) land 0x80) == 0x80 in
+      let opcode_int = (Char.code b0) land 0x0F in
+      let mask = ((Char.code b1) land 0x80) == 0x80 in
+      let len1 = ((Char.code b1) land 0x7F) in
+      (if len1 = 126
+       then it_uint16_as_64_netord
+       else if len1 = 127
+       then ws_len64
+       else I.return (Int64.of_int len1)
+      ) >>= fun len ->
+      (if mask
+      then
+        ws_it_masking_key ()
+      else
+        it_empty_string
+      ) >>= fun masking_key ->
+      (if opcode_int >= 0x8
+       then
+         if not fin
+         then
+           ws_error "All control frames MUST NOT be fragmented."
+         else if len > 125L
+         then
+           ws_error "All control frames MUST have a payload length \
+                     of 125 bytes or less."
+         else
+           match opcode_int with
+           [ 0x8 ->
+               if len = 0L
+               then I.return & `Close None
+               else if len = 1L
+               then ws_error
+                 "Close: 'If there is a body, the first two bytes of \
+                  the body MUST be a 2-byte unsigned integer', \
+                  but body has only 1 byte."
+               else
+                 ws_it_close_code ~masking_key >>= fun status_code ->
+                 let () =
+                   if ad
+                   then amdbg "ws_it_frame_header: Close: status_code = %i"
+                     status_code
+                   else ()
+                 in
+                 let () =
+                   if mask
+                   then
+                     (* rotate mask by 2 bytes, since they were read by
+                        ws_it_close_code
+                      *)
+                     let m0 = masking_key.[0]
+                     and m1 = masking_key.[1]
+                     and m2 = masking_key.[2]
+                     and m3 = masking_key.[3]
+                     in
+                       ( Bytes.set masking_key 0 m2
+                       ; Bytes.set masking_key 1 m3
+                       ; Bytes.set masking_key 2 m0
+                       ; Bytes.set masking_key 3 m1