Source

parvel / parvel.mli

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
(*
Параллельный* велосипед.

  * -- уточнение: пока не параллельный, а просто message passing.

  Текущая цель: обеспечить типизированный message passing поверх
lwt (в частности; и любого монадного интерфейса ввода-вывода в общем
случае), позволяющий:
  - порождать легковесные процессы, принимающие сообщения одного типа,
    отправляющие сообщения другого типа,
  - осуществлять ввод-вывод внутри процесса,
  - завершать текущий процесс,
  - слать широковещательные сообщения группе процессов,

  В планах на будущее (реализовать сразу после реализации локальных
процессов, но обязательно держать это в голове при планировании):
  - прозрачно порождать настоящие процессы ОС для распараллеливания
    вычислений между ядрами процессора и хостами, с целью
    максимально-полной утилизации процессорной мощности, памяти,
    дискового ввода-вывода (в том числе учитывая блокирующий
    ввод-вывод).

  В относительно-далёком будущем понадобятся:
  - пользовательская сериализация
  - передача данных через shared memory где можно
  - что-то наподобие work stealing queues.  Как тупой аналог -- рабочие,
    слушающие общий tcp/ip-сокет и берущие задания, отправляемые туда
    через connect+send+close (то есть, кто из рабочих свободен, тот и
    берёт задание.  Не совсем wsq, так как требует синхронного действия
    как рабочего, так и мастера, в отличие от wsq, где обворовываемый
    в неведении.).
*)

(* Есть большие сомнения, что всё заведётся с нижеуказанными типами,
   поэтому, очень вероятно, типы будут меняться.
 *)


(* Текущие концепции: мало, коротко, обрезанно.

   {process,server}_dispatcher -- просто кусок кода, обрабатывающий
     какой-то вход и возвращающий данные, возможно с сайд-эффектами.

   {process,server} -- штука, существующая в программе, обрабатывающая
     то, что ей пошлют через send/call/..

   Из диспетчеров создаются процессы и сервера соответствующими функциями.

   Есть подозрение, что всё или почти всё, что можно сделать, имея
   диспетчеры, можно сделать и с готовыми процессами и серверами.
   Однако пока будут именно такие штуки.
 *)


(* Принятые обозначения:
   'i -- тип принимаемых сообщений,
   'o -- тип отправляемых сообщений.
 *)


(* parameter for [register_program]:
     `Main -- only one instance of this program will run
     `Per_process -- each worker process will run this code
     will be added possibly:
     `Per_host -- either master or the first worker on this host
       will run this code
     `Per_cpu -- ... on this CPU core ...
 *)


open Parvel_types;

module IO : MonadIO with
      type m 'a = Lwt.t 'a
  and type input_channel = Lwt_io.input_channel
  and type output_channel = Lwt_io.output_channel
;


(* registering programs for execution in main / workers *)

type program_exec_type =
  [= `Main | `Per_process ]
;

(* [register_program exec_type program_name program_body]
   registers program for later execution
 *)

value register_program :
  program_exec_type ->
  string ->
  (unit -> IO.m unit) ->
  unit
;

value run_programs : unit -> IO.m unit
;

(*********************************************************)

type pid
;

(* статус завершения и ошибки процесса: *)

type process_exit_status =
  [ PE_Normal
  | PE_Error of process_exit_error
  ]
and process_exit_error =
  [= `Exn of exn
  |  `Exn_string of string
  ]
;

(*********************************************************)

type process_command_req =
  [= `Shutdown
  |  `Exited of (pid * process_exit_status)
  ]
;

type process_message_req 'i =
  [ Msg of 'i
  | Cmd of process_command_req
  ]
;

type call_resp_error =
  [= `Exn_string of string
  ]
;

type process 'i
;

type dest 'o
;

class ti_dest ['a] : [ (Cdt.ti 'b) ] -> [ unit ] ->
  object
    constraint 'a = dest 'b;
    inherit Cdt.tifull ['a];
  end
;


(* вызов = аргументы + место для результата *)
type call 'i 'o = ('i * dest 'o)
;

(* тип не абстрактный, так как пока это самое простое для
   обеспечения возможности передавать синхронные вызовы другим
   процессам/серверам. *)
type server 'i 'o = process (call 'i 'o)
;

(***************************************************************)

type tyname = string
;

type tyver = int
;

type typeinfou =
  < tyname : tyname
  ; tyver : tyver
  >
;

type ty_error =
  < expected : tyname
  ; received : tyname
  >
;

exception Ty_error of ty_error
;

exception Deserialize_error of tyname and string
;

type ty_ver_error =
  < tyname : tyname
  ; expected : int
  ; got : int
  >
;

exception Ty_ver of ty_ver_error;



type typeinfot 'a =
  <
    tyname : tyname;
    tyver : tyver;
    to_string : 'a -> string;
    to_output :
      (Substring.t -> IO.m unit) ->
      'a ->
      IO.m unit
    ;

    from_input :
      tyver ->
      int ->
      (Substring.t -> IO.m unit) ->
      IO.m 'a
    ;

    (* raises exception on failure, returns deserialized
       value and the rest of the string on success. *)
    from_substring
      : tyver -> Substring.t -> ('a * Substring.t);

    from_string
      : tyver -> string -> 'a;
   >
;

(***************************************************************)

(* Адреса -- значения, говорящие, куда можно отправить сообщение
   с определённым типом.
 *)

type addrt 'o;

value addrt_of_proc : process 'o -> addrt 'o
;

value map_addrt : ('o -> 'a) -> addrt 'a -> addrt 'o
;

(***************************************************************)

(* пока абстрактный тип, значения создаются функциями ниже. *)

type process_result 'i;

(* ответ сервера: *)

type call_resp 'o =
  [ CR_Ok of 'o
  | CR_Error of call_resp_error
  ]
;

(* диспетчер сообщений -- то, что содержит пользовательский код,
   непосредственно обрабатывающий сообщения.
   В случае процессов и серверов типы разные сознательно.
   Кроме того, отличается логика: сервер после обработки сообщения
   будет этим же кодом обрабатывать следующее сообщение, тогда
   как процесс определяет время своей жизни и продолжение обработки
   сообщений через возврат значений типа process_result 'i
   функциями process_{exit,continue}.
 *)

type dispatcher 'a 'b = 'a -> IO.m 'b
;

type process_dispatcher 'i =
  dispatcher (process_message_req 'i) (process_result 'i)
;

type server_dispatcher 'i 'o =
  dispatcher 'i (call_resp 'o)
;

(* фабрика -- функция, выполняющаяся один раз в IO-потоке,
   подготавливающая окружение для непосредственно диспетчера.
   (даёшь энтерпрайз!)
   Фабрике на вход даётся контекст.
   Видимо, надо будет привязывать к этому контексту функции
   вида "at_exit", чтобы закрывать всё, что наоткрывали.
   Подумать, может стоит заэксплуатировать with-идиому как-то
   (но вроде никак), либо WithM (которое {cons:...;fin:...} ).
 *)

type context
;

(* установить лимиты (см. Lwt_mq_parvel) на очередь, привязанную
   к текущему контексту.
   Ахтунг!  Из одной очереди может читать сразу много процессов,
   учитывайте это.  Если не нравится -- создавайте "прокси"
   перед нужным процессом.
 *)

value mq_set_block_limit : context -> int -> unit;
value mq_set_fail_limit : context -> int -> unit;

(* добавить финализацию -- функцию, которая выполнится при
   завершении процесса.  Функции выполняются в порядке,
   обратном порядку добавления в контекст. *)
value add_finalizer : context -> (unit -> unit) -> unit;


type factory 'i 'o = context -> IO.m (dispatcher 'i 'o)
;

type process_factory 'i =
  factory (process_message_req 'i) (process_result 'i)
;

type server_factory 'i 'o =
  factory 'i (call_resp 'o)
;

(* если инициализация не нужна, можно сделать всё без неё,
   вызвав [noinit your_dispatcher]: *)

value noinit : dispatcher 'i 'o -> factory 'i 'o
;

(* процесс завершается: *)

value process_exit : unit -> IO.m (process_result 'i)
;

(* процесс продолжает своё выполнение с указанного диспетчера: *)

value process_continue :
  process_dispatcher 'i -> IO.m (process_result 'i)
;

(* процесс завершается с ошибкой: *)

value process_exit_error : process_exit_error -> IO.m (process_result 'i)
;

(* сервер возвращает результат: *)

value server_return : 'o -> IO.m (call_resp 'o)
;

(* сервер возвращает ошибку: *)

value server_error : call_resp_error -> IO.m (call_resp 'o)
;

(***************************************************************)

(* пока так, чтобы долго не думать.  потом разберёмся. *)
value dest_local : unit -> dest 'o
;

value dest : #Cdt.ti 'o -> dest 'o
;


value dest_put : dest 'o -> call_resp 'o -> unit
;

value dest_put_val : dest 'o -> 'o -> unit
;

value dest_get : dest 'o -> IO.m (call_resp 'o)
;

value dest_get_val : dest 'o -> IO.m 'o
;



value create_process :
  process_factory 'i ->
  IO.m (process 'i);

value process_of_server : server_factory 'i 'o -> process_factory (call 'i 'o)
;

value create_server :
  server_factory 'i 'o ->
  IO.m (server 'i 'o);

value send : process 'i -> process_message_req 'i -> IO.m unit;

value sendt : addrt 'o -> process_message_req 'o -> IO.m unit;

value call : server 'i 'o -> 'i -> IO.m (call_resp 'o);

exception ECall of call_resp_error;

(* same as [call], but throws IO error [Call] on error: *)

value call_io : server 'i 'o -> 'i -> IO.m 'o;


(* switch-servers *)

type switch 'k 'i 'o;

exception Switch_error of string
;

value create_switch :
  ?key_cmp : ('k -> 'k -> int) ->
  unit ->
  IO.m (switch 'k 'i 'o)
;

value add_worker : switch 'k 'i 'o -> 'k -> server 'i 'o -> IO.m unit
;

type switch_resp_call 'o =
  [ SRC_Res of 'o
  | SRC_No_worker_for_key
  | SRC_Call_error of call_resp_error
  ]
;

value call_switch : switch 'k 'i 'o ->
  'k -> 'i -> IO.m (switch_resp_call 'o)
;

value call_switch_io : switch 'k 'i 'o -> 'k -> 'i -> IO.m 'o
;

value switch_keys : switch 'k 'i 'o -> IO.m (list 'k)
;


(* servers with state.
   passing state in and out of server function,
   but this implies no parallel requests processing can be made.
 *)

value add_state_to_server :
  server_factory ('i * 's) ('o * 's) ->
  's ->
  server_factory 'i 'o
;

(* processes with parallel execution, with strict limit
   of concurrently executing processes.
 *)

value process_limit :
  ?nmax:int ->
  ?dbg:(string -> unit) ->
  process_factory 'i ->
  process_factory 'i
;


(* [pipe 'i 'o] is constructor that takes [addrt 'o] to write
   "output" messages to this typed address, and returns
   IO-wrapped [process 'i] that receives process messages
   of type ['i], does anything that processes usually do,
   and writes to specified [addrt 'o].
 *)

type pipe 'i 'o = addrt 'o -> IO.m (process 'i)
;


(* stream transform: many to many *)

value stream_transform :
  factory ('i * 's) ((list 'o) * 's) ->
  's ->
  pipe 'i 'o
;


(* combine_pairs: combines messages tagged by `Fst and `Snd variants
   pairwise, in the order of their arrival.
   [`Exit_filling_snd x] is like sending [`Snd x] until all [`Fst _]
   will be processed, then exitting the process.
 *)

value combine_pairs :
  ('a -> 'b -> IO.m unit) ->
  process_factory
    [= `Fst of 'a
    |  `Snd of 'b
    |  `Exit_filling_snd of 'b
    ]
;


(* [server_of_pipe_seq] creates a server from pipe, assuming
   that sequential messages sent by pipe are replies to sequential
   requests sent to pipe.
 *)

value server_of_pipe_seq : pipe 'i 'o -> IO.m (server 'i 'o)
;


(* Some Iteratees support.
   It is a functor, so Parvel has no dependancy on ocaml-iteratees package.
 *)

module type IT_TYPE
 =
  sig

    module Subarray
     :
      sig
        type t 'el;
        value of_string : string -> t char;
        value of_array : array 'el -> t 'el;
      end
    ;

    type err_msg = exn
    ;

    exception Divergent_iteratee of string
    ;

    type stream 'el =
      [ EOF of option err_msg
      | Chunk of Subarray.t 'el
      ]
    ;

    type iteratee 'el 'a =
      [ IE_done of 'a
      | IE_cont of option err_msg
                and (stream 'el ->
                     IO.m (iteratee 'el 'a  * stream 'el))
      ]
    ;

  end
;


exception Iteratee_doesn't_want_data;
exception Iteratee_fails_without_data of exn;

(* [It_pipe(Iteratees).pipe_of_iteratee] is the pipe that receives
   chunks of data, processes it with given iteratee and sends
   results to addressee (once the result is got, stream will be
   processed by the original iteratee once again, until EOF / error).
   If the iteratee is the 'ready' state, IO error
   [Iteratee_doesn't_want_data] will be raised, if the iteratee already
   has error, IO error [Iteratee_fails_without_data its_error] will
   be raised.  If iteratee fails with error when processing stream,
   the process will exit with this error.  (monitors will receive
   [`Exited (_, this_error)]).
 *)

(* todo: посмотреть точно, не вылезут ли ошибки итератов где-нибудь
   так, что будет не process_exit_exn, а что-то другое.
 *)

module It_pipe (I : IT_TYPE)
 :
  sig
    value pipe_of_iteratee :
      I.iteratee 'el 'a -> pipe (I.Subarray.t 'el) 'a
    ;
  end
;


value server_respawn
 : ?max_count:int -> server_factory 'i 'o -> server_factory 'i 'o
;


(* wraps process_factory into new process_factory that allows to
   use "protocols".  Protocol is the type ['p] that has values
   of type [dest 'o] in it.  Protocol typeinfo [Cdt.ti 'p] should
   allow deconstruction of every value of type ['p] to access
   all [dest 'o] in runtime.  This is required in local protocol
   communication to enforce the protocol (to make sure that all
   [dest 'o] are filled (todo), this is required in remote protocol
   communication to pass [dest]s from caller to server and to pass
   [dest]s' values back from server to caller over network, using
   some serialization (todo: processes should declare, based on
   [Cdt.ti 'p], which serialization they accept/require/deny).
   Note: it's bad idea to wrap [process_factory] with [proto_server]
   more than once, but it's hard/clumsy to enforce this constraint
   with types. (or.. any ideas?)
 *)
value proto_server : Cdt.ti 'p -> process_factory 'p -> process_factory 'p
;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.