Commits

Anonymous committed e2a4a71

functors failed

  • Participants
  • Parent commits 1c43609

Comments (0)

Files changed (6)

-type program_exec_type =
-  [= `Main | `Per_process ]
-;
-
-
 module type COUNTER
  =
   sig
 (* Всё, что зависит от конкретной реализации ввода-вывода,
    закатываем в функтор: *)
 
-module Make (IO : Parvel_types.MonadIO)
- =
+module Make : Parvel_types.F = functor (IO : Parvel_types.MonadIO) ->
   struct
 
     (* локальный pid в пределах процесса *)
     ;
 
     type addrt 'o =
-      { aty : typeinfot 'o
-      ; asend : process_message_req 'o -> IO.m unit  (* cheating! *)
+      { (* aty : typeinfot 'o
+      ;*)
+
+        asend : process_message_req 'o -> IO.m unit  (* cheating! *)
+
       }
     ;
 
 
     value send proc msg = proc.p_send_msg msg;
 
+    value addrt_of_proc proc =
+      { asend = proc.p_send_msg
+      }
+    ;
+
     value (sendt : addrt 'o -> process_message_req 'o -> IO.m unit)
      addrt msg =
       addrt.asend msg
     ;
 
-    value addrt_of_proc ty proc =
-      { aty = ty
-      ; asend = proc
-      }
-    ;
-
     value rec io_iter f lst =
       match lst with
       [ [] -> IO.return ()
      `Per_cpu -- ... on this CPU core ...
  *)
 
-type program_exec_type =
-  [= `Main | `Per_process ]
-;
-
 
 (* Всё, что зависит от конкретной реализации ввода-вывода,
    закатываем в функтор: *)
 
-module Make (IO : Parvel_types.MonadIO)
- :
-  sig
-
-    (* registering programs for execution in main / workers *)
-
-    (* [register_program exec_type program_name program_body]
-       registers program for later execution
-     *)
-
-    value register_program :
-      program_exec_type ->
-      string ->
-      (unit -> IO.m unit) ->
-      unit
-    ;
-
-    value run_programs : unit -> IO.m unit
-    ;
-
-    (*********************************************************)
-
-    type pid
-    ;
-
-    (* статус завершения и ошибки процесса: *)
-
-    type process_exit_status =
-      [ PE_Normal
-      | PE_Error of process_exit_error
-      ]
-    and process_exit_error =
-      [= `Exn of exn
-      |  `Exn_string of string
-      ]
-    ;
-
-    (*********************************************************)
-
-    type process_command_req =
-      [= `Shutdown
-      |  `Exited of (pid * process_exit_status)
-      ]
-    ;
-
-    type process_message_req 'i =
-      [ Msg of 'i
-      | Cmd of process_command_req
-      ]
-    ;
-
-    type call_resp_error =
-      [= `Exn of string
-      ]
-    ;
-
-    type process 'i;
-
-    (* группа процессов *)
-    type process_group 'i;
-
-    type server_group 'i 'o;
-
-    type dest 'o
-    ;
-
-    (* вызов = аргументы + место для результата *)
-    type call 'i 'o = ('i * dest 'o)
-    ;
-
-    (* тип не абстрактный, так как пока это самое простое для
-       обеспечения возможности передавать синхронные вызовы другим
-       процессам/серверам. *)
-    type server 'i 'o = process (call 'i 'o)
-    ;
-
-
-    (***************************************************************)
-
-    (* пока абстрактный тип, значения создаются функциями ниже. *)
-
-    type process_result 'i;
-
-    (* ответ сервера: *)
-
-    type call_resp 'o =
-      [ CR_Ok of 'o
-      | CR_Error of call_resp_error
-      ]
-    ;
-
-    (* диспетчер сообщений -- то, что содержит пользовательский код,
-       непосредственно обрабатывающий сообщения.
-       В случае процессов и серверов типы разные сознательно.
-       Кроме того, отличается логика: сервер после обработки сообщения
-       будет этим же кодом обрабатывать следующее сообщение, тогда
-       как процесс определяет время своей жизни и продолжение обработки
-       сообщений через возврат значений типа process_result 'i
-       функциями process_{exit,continue}.
-     *)
-
-    type dispatcher 'a 'b = 'a -> IO.m 'b
-    ;
-
-    type process_dispatcher 'i =
-      dispatcher (process_message_req 'i) (process_result 'i)
-    ;
-
-    type server_dispatcher 'i 'o =
-      dispatcher 'i (call_resp 'o)
-    ;
-
-    (* фабрика -- функция, выполняющаяся один раз в IO-потоке,
-       подготавливающая окружение для непосредственно диспетчера.
-       (даёшь энтерпрайз!)
-       Фабрике на вход даётся контекст.
-       Видимо, надо будет привязывать к этому контексту функции
-       вида "at_exit", чтобы закрывать всё, что наоткрывали.
-       Подумать, может стоит заэксплуатировать with-идиому как-то
-       (но вроде никак), либо WithM (которое {cons:...;fin:...} ).
-     *)
-
-    type context
-    ;
-
-    (* установить лимиты (см. Lwt_mq_parvel) на очередь, привязанную
-       к текущему контексту.
-       Ахтунг!  Из одной очереди может читать сразу много процессов,
-       учитывайте это.  Если не нравится -- создавайте "прокси"
-       перед нужным процессом.
-     *)
-
-    value mq_set_block_limit : context -> int -> unit;
-    value mq_set_fail_limit : context -> int -> unit;
-
-
-    type factory 'i 'o = context -> IO.m (dispatcher 'i 'o)
-    ;
-
-    type process_factory 'i =
-      factory (process_message_req 'i) (process_result 'i)
-    ;
-
-    type server_factory 'i 'o =
-      factory 'i (call_resp 'o)
-    ;
-
-    (* если инициализация не нужна, можно сделать всё без неё,
-       вызвав [noinit your_dispatcher]: *)
-
-    value noinit : dispatcher 'i 'o -> factory 'i 'o
-    ;
-
-    (* процесс завершается: *)
-
-    value process_exit : unit -> IO.m (process_result 'i)
-    ;
-
-    (* процесс продолжает своё выполнение с указанного диспетчера: *)
-
-    value process_continue :
-      process_dispatcher 'i -> IO.m (process_result 'i)
-    ;
-
-    (* процесс завершается с ошибкой: *)
-
-    value process_exit_error : process_exit_error -> IO.m (process_result 'i)
-    ;
-
-    (* сервер возвращает результат: *)
-
-    value server_return : 'o -> IO.m (call_resp 'o)
-    ;
-
-    (* сервер возвращает ошибку: *)
-
-    value server_error : call_resp_error -> IO.m (call_resp 'o)
-    ;
-
-    (***************************************************************)
-
-    (* переименовать в reply/replyto? *)
-    value dest_put : dest 'o -> call_resp 'o -> IO.m unit
-    ;
-
-    value create_process_group : unit -> IO.m (process_group 'i);
-
-    value create_process :
-      process_group 'i ->
-      process_factory 'i ->
-      IO.m (process 'i);
-
-    value create_server_group : unit -> IO.m (server_group 'i 'o);
-
-    value create_server :
-      server_group 'i 'o ->
-      server_factory 'i 'o ->
-      IO.m (server 'i 'o);
-
-    value send : process 'i -> process_message_req 'i -> IO.m unit;
-
-    value send_group : process_group 'i -> process_message_req 'i -> IO.m unit;
-
-    value call : server 'i 'o -> 'i -> IO.m (call_resp 'o);
-
-    exception ECall of call_resp_error;
-
-    (* same as [call], but throws IO error [Call] on error: *)
-
-    value call_io : server 'i 'o -> 'i -> IO.m 'o;
-
-
-    value call_group : server_group 'i 'o -> 'i -> IO.m (array 'o);
-
-
-    (* switch-servers *)
-
-    type switch 'k 'i 'o;
-
-    exception Switch_error of string
-    ;
-
-    value create_switch :
-      ?key_cmp : ('k -> 'k -> int) ->
-      unit ->
-      IO.m (switch 'k 'i 'o)
-    ;
-
-    value add_worker : switch 'k 'i 'o -> 'k -> server 'i 'o -> IO.m unit
-    ;
-
-    type switch_resp_call 'o =
-      [ SRC_Res of 'o
-      | SRC_No_worker_for_key
-      | SRC_Call_error of call_resp_error
-      ]
-    ;
-
-    value call_switch : switch 'k 'i 'o ->
-      'k -> 'i -> IO.m (switch_resp_call 'o)
-    ;
-
-    value call_switch_io : switch 'k 'i 'o -> 'k -> 'i -> IO.m 'o
-    ;
-
-    value switch_keys : switch 'k 'i 'o -> IO.m (list 'k)
-    ;
-
-
-    (* servers with state.
-       passing state in and out of server function,
-       but this implies no parallel requests processing can be made.
-     *)
-
-    value add_state_to_server :
-      server_factory ('i * 's) ('o * 's) ->
-      's ->
-      server_factory 'i 'o
-    ;
-
-    (* processes with parallel execution, with strict limit
-       of concurrently executing processes.
-     *)
-
-    value process_limit :
-      ?nmax:int ->
-      ?dbg:(string -> unit) ->
-      process_factory 'i ->
-      process_factory 'i
-    ;
-
-  end
+module Make : Parvel_types.F
 ;
 Parvel_types
 Parvel_ops
 Lwt_mvar_parvel
+Parvel_lwt
 
 (* "ports" -- OS processes with message passing interface. *)
 
-module Port
+module Port (Parvel_F : Parvel_types.F)
  :
   sig
 
       ]
     ;
 
-    value run : port_command -> addrt port_out -> IO.m (process port_in)
+    module IO : Parvel_types.MonadIO;
+
+    module Parvel
+     :
+      sig
+        type process 'i;
+        type addrt 'o;
+      end
+    ;
+
+    value run
+     : port_command ->
+       Parvel.addrt port_out ->
+       IO.m (Parvel.process port_in)
     ;
 
   end
  =
   struct
 
+    module IO = Parvel_IO;
+    module Parvel = Parvel_F (IO);
+    open Parvel;
+
+    value ( >>= ) m f = IO.bind f m
+    ;
+
+    value return_unit = IO.return ()
+    ;
+
+
     type port_command =
       [ Shell of string
       ]
 
     value run command addrt =
 
-      create_process_group () >>= fun group ->
+      create_process_group () >>= fun rgroup ->
+      create_process_group () >>= fun sgroup ->
 
       let lwt_command =
         match command with
 
       let sender_factory _ctx =
         IO.return sender
-        where rec sender () =
+        where rec sender msg =
+         match msg with
+         [ Msg () ->
           let send_ch mapf ch =
             let buf = String.make bufsz '\x00' in
             let rec loop () =
                    ; (send_ch (fun x -> Msg (Stderr x))
                        process_full#stderr)
                    ; (process_full#status >>= fun st ->
-                      sendt addrt (Exited st)
+                       sendt addrt (Msg (Exited st))
                      )
                    ]
                  >>= fun () ->
                  process_exit ()
               )
-              (fun e -> process_exit_error e)
+              (fun e -> process_exit_error (`Exn e))
+         | Cmd (_ : [> ]) ->
+             process_exit ()
+         ]
       in
 
       let receiver_factory _ctx =
         IO.return receiver_loop
         where rec receiver_loop msg =
           (match msg with
-           [ Stdin txt ->
+           [ Msg (Stdin txt) ->
                IO.write process'_stdin txt
-           | Kill signum ->
+           | Msg (Kill signum) ->
                let () = process_full#kill signum in
                return_unit
+           | Cmd (_ : [> ]) ->
+               return_unit
            ]
           ) >>= fun () ->
           process_continue receiver_loop
       in
 
-      create_process group receiver_factory >>= fun rp ->
-      create_process group sender_factory >>= fun _sp ->
+      create_process rgroup receiver_factory >>= fun rp ->
+      create_process sgroup sender_factory >>= fun _sp ->
 
       IO.return rp
     ;
 
   end
 ;
+
+
+type program_exec_type =
+  [= `Main | `Per_process ]
+;
+
+
+module type F = functor (IO : MonadIO) ->
+  sig
+
+    (* registering programs for execution in main / workers *)
+
+    (* [register_program exec_type program_name program_body]
+       registers program for later execution
+     *)
+
+    value register_program :
+      program_exec_type ->
+      string ->
+      (unit -> IO.m unit) ->
+      unit
+    ;
+
+    value run_programs : unit -> IO.m unit
+    ;
+
+    (*********************************************************)
+
+    type pid
+    ;
+
+    (* статус завершения и ошибки процесса: *)
+
+    type process_exit_status =
+      [ PE_Normal
+      | PE_Error of process_exit_error
+      ]
+    and process_exit_error =
+      [= `Exn of exn
+      |  `Exn_string of string
+      ]
+    ;
+
+    (*********************************************************)
+
+    type process_command_req =
+      [= `Shutdown
+      |  `Exited of (pid * process_exit_status)
+      ]
+    ;
+
+    type process_message_req 'i =
+      [ Msg of 'i
+      | Cmd of process_command_req
+      ]
+    ;
+
+    type call_resp_error =
+      [= `Exn of string
+      ]
+    ;
+
+    type process 'i;
+
+    (* группа процессов *)
+    type process_group 'i;
+
+    type server_group 'i 'o;
+
+    type dest 'o
+    ;
+
+    (* вызов = аргументы + место для результата *)
+    type call 'i 'o = ('i * dest 'o)
+    ;
+
+    (* тип не абстрактный, так как пока это самое простое для
+       обеспечения возможности передавать синхронные вызовы другим
+       процессам/серверам. *)
+    type server 'i 'o = process (call 'i 'o)
+    ;
+
+    (***************************************************************)
+
+    type tyname = string
+    ;
+
+    type tyver = int
+    ;
+
+    type typeinfou =
+      < tyname : tyname
+      ; tyver : tyver
+      >
+    ;
+
+    type ty_error =
+      < expected : tyname
+      ; received : tyname
+      >
+    ;
+
+    exception Ty_error of ty_error
+    ;
+
+    exception Deserialize_error of tyname and string
+    ;
+
+    type ty_ver_error =
+      < tyname : tyname
+      ; expected : int
+      ; got : int
+      >
+    ;
+    
+    exception Ty_ver of ty_ver_error;
+
+
+
+    type typeinfot 'a =
+      <
+        tyname : tyname;
+        tyver : tyver;
+        to_string : 'a -> string;
+        to_output :
+          (Substring.t -> IO.m unit) ->
+          'a ->
+          IO.m unit
+        ;
+
+        from_input :
+          tyver ->
+          int ->
+          (Substring.t -> IO.m unit) ->
+          IO.m 'a
+        ;
+
+        (* raises exception on failure, returns deserialized
+           value and the rest of the string on success. *)
+        from_substring
+          : tyver -> Substring.t -> ('a * Substring.t);
+
+        from_string
+          : tyver -> string -> 'a;
+       >
+    ;
+
+    (***************************************************************)
+
+    (* Адреса -- значения, говорящие, куда можно отправить сообщение
+       с определённым типом.
+     *)
+
+    type addrt 'o;
+
+    value addrt_of_proc : process 'o -> addrt 'o
+    ;
+
+    (***************************************************************)
+
+    (* пока абстрактный тип, значения создаются функциями ниже. *)
+
+    type process_result 'i;
+
+    (* ответ сервера: *)
+
+    type call_resp 'o =
+      [ CR_Ok of 'o
+      | CR_Error of call_resp_error
+      ]
+    ;
+
+    (* диспетчер сообщений -- то, что содержит пользовательский код,
+       непосредственно обрабатывающий сообщения.
+       В случае процессов и серверов типы разные сознательно.
+       Кроме того, отличается логика: сервер после обработки сообщения
+       будет этим же кодом обрабатывать следующее сообщение, тогда
+       как процесс определяет время своей жизни и продолжение обработки
+       сообщений через возврат значений типа process_result 'i
+       функциями process_{exit,continue}.
+     *)
+
+    type dispatcher 'a 'b = 'a -> IO.m 'b
+    ;
+
+    type process_dispatcher 'i =
+      dispatcher (process_message_req 'i) (process_result 'i)
+    ;
+
+    type server_dispatcher 'i 'o =
+      dispatcher 'i (call_resp 'o)
+    ;
+
+    (* фабрика -- функция, выполняющаяся один раз в IO-потоке,
+       подготавливающая окружение для непосредственно диспетчера.
+       (даёшь энтерпрайз!)
+       Фабрике на вход даётся контекст.
+       Видимо, надо будет привязывать к этому контексту функции
+       вида "at_exit", чтобы закрывать всё, что наоткрывали.
+       Подумать, может стоит заэксплуатировать with-идиому как-то
+       (но вроде никак), либо WithM (которое {cons:...;fin:...} ).
+     *)
+
+    type context
+    ;
+
+    (* установить лимиты (см. Lwt_mq_parvel) на очередь, привязанную
+       к текущему контексту.
+       Ахтунг!  Из одной очереди может читать сразу много процессов,
+       учитывайте это.  Если не нравится -- создавайте "прокси"
+       перед нужным процессом.
+     *)
+
+    value mq_set_block_limit : context -> int -> unit;
+    value mq_set_fail_limit : context -> int -> unit;
+
+
+    type factory 'i 'o = context -> IO.m (dispatcher 'i 'o)
+    ;
+
+    type process_factory 'i =
+      factory (process_message_req 'i) (process_result 'i)
+    ;
+
+    type server_factory 'i 'o =
+      factory 'i (call_resp 'o)
+    ;
+
+    (* если инициализация не нужна, можно сделать всё без неё,
+       вызвав [noinit your_dispatcher]: *)
+
+    value noinit : dispatcher 'i 'o -> factory 'i 'o
+    ;
+
+    (* процесс завершается: *)
+
+    value process_exit : unit -> IO.m (process_result 'i)
+    ;
+
+    (* процесс продолжает своё выполнение с указанного диспетчера: *)
+
+    value process_continue :
+      process_dispatcher 'i -> IO.m (process_result 'i)
+    ;
+
+    (* процесс завершается с ошибкой: *)
+
+    value process_exit_error : process_exit_error -> IO.m (process_result 'i)
+    ;
+
+    (* сервер возвращает результат: *)
+
+    value server_return : 'o -> IO.m (call_resp 'o)
+    ;
+
+    (* сервер возвращает ошибку: *)
+
+    value server_error : call_resp_error -> IO.m (call_resp 'o)
+    ;
+
+    (***************************************************************)
+
+    (* переименовать в reply/replyto? *)
+    value dest_put : dest 'o -> call_resp 'o -> IO.m unit
+    ;
+
+    value create_process_group : unit -> IO.m (process_group 'i);
+
+    value create_process :
+      process_group 'i ->
+      process_factory 'i ->
+      IO.m (process 'i);
+
+    value create_server_group : unit -> IO.m (server_group 'i 'o);
+
+    value create_server :
+      server_group 'i 'o ->
+      server_factory 'i 'o ->
+      IO.m (server 'i 'o);
+
+    value send : process 'i -> process_message_req 'i -> IO.m unit;
+
+    value sendt : addrt 'o -> process_message_req 'o -> IO.m unit;
+
+    value send_group : process_group 'i -> process_message_req 'i -> IO.m unit;
+
+    value call : server 'i 'o -> 'i -> IO.m (call_resp 'o);
+
+    exception ECall of call_resp_error;
+
+    (* same as [call], but throws IO error [Call] on error: *)
+
+    value call_io : server 'i 'o -> 'i -> IO.m 'o;
+
+
+    value call_group : server_group 'i 'o -> 'i -> IO.m (array 'o);
+
+
+    (* switch-servers *)
+
+    type switch 'k 'i 'o;
+
+    exception Switch_error of string
+    ;
+
+    value create_switch :
+      ?key_cmp : ('k -> 'k -> int) ->
+      unit ->
+      IO.m (switch 'k 'i 'o)
+    ;
+
+    value add_worker : switch 'k 'i 'o -> 'k -> server 'i 'o -> IO.m unit
+    ;
+
+    type switch_resp_call 'o =
+      [ SRC_Res of 'o
+      | SRC_No_worker_for_key
+      | SRC_Call_error of call_resp_error
+      ]
+    ;
+
+    value call_switch : switch 'k 'i 'o ->
+      'k -> 'i -> IO.m (switch_resp_call 'o)
+    ;
+
+    value call_switch_io : switch 'k 'i 'o -> 'k -> 'i -> IO.m 'o
+    ;
+
+    value switch_keys : switch 'k 'i 'o -> IO.m (list 'k)
+    ;
+
+
+    (* servers with state.
+       passing state in and out of server function,
+       but this implies no parallel requests processing can be made.
+     *)
+
+    value add_state_to_server :
+      server_factory ('i * 's) ('o * 's) ->
+      's ->
+      server_factory 'i 'o
+    ;
+
+    (* processes with parallel execution, with strict limit
+       of concurrently executing processes.
+     *)
+
+    value process_limit :
+      ?nmax:int ->
+      ?dbg:(string -> unit) ->
+      process_factory 'i ->
+      process_factory 'i
+    ;
+
+  end
+;
   ; do_run ()
   )
 ;
+
+
+module T4
+ =
+  struct
+
+    module Port = Parvel_lwt.Port(Parvel.Make)
+    ;
+
+value () = P.register_program `Main "tests4/io_main" & fun () ->
+
+  P.create_process_group () >>= fun group_recv ->
+
+  let receiver_factory = P.noinit &
+    receive_msg
+    where rec receive_msg msg =
+      let cont () = P.process_continue receive_msg in
+      let pr_out title txt =
+        IO.printf "T4: receiver: %s: %S\n" title txt >>= cont in
+      match msg with
+      [ P.Msg a ->
+          match a with
+          [ Port.Stdout txt -> pr_out "stdout" txt
+          | Port.Stderr txt -> pr_out "stderr" txt
+          | Port.Exited st ->
+              let pr msg =
+                IO.printf "T4: receiver: exited: %s\n" msg >>= fun () ->
+                P.process_exit ()
+              in
+              match st with
+              [ Unix.WEXITED rc -> pr & sprintf "WEXITED %i" rc
+              | Unix.WSIGNALED sn -> pr & sprintf "WSIGNALED %i" sn
+              | Unix.WSTOPPED sn -> pr & sprintf "WSTOPPED %i" sn
+              ]
+          ]
+      | P.Cmd (_ : [> ]) ->
+          cont ()
+      ]
+  in
+
+  P.create_process group_recv receiver_factory >>= fun receiver_proc ->
+
+  let raddrt = P.addrt_of_proc receiver_proc in
+
+  Port.run (Port.Shell "nc google.com 80") raddrt
+;
+
+  end
+;
+
+
+value () =
+  ( printf "go 4!\n%!"
+  ; do_run ()
+  )
+;