Anonymous avatar Anonymous committed 9ef021f

.

Comments (0)

Files changed (3)

       ]
     ;
 
-    type context 'i =
-      < send : ! 'i2 . process 'i2 -> 'i2 -> IO.m unit
-      ; send_group : ! 'i2 . group 'i2 -> 'i2 -> IO.m unit
-        (* полиморфность по 'i2 -- факт того, что можно слать сообщения
-           кому угодно, если знаешь тип и можешь сформировать сообщение.
-         *)
-      ; continue : ('i -> IO.m unit) -> unit -> IO.m unit
-      ; me : process 'i
-      ; my_group : group 'i
-      >
-    ;
+    value notimpl msg = failwith
+      (Printf.sprintf "function %S: not implemented" msg);
+
+    value send proc msg = notimpl "send";
+
+    value send_group = notimpl "send_group";
 
     (* "процесс" -- это функция, позволяющая послать процессу сообщение.
        в случае локальной передачи сообщений этого хватит.
 
     type server 'i 'o = process ('i * dest 'o);
 
+    type context 'i =
+      < continue : ('i -> IO.m unit) -> unit -> IO.m unit
+      ; me : process 'i
+      ; my_group : group 'i
+      >
+    ;
+
     (* диспетчер сообщений -- то, что содержит пользовательский код,
-       но обёрнуто в абстрактный тип. *)
+       но обёрнуто в абстрактный тип.
+       указать/накодить, что будет частичное применение сначала
+       контекста.
+     *)
     type dispatcher 'i = context 'i -> 'i -> IO.m unit;
 
     (* по идее, процесс за'loop'ляется так:
 
     value create_process_group () =
       { gid = Counter.next g_counter
-(*
       ; children = []
-*)
       }
     ;
 
     value create_process group _disp =
-      let pid = Counter.next p_counter in
-      ( (* group.children := [pid :: group.children ] *) ()
-      ; () (* зарегистрировать disp *)
-      ; `Local { lpid = pid }
+      (* let pid = Counter.next p_counter in *)
+      let mq = IO.Mq.create_empty () in
+      let process_ref = ref None in
+      let get_my_process = lazy (
+        match process_ref.contents with
+        [ None -> assert False
+        | Some p -> p
+        ])
+      in
+      let schedule_continuation cont =
+        let () = IO.run_and_ignore_result
+          (IO.Mq.take mq >>= fun msg -> ((cont msg) : IO.m unit)) in
+        IO.return ()
+      in
+      let rec context =
+        object
+          method me = Lazy.force get_my_process;
+          method my_group = group;
+          method continue cont () =
+            io_of_cont cont
+            (* todo: продумать, что делать с теми процессами, которые
+               добавили сообщение в очередь, но не дождались continue
+               (т.е. процесс вышел).
+             *)
+          ;
+        end
+      and process msg =
+        IO.Mq.put mq msg >>= ..
+      and (waiter, wakener) = IO.task ()
+      in
+
+    type process 'i = ('i -> IO.m unit);
+
+      ( group.children := [process :: group.children ]
+      ; process_ref := Some process
+      ; process
       )
     ;
 
-(*
-    value call : server 'i 'o -> 'i -> IO.m 'o;
+
+    (* синхронный вызов *)
+    value (call : server 'i 'o -> 'i -> IO.m 'o) _ _ = notimpl "call";
+
 
     (* нужно конкретно для частной задачи, поэтому не уверен в ценности:
        функция шлёт сообщения группе и дожидается ответа каждого из серверов
        этой группы, затем возвращает массив ответов *)
-    value call_group
-      : server_group 'i 'o -> 'i -> res (list 'o);
-    (* если будет call, то можно будет выразить это как отсылку call-запросов
-      и приём call-ответов *)
+
+    value (call_group
+      : server_group 'i 'o -> 'i -> res (list 'o)
+      ) _ _ = notimpl "call_group"
+    ;
 
     (* ответы на сообщения будут производиться через передаваемый
       в функцию контекст, где будут функции send для простых процессов
       и reply для call-подобных. *)
 
-    value call_group _ = failwith "not implemented"
-    ;
-
-*)
-
-
   end
 ;
  :
   sig
 
+    type dest 'o =
+      [= `Local of ('o -> IO.m unit)
+      ]
+    ;
+
+    value dest_put : dest 'o -> 'o -> IO.m unit
+    ;
+
     type context 'i =
-      < send : ! 'i2 . process 'i2 -> 'i2 -> IO.m unit
-      ; send_group : ! 'i2 . group 'i2 -> 'i2 -> IO.m unit
-       (* в send и send_group полиморфность по 'o2 достигается тем,
-          что он не должен использоваться в теле send.
-          полиморфность по 'i2 -- факт того, что можно слать сообщения
-          кому угодно, если знаешь тип и можешь сформировать сообщение.
-        *)
-      ; exit : unit -> IO.m unit
-
+      < continue : ('i -> IO.m unit) -> unit -> IO.m unit
       ; me : process 'i
       ; my_group : group 'i
       >
     value take = take;
   end
 ;
+
+
+value run_and_ignore_result = Lwt.ignore_result;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.