Source

amall / tests / test_websocket_service.ml

Full commit
open Am_All
;
open Amall_types
;

module IO = IO_Lwt;

module I = Iteratees.Make(IO);

(*
module II : sig exception EIO of (exn * It_Types.place); end = I;
*)

module S = Amall_http_service.Service(IO)(I);
module Ws = S.H.Ws;


value () = dbg "point1"
;

value (my_listener, _http_root, ws_root) = S.listener_create (`Inet_any 12346)
;

value () = dbg "point2"
;

value my_endpoint = (ws_root, `Service ([""; "localhost:12346"], "wstest"))
;

value () = dbg "point3"
;

open Amall_http;

value () = dbg "point4"
;

open Printf;

open Cd_All;

open I.Ops;

value srv_num = ref 0;


type in_msg =
  [= `Chat of string
  ]
;

type chat_out_msg =
  { com_text : string
  ; com_dt : int
  }
;

type out_msg =
  [= `Chat of chat_out_msg
  |  `Error of string
  ]
;

open Cdt
;

value ti_in_msg : #ti in_msg = new ti_sum_type
  ~constr:
  [| ti_ctr_variant1 "chat" ti_string & fun s -> `Chat s |]
  (fun _ -> failwith "not destructible")
;

value ti_chat_out_msg : #ti chat_out_msg = new ti_record
  (fun [ { com_text ; com_dt } ->
     [| ("text", ubox ti_string com_text)
      ; ("dt", ubox ti_int com_dt)
      |]
   ]
  )
;

value ti_out_msg : #ti out_msg = new ti_sum_type
  (fun
   [ `Chat com -> ti_variant "chat" [| ubox ti_chat_out_msg com |]
   | `Error str -> ti_variant "error" [| ubox ti_string str |]
   ]
  )
;

value () =
  let open Cd_Json in
  ( ti_add_json ti_in_msg ()
  ; ti_add_json ti_chat_out_msg ()
  ; ti_add_json ti_out_msg ()
  )
;


value clients : Hashtbl.t int Ws.ws_out_socket = Hashtbl.create 17
;


value my_func = Partapp3.make
  (fun segpath _rq outws ->
     let txt = sprintf "[%s]" &
       String.concat " ; " &
       List.map (sprintf "%S") &
       segpath
     in
     let n = srv_num.val in let () = incr srv_num in
     let () = dbg "service: [%i] got websocket request on %s" n txt in
     let () = Hashtbl.add clients n outws in
     fun opcode_in ->
       let () = dbg "service: [%i] entering" n in
       match opcode_in with
       [ `Text ->
           I.gather_to_string >>= fun s ->
           let () = dbg "service: [%i] got text frame: %S" n s in
           I.lift
             (IO.catch
                (fun () ->
                   let in_msg = Cd_Json.from_json_string ti_in_msg s in
                   match in_msg with
                   [ `Chat text ->
                       let om = `Chat
                         { com_text = text
                         ; com_dt = int_of_float (Unix.time ())
                         }
                       in
                       let om_txt = Cd_Json.to_json_string ti_out_msg om in
                       let () = Hashtbl.iter
                         (fun sn s ->
                            IO.run_and_ignore_result
                              (if Ws.is_close_sent s
                               then
                                 IO.return ()
                               else
                                 let () = dbg "service: [%i]: send msg to %i"
                                 n sn in
                                 Ws.send s `Text om_txt
                              )
                         )
                         clients
                       in
                         IO.return ()
                   ]
                )
                (let send_error msg =
                   Ws.send outws `Text &
                   Cd_Json.to_json_string ti_out_msg &
                   `Error msg
                 in
                 fun
                 [ Cd_Json.Jt.Json_error msg ->
                     send_error & "json error: " ^ msg
                 | e ->
                     send_error & Printexc.to_string e
                 ]
                )
             ) >>= fun () ->
           I.return ()
       | `Close opt_code ->
           let () = dbg "service: [%i] got `Close (%s)" n
             (match opt_code with
              [ None -> "None"
              | Some c -> sprintf "Some %i" c
              ]
             )
           in
           let () = Hashtbl.remove clients n in
           Ws.it_close outws opt_code
       | _ ->
           I.gather_to_string >>= fun s ->
           let () = dbg "service: [%i] got non-text non-close frame: %S" n s in
           I.return ()
       ]
  )
;

value () = dbg "point5"
;

value () = (dbg "pre"; S.mount_websocket my_endpoint my_func; dbg "post")
;

value () = let open Sys in ignore (signal sigpipe Signal_ignore)
;

value () = S.listener_run my_listener
;