Wiki

Clone wiki

parvel / concepts_rus

Описание концепций

Общее описание концепций

Мне нужна такая параллельность, чтобы внутри ОС-процесса крутилась куча других процессов, чтобы была куча самих ОС-процессов, в том числе на разных хостах.

Мне нужно actors-style message passing. Это -- общая и годная модель, и один из концептуальных моментов в том, что всё, что делает процесс, он делает в качестве ответа на одно конкретное сообщение.

Я попробовал отделить собственно приём сообщений от реакции на них, чтобы избежать багов, связанных с recv(), и одновременно для возможности вынести recv() куда-нибудь во внешний цикл (для прозрачного общения между процессами, например -- по моим прикидкам, это проще сделать так).

И, независимо от разработчиков actors, я пришёл к их модели.

Есть куча процессов -- будем для общности считать, что они на разных хостах, то есть, никакого shared memory параллелизма. То есть, всё решается в виде сообщений, будь то синхронные "вопрос-ответ", будь то асинхронные "я послал, а вы сами разбирайтесь с сообщением".

Синхронные вызовы и сервера

Синхронные сообщения реализованы так: сообщение представляет собой значение с типом call 'i 'o = ('i * dest 'o), где dest 'o -- "дырка" для значения с типом 'o, я требую её от библиотеки ввода-вывода. В эту дырку можно положить значение с одной стороны, и из неё можно блокирующим образом ожидать значения с другой стороны. (в случае lwt дырка реализована как Lwt.{wait,task}, когда внутри процесса.)

Является ли вызов "синхронным" -- решает и заказчик, и исполнитель. Фактически, полиморфизм делает так, что в случае, когда надо просто передать задание, оно просто передаётся, каким бы оно ни было: задания с типом ('i * dest 'o) тут не исключение.

С синхронными заданиями у меня связано понятие не "процесс", а "сервер", но это тоже такой процесс. Если синхронное задание получит обычный процесс, то он получит сообщение с типом ('i * dest 'o), и он сам решает, что с ним делать -- например, может ответить на него, или передать его другому процессу или серверу.

Например, в комбинаторе-над-серверами, который в зависимости от ключа кидает задание нужному рабочему, внешний процесс-комбинатор не трогает dest 'o, и вообще, передаёт сообщение реальному рабочему, который сам уже отвечает. А может оказаться так, что этот реальный рабочий -- это process_limit, параллельно запускающий не более 10 процессов и передающий этот вызов какому-нибудь из свободных процессов.

Синхронность выделена в отдельный примитив dest 'o вот ещё почему: заказчик результата должен висеть в пределах процесса ОС, дав директиву наподобие "вот, придёт значение в такой-то слот -- десериализуй и разбуди меня, как только". И не требуется открывать по отдельному коннекту на каждый синхронный вызов, и простой "мультиплексор" на входе tcp/ip-потока в процесс сам разберётся, кого будить, и может использовать канал для других целей. (Однако надо предусмотреть возможность сделать несколько каналов, так как бывают большие сообщения и медленная связь одновременно с желанием получать быстрые ответы на небольшие сообщения.)

Передача самих значений

В пределах процесса нет нужды в сериализации, значения любых типов передаются как есть.

В общем случае есть необходимость передавать значения языка (окамла) в сообщениях между процессами. Причём с балансом между "надёжно" и "быстро", с различными форматами сериализации, с различными типами сообщений (для лёгкости отладки и для кое-какой защиты от ошибок типизации -- правда, с участием рук программиста, но это лучше, чем ничего).

Различные форматы сообщений полезны для того, чтобы можно было изобразить "ноду парвела" на любом языке программирования.

Конкретно в окамле есть очень быстрая [де]сериализация (стандартный Marshal), но десериализация может уронить программу. И есть более медленные варианты (через тот же json, biniou, ещё варианты), но более защищённые от ошибки типизации: максимум, что будет -- ошибка "подсунули значение неправильного типа!".

Очередь сообщений процесса

У каждого процесса есть очередь сообщений.

Чтение из очереди сообщений всегда происходит неявным образом -- ровно тогда, когда процесс создан либо когда освободился от текущей работы.

В очередь можно положить сообщение только определённого типа -- того, чем параметризован тип очереди (IO.Mq.t 'a).

С практической точки зрения удобно (есть хорошие свойства), когда очередь обладает свойством упорядоченности сообщений. В качестве полумеры, но тоже практически-полезной, я выбрал "упорядоченность сообщений в пределах отправителя": то есть, если данный процесс отправляет два сообщения последовательно, то в случае, если они таки приходят процессу-получателю, первое отправленное приходит строго до второго отправленного.

Однако, очередь не может хранить слишком много сообщений -- это быстро забьёт память и приведёт к проблемам доступности и безопасности программы. Поэтому есть некоторый лимит на количество сообщений, которые очередь может непосредственно хранить. Если же сообщений в очереди больше, чем этот лимит, процессы, отправляющие сообщения, блокируются, и будут разблокированы только тогда, когда в очередь можно будет что-то записать. Эта блокировка помогает обеспечить упорядоченность по отправителю: если процесс заблокировался, то следующие сообщения будут явно ждать, пока это сообщение не уйдёт. Данный лимит называется "block_limit" (лимит количества сообщений, хранимых в очереди так, что отправитель не блокируется).

Однако, заблокированные отправители тоже занимают ресурсы (память, процессор), поэтому неразумно было бы не ограничивать их тоже. Существует лимит на количество заблокированных процессов, после которого отправитель, пытающийся отправить сообщение, не отправляет его и даже не блокируется, а получает IO-ошибку IO.Mq.Full. Этот лимит называется "fail_limit".

todo: для надёжных вещей может сделать put_blocking такой, чтобы он не выдавал Full, а висел независимо от fail_limit?

Для удобства реализации process_limit оказалось, что удобно сделать так, чтобы можно было создавать процессы с общей очередью сообщений, где очередное сообщение читает один из процессов, ничего не делающих в данный момент.

Однако, общие очереди сообщений ведут к следующим особенностям:

  1. параметры очереди едины, и один процесс, меняющий параметры "своей" очереди, меняет их для общей очереди процессов, читающих с ним из одной очереди.
  2. послать сообщение именно своему процессу теперь стало труднее: либо нужно указывать в сообщении, какому точно процессу оно шлётся, либо нужно запустить нужный код напрямую, без посылки сообщения.
  3. послать сообщение конкретному процессу стало невозможно. Однако это и не нужно: в тех случаях, когда создаются процессы с общей очередью сообщений, приемлемо, чтобы сообщение попало к любому из них, а функции для создания процессов/серверов с уже имеющейся очередью не экспортируются наружу, то есть, вполне можно уследить за тем, чтобы этот принцип не нарушался.

Фабрики / Factories

Часто бывает так, что процесс/сервер требует какой-либо инициализации перед непосредственно обработкой сообщений.

В ML-стиле подобные вопросы решаются обычно так: инициализация выполняется путём создания let-привязок, видимых непосредственно обработчиком, содержащих ресурсы -- предварительно-посчитанные значения, открытые файлы, хендлы БД, сетевые соединения.

todo: применить with-идиому или её аналог с {con;fin} для освобождения ресурсов при выходе процесса

То есть, инициализация сама по себе должна содержать сайд-эффекты, как и обработчик сообщений. Поэтому выродился такой некрасивый тип, однако, позволяющий решать все эти задачи: context -> IO.m (process_message_req 'i -> IO.m (process_result 'o))

Если же инициализация не нужна, можно использовать комбинатор noinit, который берёт диспетчер сообщений и возвращает фабрику.

Мониторы

Текущий процесс A может "мониторить" любой другой процесс B, чей pid (это в будущем, а пока -- "чей process _") ему известен.

В случае выхода процесса B процесс A получает сообщение вида Cmd (`Exited (process_b, status)), где status показывает, был ли процесс завершён нормально, либо вышел с ошибкой (в этом случае ошибка будет передана в вариантном типе). Также смотрите раздел про передачу ошибок в целом.

(todo: видимо, стоит сделать атомарное создание процесса с его мониторингом, так как процесс может выйти сразу же, а мониторящий процесс будет его ждать вечно.)

Передача ошибок

В окамле "исключения не сериализуются". Это -- сознательное дизайн-решение авторов языка, но нам с этим как-то надо жить.

В случае выхода мониторимого процесса ошибка передаётся в значениях с типом [= `Exn of exn | `Exn_str of string ], позволяя в случае локальных процессов видеть нормальное исключение, а в случае удалённых процессов -- хотя бы какую-нибудь текстовую диагностику этой ошибки.

В случае вызовов (call 'i 'o) ошибка передаётся в виде текстового сообщения всегда (может пофиксить как в процессах?).

Некоторые готовые комбинаторы

Советские учёные не такие дураки, и успели написать некоторые комбинаторы над процессами, серверами, фабриками.

Сервер

Обычный процесс получает сообщения с типом process_message 'i, позволяющим передать процессу как непосредственно сообщение с типом 'i (Msg 'i), так и сообщения служебного характера (Cmd команда), например, сообщения "убей себя" либо "мониторимый процесс такой-то вышел".

В случае, когда процесс представляет собой концептуально что-то в виде функции, берущей значение с типом 'i и возвращающей значение с типом 'o, при этом совершая сайд-эффекты (то есть, тип 'i -> IO.m 'o), такой процесс я обозвал словом "сервер" и решил, что он будет создаваться на основании функции с типом 'i -> IO.m 'o. Разворачивание сообщений, обработку служебных сообщений и заворачивание результатов в значение типа, который возвращают все процессы, делается комбинатором "create_server", который берёт на вход фабрику, создающую процесс.

server_switch

Существует необходимость обрабатывать сообщения разными процессами в зависимости от того, какой вид сообщений обрабатывает каждый из процессов. Например, если целью является обработка сообщений разных видов (или сообщений к разным логическим адресатам), и каждый вид сообщений обрабатывается специальным образом, вполне логично было бы зарегистрировать обработчики заданий конкретных видов, и вместе с непосредственно сообщением передавать вид сообщения, чтобы "диспетчер" смог определить, какому из обработчиков это задание необходимо направить.

process_limit

Существует необходимость запускать параллельно много процессов, обрабатывающих сообщения так, что в случае занятости всех работающих процессов должен запускаться новый процесс такого же типа, как и в случае смерти какого-либо из процессов, однако так, чтобы всего было не более определённого числа однотипных процессов. Это реализовано комбинатором process_limit.

stream_transform

Имея фабрику, принимающую ('i * 's) и возвращающую ((list 'o) * 's), имея начальное состояние с типом 's, и имея адресата, принимающего сообщения с типом 'o, можно создать процесс, принимающий сообщения, содержащие тип 'i, обрабатывающий их исходной фабрикой, и отправляющий адресату все сообщения из возвращённого фабрикой списка. При этом возвращённое состояние будет использовано для обработки следующего входящего сообщения с типом 'i.

Port

Возможно запустить внешний процесс ОС, имея командную строку запуска, так, что процесс парвела будет принимать сообщения с типами [ Stdin of string | Kill of int ] и отправлять указанному адресату сообщения с типом [ Stdout of string | Stderr of string | Exited of Unix.process_status ], реализуя классическое общение с процессом через стандартные файловые дескрипторы, возможность принудительно убить данный процесс и получение информации о завершении процесса (код возврата, вышел ли из-за сигнала).

Проект об именованных процессах

inter-process-protocol

Updated