Commits

arkdro committed 45a42df

Imported from svn by Bitbucket

  • Participants

Comments (0)

Files changed (41)

+name = "ocamerl"
+version = "0.0.1"
+description = "An distributed Erlang node in Ocaml (kind of ml_interface)."
+requires = "unix"
+archive(byte) = "ocamerl.cma"
+archive(native) = "ocamerl.cmxa"

lib/ocamerl/Makefile

+TEST_SOURCES=${wildcard test/*Test.ml}
+TEST_BYTE   =${TEST_SOURCES:.ml=.byte}
+TEST_SHELL  =${wildcard test/*.sh}
+
+EX_ML_SOURCES=${wildcard ex/*.ml}
+EX_BYTE   =${EX_ML_SOURCES:.ml=.byte}
+EX_BEAM   =$(patsubst ex/%.erl,beam/%.beam,$(wildcard ex/*.erl))
+
+OCAMLBUILD=ocamlbuild -classic-display
+
+NAME=ocamerl
+ARCHIVE=$(NAME).cma
+XARCHIVE=$(NAME).cmxa
+
+LIB_TARGETS=\
+    $(ARCHIVE) \
+    $(XARCHIVE) \
+    ocamerl/ocamerl.docdir/index.html
+
+TARGETS=\
+	$(LIB_TARGETS) \
+    $(EX_BYTE)
+
+TEST_TARGETS=\
+	$(LIB_TARGETS) \
+    $(EX_BYTE) \
+    $(TEST_BYTE)
+
+
+.PHONY: main
+main: main_build test
+
+.PHONY: main_build
+main_build: ex_build
+	$(OCAMLBUILD) $(TARGETS)
+
+
+.PHONY: dev
+dev: dev_build utest test
+
+.PHONY: dev_build
+dev_build: ERLBUILDOPT = +debug_info -DEUNIT -Wall
+dev_build: generated ex_build
+	$(OCAMLBUILD) $(TEST_TARGETS)
+
+
+.PHONY: ex_build
+ex_build: ERLBUILDOPT = +debug_info -DEUNIT -Wall
+ex_build: $(EX_BEAM)
+
+
+.PHONY: install
+install: uninstall main_build
+	test ! -f _build/$(XARCHIVE) || extra="_build/$(XARCHIVE) _build/"`basename $(XARCHIVE) .cmxa`".a" ; \
+	ocamlfind install $(NAME) META _build/$(NAME)/*.mli _build/$(NAME)/*.cmi _build/$(NAME).cmi _build/$(ARCHIVE) $$extra
+
+.PHONY: uninstall
+uninstall:
+	ocamlfind remove $(NAME)
+
+
+.PHONY: clean
+clean:
+	$(OCAMLBUILD) -clean
+	@rm -vf test/dataSet.ml
+	@rm -vf beam/*.beam
+
+
+.PHONY: test
+# no rule dependency ... rule used as a function
+test:
+	# start erlang first (ensure epmd is running)
+	@echo "Run and stop a distributed erlang node to ensure epmd is running."
+	erl -sname e1 -setcookie cookie -noshell -s init stop
+	@for i in ${TEST_SHELL} ; do \
+		echo -n "Running '$$i': " ; \
+		$$i > /dev/null 2>&1 ; \
+		r=$$? && echo "$$r" ; \
+	done
+
+.PHONY: utest
+# no rule dependency ... rule used as a function
+utest:
+	@for i in ${TEST_BYTE} ; do ./`basename $$i` ; done
+
+
+.PHONY: generated
+generated: test/dataSet.ml
+
+test/dataSet.ml: beam/datagen.beam
+	erl -noshell -pa beam -s datagen doit -s init stop > test/dataSet.ml
+
+beam/%.beam : test/%.erl
+	mkdir -p beam
+	erlc ${ERLBUILDOPT} -o beam $<
+
+beam/%.beam : ex/%.erl
+	mkdir -p beam
+	erlc ${ERLBUILDOPT} -o beam $<
+
+

lib/ocamerl/TODO.txt

+Info:
+    * External term format and distribution is now part of ERTS doc:
+      http://www.erlang.org/doc/apps/erts/part_frame.html
+
+Build/Distrib:
+    * mlpack or mllib?
+    * GODI
+    * what about a common lib (mainly eterm) usable for F#
+
+Logging:
+    * pa_log http://www.cs.nyu.edu/~cconway/tools/index.html
+
+Look at:
+    * Mbox could have a threaded version, a forked version (using pipe for communication), 
+    * socket / concurrency
+        * ocamlnet/equeue
+        * jocaml
+        * ocaml-event (libevent) http://www.xs4all.nl/~mmzeeman/ocaml
+        * Ocaml-NAE (iom, cf) http://sourceforge.net/projects/ocnae

lib/ocamerl/_tags

+<ocamerl/*.cmx>: for-pack(Ocamerl)

lib/ocamerl/ex/_tags

+<*.ml>: pkg_threads
+<*.byte>: pkg_threads

lib/ocamerl/ex/ex_epmc.ml

+open Ocamerl
+
+let get_port epmc n =
+    let port = Epmc.node_port epmc n in
+    match port with
+    | Some p ->
+        Trace.inf "ex_epmc" "node:%s port: %d\n" n p
+    | None ->
+        Trace.wrn "ex_epmc" "node:%s port: Not_found\n" n
+
+let rec doit epmc args =
+    match args with
+        | [] ->
+            ()
+        | "get" :: n :: rest ->
+            get_port epmc n;
+            doit epmc rest
+        | "set" :: n :: p :: rest ->
+            ignore(Epmc.connect epmc n (int_of_string p));
+            doit epmc rest
+        | _ ->
+            failwith "unknow command"
+
+let usage exe =
+    Trace.inf "ex_epmc" "Usage: %s [set <name> <port>]{1} [get <name>]* [getm]{1}" exe
+
+let _  =
+    Trace.inf "ex_epmc" "EPMD inspect\n";
+    let args = Array.to_list Sys.argv in
+    let exe = List.hd args in
+    let rest = List.tl args in
+    if List.length rest == 0 then usage exe;
+    try
+        let epmc = Epmc.create () in
+        doit epmc rest
+    with
+        exn -> usage exe
+    
+

lib/ocamerl/ex/ex_node_double.ml

+open Ocamerl
+
+let create_double_process node name =
+    let mbox = Enode.create_mbox node in
+    let _ = Enode.register_mbox node mbox name in
+    let recvCB = fun msg -> match msg with
+        | Eterm.ET_tuple [|toPid; ref; Eterm.ET_int i;|] ->
+            Enode.send
+                node
+                toPid
+                (Eterm.ET_tuple [| ref; Eterm.ET_int (Int32.mul i 2l); |])
+        | _ ->
+            (* skip unknown message *)
+            ()
+    in
+    Enode.Mbox.create_activity mbox recvCB
+
+let doit () =
+    try
+        Trace.inf "Node_double" "Creating node\n";
+        let name = match Sys.argv with
+            | [| _; s; |] -> s
+            | _ -> "ocaml"
+        in
+        let n = Enode.create name ~cookie:"cookie" in
+        let _ = Thread.sigmask Unix.SIG_BLOCK [Sys.sigint] in
+        let _ = Enode.start n in
+        let _ = create_double_process n "bytwo" in
+        let _ = Thread.wait_signal [Sys.sigint] in
+        Enode.stop n
+    with
+        exn -> Printf.printf "ERROR:%s\n" (Printexc.to_string exn)
+
+let _  = doit ()

lib/ocamerl/ex/ex_node_mult.ml

+open Ocamerl
+
+let create_worker_process node operand =
+    let mbox = Enode.create_mbox node in
+    let recvCB = fun msg -> match msg with
+    | Eterm.ET_tuple [|toPid; ref; Eterm.ET_int i;|] ->
+        Enode.send
+            node
+            toPid
+            (Eterm.ET_tuple [| ref; Eterm.ET_int (Int32.mul i operand); |])
+    | _ ->
+        (* skip unknown message *)
+        ()
+    in
+    Enode.Mbox.create_activity mbox recvCB;
+    Enode.Mbox.pid mbox
+
+let create_main_process node name =
+    let mbox = Enode.create_mbox node in
+    let _ = Enode.register_mbox node mbox name in
+    let recvCB = fun msg -> match msg with
+    | Eterm.ET_tuple [|toPid; Eterm.ET_int i;|] ->
+        let slave = create_worker_process node i in
+        Enode.send
+            node
+            toPid
+            slave
+    | _ ->
+        (* skip unknown message *)
+        ()
+    in
+    Enode.Mbox.create_activity mbox recvCB
+
+let doit () =
+    try
+        let name = ref "ocaml" in
+        let cookie = ref "" in
+        Arg.parse
+            [
+                ("-cookie", Arg.String ((:=) cookie), "erlang node cookie");
+                ("-name", Arg.String ((:=) name), "erlang node name");
+                ("-debug", Arg.Unit (fun () -> Trace.set_level Trace.lvl_debug), "print debug messages");
+            ]
+            ignore
+            "";
+        Trace.inf "Node_double" "Creating node; name: %s; cookie: %s\n" !name !cookie;
+        let n = Enode.create !name ~cookie:!cookie in
+        let _ = Thread.sigmask Unix.SIG_BLOCK [Sys.sigint] in
+        let _ = Enode.start n in
+        let _ = create_main_process n "byn" in
+        let _ = Thread.wait_signal [Sys.sigint] in
+        Enode.stop n
+    with
+        exn -> Printf.printf "ERROR:%s\n" (Printexc.to_string exn)
+
+let _  = doit ()
+

lib/ocamerl/ex/ex_node_wc.ml

+open Ocamerl
+
+let prog = "Ex_node_wc"
+
+let (|>) left right = right left
+
+
+let words_of_file fn =
+    let buf = Scanf.Scanning.from_file fn in
+    let scanner () = Scanf.bscanf buf " %s " (fun s -> s) in
+    Stream.from (fun _ -> match scanner () with "" -> None | s -> Some s)
+
+let send_ok node self ref ctrlPid =
+    Enode.send
+        node
+        ctrlPid
+        (Eterm.ET_tuple [|Eterm.ET_atom "ok"; ref; self;|])
+
+let create_mapper_process node ref ctrlPid =
+    let mbox = Enode.create_mbox node in
+    let self = Enode.Mbox.pid mbox in
+    let recvCB = fun msg -> match msg with
+    | Eterm.ET_tuple [| ref; Eterm.ET_atom "from_file"; Eterm.ET_string filename; destPid; ctrlPid;|] ->
+        let sender w =
+            let item = (Eterm.ET_tuple [| Eterm.ET_string w; Eterm.ET_int 1l;|]) in
+            Trace.dbg prog "Pid %s sending item %s\n" (Eterm.to_string self) (Eterm.to_string item);
+            Enode.send node destPid item
+        in
+        words_of_file filename |> Stream.iter sender;
+        send_ok node self ref ctrlPid
+    | Eterm.ET_atom "stop" ->
+        Enode.destroy_mbox node mbox
+    | m ->
+        (* skip unknown message *)
+        Trace.inf prog "Skip unknow message: %s\n" (Eterm.to_string m)
+    in
+    Enode.Mbox.create_activity mbox recvCB;
+    send_ok node self ref ctrlPid
+
+let create_main_process node name =
+    let mbox = Enode.create_mbox node in
+    let _ = Enode.register_mbox node mbox name in
+    let recvCB = fun msg -> match msg with
+    | Eterm.ET_tuple [| Eterm.ET_atom "new_mapper"; ref; ctrlPid; |] ->
+        create_mapper_process node ref ctrlPid
+    | m ->
+        (* skip unknown message *)
+        Trace.inf prog "Skip unknow message: %s\n" (Eterm.to_string m)
+    in
+    Enode.Mbox.create_activity mbox recvCB
+
+let doit () =
+    let name = ref "ocaml" in
+    let cookie = ref "" in
+    Arg.parse
+        [
+            ("-cookie", Arg.String ((:=) cookie), "erlang node cookie");
+            ("-name", Arg.String ((:=) name), "erlang node name");
+            ("-debug", Arg.Unit (fun () -> Trace.set_level Trace.lvl_debug), "print debug messages");
+        ]
+        ignore
+        "";
+    Trace.inf prog "Creating node; name: %s; cookie: %s\n" !name !cookie;
+    let n = Enode.create !name ~cookie:!cookie in
+    let _ = Thread.sigmask Unix.SIG_BLOCK [Sys.sigint] in
+    let _ = Enode.start n in
+    let _ = create_main_process n "wc" in
+    let _ = Thread.wait_signal [Sys.sigint] in
+    Enode.stop n
+
+let _  =
+    try doit ()
+    with exn -> Printf.printf "ERROR:%s\n" (Printexc.to_string exn)
+

lib/ocamerl/ex/ex_wc.erl

+-module(ex_wc).
+
+-ifdef(NDEBUG).
+-export([doit/0]).
+-else.
+-compile([export_all, debug_info]).
+-endif.
+
+
+take_cycle(_L, N) when N < 0 ->
+    error;
+take_cycle([], N) when N > 0 ->
+    error;
+take_cycle(L, N) ->
+    take_cycle(L, L, N, []).
+
+take_cycle(_O, _L, 0, Acc) ->
+    lists:reverse(Acc);
+take_cycle(O, [], N, Acc) ->
+    take_cycle(O, O, N, Acc);
+take_cycle(O, [H|Q], N, Acc) ->
+    take_cycle(O, Q, N-1, [H|Acc]).
+
+
+new_mapper(ONode) ->
+    R = make_ref(),
+    Manager = {wc, ONode},
+    Manager ! {new_mapper, R, self()},
+    receive {ok, R, M} -> M after 1000 -> error end.
+
+new_reducer(ENode) ->
+    spawn(ENode, ?MODULE, reduce, []).
+
+reduce() ->
+    reduce(dict:new()).
+
+reduce(Dict) ->
+    receive
+    {stop, P} ->
+        P ! {reduce_result, Dict},
+        ok;
+    _I = {W, C} ->
+        reduce(dict:update_counter(W, C, Dict))
+    end.
+
+collect_reducers(Reducers) ->
+    lists:foreach(fun(Red) -> Red ! {stop, self()} end, Reducers),
+    lists:foldl(
+        fun(_Red, D) ->
+            receive
+            {reduce_result, Res} ->
+                dict:merge(fun(_K,V1,V2) -> V1 + V2 end, D, Res)
+            after 5000 ->
+                error
+            end
+        end,
+        dict:new(),
+        Reducers
+    ).
+
+collect_mappers(Mappers) ->
+    lists:foreach(fun(M) -> M ! stop end, Mappers).
+
+process(Mappers, Reducers, Files) ->
+    TaskManager = self(),
+    Tasks = lists:foldl(
+        fun({File, Mapper, Reducer}, Acc) ->
+            R = make_ref(),
+            Mapper ! {R, from_file, File, Reducer, TaskManager},
+            [{R, Mapper, Reducer}|Acc]
+        end,
+        [],
+        lists:zip3(
+            Files,
+            take_cycle(Mappers, length(Files)),
+            take_cycle(Reducers, length(Files))
+        )
+    ),
+    wait_tasks(Tasks).
+
+wait_tasks([]) ->
+    ok;
+wait_tasks(Tasks) ->
+    receive
+    {ok, R, _Map} ->
+        wait_tasks(lists:keydelete(R, 1, Tasks))
+    % here could timeout and re-run some tasks ... partial tasks may need buffering in reduction
+    end.
+
+doit(Files, ONodes, MapperN, ENodes, ReducerN) ->
+    io:format("files=~w~n", [Files]),
+    Mappers = [new_mapper(ONode) || ONode <- take_cycle(ONodes, MapperN)],
+    Reducers = [new_reducer(ENode) || ENode <- take_cycle(ENodes, ReducerN)],
+    process(Mappers, Reducers, Files),
+    collect_mappers(Mappers),
+    Res = collect_reducers(Reducers),
+    io:format("map-reduce wc on files ~p produced result: ~p~n", [Files, dict:to_list(Res)]),
+    Res.

lib/ocamerl/myocamlbuild.ml

+open Ocamlbuild_plugin
+open Command
+(* http://brion.inria.fr/gallium/index.php/Using_ocamlfind_with_ocamlbuild *)
+
+(* these functions are not really officially exported *)
+let run_and_read = Ocamlbuild_pack.My_unix.run_and_read
+let blank_sep_strings = Ocamlbuild_pack.Lexers.blank_sep_strings
+   
+(* this lists all supported packages *)
+let find_packages () =
+    blank_sep_strings &
+        Lexing.from_string &
+        run_and_read "ocamlfind list | cut -d' ' -f1"
+                     
+(* this is supposed to list available syntaxes, but I don't know how to do it. *)
+let find_syntaxes () = ["camlp4o"; "camlp4r"]
+                        
+(* ocamlfind command *)
+let ocamlfind x = S[A"ocamlfind"; x]
+
+let _ = dispatch begin function
+    | Before_options ->
+        (* by using Before_options one let command line options have an higher priority *)
+        (* on the contrary using After_options will guarantee to have the higher priority *)
+
+        (* override default commands by ocamlfind ones *)
+        Options.ocamlc   := ocamlfind & A"ocamlc";
+        Options.ocamlopt := ocamlfind & A"ocamlopt";
+        Options.ocamldep := ocamlfind & A"ocamldep";
+        Options.ocamldoc := ocamlfind & A"ocamldoc"
+    | After_rules ->
+        (*make_tests (test_files ());*)
+        
+        (* When one link an OCaml library/binary/package, one should use -linkpkg *)
+        flag ["ocaml"; "link"] & A"-linkpkg";
+                
+        (* For each ocamlfind package one inject the -package option when
+           * compiling, computing dependencies, generating documentation and
+           * linking. *)
+        List.iter begin fun pkg ->
+            flag ["ocaml"; "compile";  "pkg_"^pkg] & S[A"-package"; A pkg];
+            flag ["ocaml"; "ocamldep"; "pkg_"^pkg] & S[A"-package"; A pkg];
+            flag ["ocaml"; "doc";      "pkg_"^pkg] & S[A"-package"; A pkg];
+            flag ["ocaml"; "link";     "pkg_"^pkg] & S[A"-package"; A pkg];
+        end (find_packages ());
+                                                                                                         
+        (* Like -package but for extensions syntax. Morover -syntax is useless
+        * when linking. *)
+        List.iter begin fun syntax ->
+            flag ["ocaml"; "compile";  "syntax_"^syntax] & S[A"-syntax"; A syntax];
+            flag ["ocaml"; "ocamldep"; "syntax_"^syntax] & S[A"-syntax"; A syntax];
+            flag ["ocaml"; "doc";      "syntax_"^syntax] & S[A"-syntax"; A syntax];
+        end (find_syntaxes ());
+        
+        (* The default "thread" tag is not compatible with ocamlfind.
+        Indeed, the default rules add the "threads.cma" or "threads.cmxa"
+        options when using this tag. When using the "-linkpkg" option with
+        ocamlfind, this module will then be added twice on the command line.
+        
+        To solve this, one approach is to add the "-thread" option when using
+        the "threads" package using the previous plugin.
+        *)
+        flag ["ocaml"; "pkg_threads"; "compile"] (S[A "-thread"]);
+        flag ["ocaml"; "pkg_threads"; "link"] (S[A "-thread"])
+    | _ -> ()
+end

lib/ocamerl/ocamerl.mlpack

+Econn
+Enode
+Epmc
+Eterm
+Fifo
+Fsm
+Handshake
+Serv
+Tools
+Trace

lib/ocamerl/ocamerl/_tags

+"handshake.ml": camlp4o
+"econn.ml": camlp4o
+"eterm.ml": camlp4o
+"epmc.ml": camlp4o
+<*.ml>: pkg_threads
+

lib/ocamerl/ocamerl/econn.ml

+module Packing = struct
+    
+    let tag_pass_through = 'p' (* 112 *)
+
+    type msg =
+        | Msg_tick
+        | Msg_p of
+              Eterm.t        (* control message *)
+            * Eterm.t option (* optional parameter *)
+
+    let message_to_string msg = match msg with
+        | Msg_tick               -> "Msg_tick"
+        | Msg_p (ctrl, None)     -> Printf.sprintf "Msg_p(%s)" (Eterm.to_string ctrl)
+        | Msg_p (ctrl, Some arg) -> Printf.sprintf "Msg_p(%s, %s)" (Eterm.to_string ctrl) (Eterm.to_string arg)
+
+    let rec message_of_stream = parser
+        | [< len = Tools.eint_n 4; stream >] ->
+            let p = match len with
+                | 0 ->
+                    begin
+                    parser [< >] ->
+                        Msg_tick
+                    end
+                | _ ->
+                    begin
+                    parser [< 'tag; msg = _tag_parse (len - 1) tag >] ->
+                        msg
+                    end
+            in
+            p stream
+    and _tag_parse len tag =
+        match tag with
+            | t when t = tag_pass_through -> _parse_p len
+            | _ -> failwith "unrecognize control message tag"
+    and _parse_p len =
+        parser [< data = Tools.string_n len; >] ->
+            let s = Stream.of_string data in
+            let ctrl = Eterm.of_stream s in
+            _parse_p_arg ctrl s
+    and _parse_p_arg ctrl = parser
+        | [< arg = Eterm.of_stream; >] ->
+            Msg_p (ctrl, Some arg)
+        | [< >] ->
+            Msg_p (ctrl, None)
+
+    let _message_to_chars msg = match msg with
+        | Msg_tick ->
+            []
+        | Msg_p (ctrl, None) ->
+            tag_pass_through
+            :: (Eterm.to_chars ctrl)
+        | Msg_p (ctrl, Some msg) ->
+            tag_pass_through
+            :: (Eterm.to_chars ctrl)
+            @  (Eterm.to_chars msg)
+
+    let pack msg =
+        let chars = _message_to_chars msg in
+        let len = List.length chars in
+        let head = Tools.chars_of_int len 4 in
+        let r = Tools.implode (head @ chars) in
+        r
+
+end (* module Packing *)
+
+
+module Ticker = struct
+    
+    type t = {
+        tickTime: float;
+        mutable tickCB: (unit -> unit) option;
+        actLock: Mutex.t;
+        mutable lastActivity: time;
+        stopLock: Mutex.t;
+        mutable stop: bool;
+    }
+    and time = float
+
+    let now () = Unix.time ()
+
+    let create tickTime = {
+        tickTime = tickTime;
+        tickCB = None;
+        actLock = Mutex.create ();
+        lastActivity = now ();
+        stopLock = Mutex.create ();
+        stop = false;
+    }
+    
+    let update_activity ticker =
+        Mutex.lock ticker.actLock;
+        ticker.lastActivity <- now ();
+        Mutex.unlock ticker.actLock;
+        Trace.dbg "Econn"
+            "Ticker: last activity: %f\n"
+            ticker.lastActivity
+
+    let set_cb ticker cb =
+        ticker.tickCB <- Some cb
+
+    let is_stop ticker =
+        Mutex.lock ticker.stopLock;
+        let stop = ticker.stop in
+        Mutex.unlock ticker.stopLock;
+        stop
+
+    let start ticker checkFreq =
+        (* checkFreq enable to thread to wakes-up and check
+        that it should continue to run or not  ... crap!*)
+        let rec _loop delay =
+            Thread.delay delay;
+            match is_stop ticker with
+                | false ->
+                    let since = (now ()) -. ticker.lastActivity in
+                    let delta = since -. ticker.tickTime in
+                    ignore (match delta >= 0.0 with
+                        | true ->
+                            Trace.dbg "Econn" "Ticker: %f is time to tick\n" (now ());
+                            let _ = match ticker.tickCB with
+                                | Some f -> f ()
+                                | None -> ()
+                            in
+                            update_activity ticker;
+                            Trace.flush ();
+                            (*_loop ticker.tickTime *)
+                            _loop checkFreq
+                        | _ ->
+                            (*_loop delta*)
+                            _loop checkFreq
+                    )
+                | _ ->
+                    Trace.dbg "Econn" "Ticker is stopping\n";
+                    Trace.flush ()
+        in
+        (*Thread.create _loop ticker.tickTime*)
+        Thread.create _loop checkFreq
+
+    let stop ticker =
+        Mutex.lock ticker.stopLock;
+        ticker.stop <- true;
+        Mutex.unlock ticker.stopLock
+
+end (* module Ticker *)
+
+
+module Sender = struct
+
+    type t = {
+        ochannel: out_channel;
+        queue: msg Fifo.t;
+        mutable thread: Thread.t option;
+        mutable ticker: Ticker.t option;
+    }
+    and msg =
+          Data of string
+        | Ctrl of int
+
+    let tickData = Data (Packing.pack Packing.Msg_tick)
+
+    let create oc =
+        {
+            ochannel = oc;
+            queue = Fifo.create ();
+            thread = None;
+            ticker = None;
+        }
+
+    let tick sender =
+        Trace.dbg "Econn" "Sender is ticking\n";
+        Trace.flush ();
+        Fifo.put sender.queue tickData
+
+    let send sender data =
+        let _ = Fifo.put sender.queue (Data data) in
+        match sender.ticker with
+        | Some ticker -> Ticker.update_activity ticker
+        | _ -> ()
+
+    let start sender =
+        let rec _loop () =
+            let msg = Fifo.get sender.queue in
+            match msg with
+            | Data data ->
+                Trace.dbg "Econn" "Sender sent one packet\n";
+                Trace.flush ();
+                output_string sender.ochannel data;
+                flush sender.ochannel;
+                _loop ()
+            | Ctrl code ->
+                (* any control message stop the loop *)
+                Trace.dbg "Econn" "Sender is stopping\n"
+        in
+        let thr = Thread.create _loop () in
+        sender.thread <- Some thr;
+        ()
+
+    let stop sender =
+        let _ = match sender.ticker with
+        | Some ticker ->
+            Ticker.stop ticker
+        | None ->
+            ()
+        in
+        match sender.thread with
+        | Some thr ->
+            Fifo.put sender.queue (Ctrl 0);
+            Thread.join thr
+        | None ->
+            ()
+
+
+    let start_ticker sender tickTime =
+        let ticker = Ticker.create tickTime in
+        let tickCB = fun () -> tick sender in
+        let _ = Ticker.set_cb ticker tickCB in
+        sender.ticker <- Some ticker;
+        Ticker.start ticker 1.0
+
+end (* module Sender *)
+
+
+module Connection = struct
+
+    type t = {
+        sender: Sender.t;
+    }
+
+    let create sender = {
+        sender = sender;
+    }
+
+    let send conn ctrl arg =
+        let msg = Packing.Msg_p (ctrl, arg) in
+        Trace.dbg "Econn"
+            "Sending control message: %s\n"
+            (Packing.message_to_string msg);
+        Trace.flush ();
+        let bin = Packing.pack msg in
+        Sender.send conn.sender bin
+
+end (* module Connection *)
+
+
+module ConnManager = struct
+
+    type t = (string, Connection.t) Hashtbl.t
+
+    let create () =
+        Hashtbl.create 10
+
+    let connection_up self peerName conn =
+        Hashtbl.add self peerName conn
+
+    let get self peerName =
+        Hashtbl.find self peerName
+
+end (* module ConnManager *)
+
+
+module Server = struct
+    (* Incoming connection handler. *)
+
+    type t = {
+        addr: Unix.inet_addr;
+        port: int;
+        sock: Unix.file_descr;
+        mutable thread: Thread.t option;
+    }
+
+    type handler_state_t = {
+        nodeName: string;
+        cookie: string;
+        flags: Int32.t;
+        tickTime: float;
+        connectionUpCB: string -> Connection.t -> unit;
+        controlCB: Eterm.t -> Eterm.t option -> unit;
+    }
+
+    
+    let distr_version = 5
+
+
+    let _handshake st istream sender =
+        let _get_message istream =
+            let msg = try
+                Handshake.message_of_stream istream
+            with
+                Stream.Failure ->
+                    failwith "Handshake stream failure"
+            in
+            Trace.dbg "Econn"
+                "Received handshake message: %s\n"
+                (Handshake.message_to_string msg);
+            msg
+        in
+        let _do_actions sender actions = List.iter
+            (fun msg ->
+                let bin = Handshake.pack msg in
+                Sender.send sender bin
+            )
+            actions
+        in
+        let rec _loop fsm istream sender =
+            let msg = _get_message istream in
+            match Fsm.send fsm msg with
+            | Fsm.Reply(Some result, actions) ->
+                _do_actions sender actions;
+                Trace.flush ();
+                result
+            | Fsm.Reply(None, actions) ->
+                _do_actions sender actions;
+                Trace.flush ();
+                _loop fsm istream sender
+            | Fsm.Finish ->
+                Trace.flush ();
+                (false, None)
+        in
+        let fsm = Handshake.create_fsm
+            distr_version
+            st.nodeName
+            st.cookie
+            st.flags
+        in
+        _loop fsm istream sender
+
+    let rec _control state istream sender =
+        let msg = try
+            Packing.message_of_stream istream
+        with
+            Stream.Failure ->
+                failwith "Control stream failure"
+        in
+        let _ = match msg with
+        | Packing.Msg_tick ->
+            Trace.dbg "Econn" "Received tick control message\n"
+            (* TODO check that peer continue to tick
+            and else set connection down *)
+        | Packing.Msg_p (ectrl, arg) ->
+            Trace.dbg "Econn"
+                "Received control message: %s\n"
+                (Packing.message_to_string msg)
+            ;
+            state.controlCB ectrl arg
+        in
+        Trace.flush ();
+        _control state istream sender
+
+    let _handler state id fd =
+        Random.self_init ();
+        let ic = Unix.in_channel_of_descr fd in
+        let oc = Unix.out_channel_of_descr fd in
+        let istream = Stream.of_channel ic in
+        (* sender is responsible of output connection *)
+        let sender = Sender.create oc in
+        let _senderThread = Sender.start sender in
+        (* first: do the handshake *)
+        begin
+        match _handshake state istream sender with
+        | (true, Some peerName) ->
+            Trace.dbg "Econn" "Handshake OK\n";
+            (* register the connection *)
+            state.connectionUpCB
+                peerName
+                (Connection.create sender)
+            ;
+            (* thread to tick the peer *)
+            ignore(Sender.start_ticker sender state.tickTime);
+            (* current thread receive from peer *)
+            let _ = try
+                _control state istream sender
+            with
+                exn ->
+                    Trace.err "Econn"
+                        "error in control loop: %s\n"
+                        (Printexc.to_string exn)
+            in
+            (*TODO register connection down *)
+            Sender.stop sender
+        | _ ->
+            Trace.inf "Econn" "Close connection comming from unauthorized node\n"
+        end;
+        Trace.dbg "Econn" "End of connection\n"
+        (*Unix.close fd*)
+
+    let port self =
+        self.port
+
+    let create () =
+        let sock = Serv.listen 0 in
+        let addr, port = Serv.inet_addr sock in
+        Trace.dbg "Econn"
+            "Node server listening on port %i\n"
+            port
+        ;
+        {
+            addr = addr;
+            port = port;
+            sock = sock;
+            thread = None;
+        }
+
+    let start self name cookie connUpCB controlCB =
+        let handler controlCB =
+            Serv.handle_in_thread ( Serv.trace_handler ( Serv.make_handler (
+                _handler {
+                        flags = (Int32.logor 4l 256l); (* TODO set correct flags *)
+                        cookie = cookie;
+                        nodeName = name;
+                        tickTime = 10.0; (*TODO which value? *)
+                        connectionUpCB = connUpCB;
+                        controlCB = controlCB;
+                }
+            )))
+        in
+        let server controlCB =
+            try
+                Serv.accept_loop 0 self.sock (handler controlCB)
+            with
+                exn ->
+                    Trace.inf "Econn" "Exception in server (may be stopping)\n"
+        in
+        let thr = Thread.create server controlCB in
+        self.thread <- Some thr
+
+    let stop self = 
+        let _ = Unix.shutdown self.sock Unix.SHUTDOWN_ALL in
+        match self.thread with
+        | Some thr ->
+            Thread.join thr;
+            self.thread <- None
+        | None ->
+            ()
+
+end (* module Server *)
+
+
+type t = {
+    server: Server.t;
+    connections: ConnManager.t;
+}
+
+
+let listen_port self = Server.port self.server
+
+let send self name ctrl arg =
+    try
+        let conn = ConnManager.get self.connections name in
+        Connection.send conn ctrl arg
+    with
+        Not_found ->
+            (* TODO try to establish the connection *)
+            Trace.dbg "Econn" "Cannot send message: connection not found";
+            failwith "Cannot send message: connection not found"
+
+let create () = {
+    server = Server.create ();
+    connections = ConnManager.create ();
+}
+
+let start self name cookie controlCB =
+    let connectionUpCB = ConnManager.connection_up self.connections in
+    Server.start
+        self.server
+        name
+        cookie
+        connectionUpCB
+        controlCB
+
+let stop self =
+    Trace.dbg "Econn" "Node connections are stopping\n";
+    Server.stop self.server;
+    Trace.todo "Econn" "connections must be stop/unregistered\n";
+    Trace.inf "Econn" "Node server stopped\n"

lib/ocamerl/ocamerl/enode.ml

+module Mbox = struct
+
+    type element =
+        | Data of Eterm.t
+        | Ctrl of int
+
+    type t = {
+        pid: Eterm.e_pid;
+        queue: element Fifo.t;
+        mutable name: string option;
+        mutable activity: Thread.t option;
+    }
+
+    let ctrl_stop = Ctrl 0
+
+    let _create pid = {
+        name = None;
+        pid = pid;
+        queue = Fifo.create ();
+        activity = None;
+    }
+
+    let name mbox =
+        mbox.name
+
+    let _set_name self name =
+        self.name <- Some name
+
+    let e_pid self =
+        self.pid
+
+    let pid self =
+        Eterm.ET_pid self.pid
+
+    let _new_message mbox msg =
+        (* no limit on number of message. *)
+        Fifo.put mbox.queue (Data msg)
+
+    let _receive mbox =
+        Fifo.get mbox.queue
+
+    let create_activity mbox recvCB =
+        match mbox.activity with
+        | Some t ->
+            failwith "activity alreeady exists (see stop_activity)."
+        | None ->
+            let rec recv_loop = fun () ->
+                let elt = _receive mbox in
+                match elt with
+                | Data msg ->
+                    recvCB msg;
+                    recv_loop ()
+                | ctrl_stop ->
+                    ()
+            in
+            let thr = Thread.create recv_loop () in
+            mbox.activity <- Some thr
+
+    let stop_activity mbox =
+        let _ = Fifo.put mbox.queue ctrl_stop in
+        Trace.dbg
+            "Enode"
+            "Mbox (%s%s) activity is stopping\n"
+            (Eterm.e_pid_to_string mbox.pid)
+            (match name mbox with
+            | Some name -> ", " ^ name
+            | None -> ""
+            );
+        Trace.flush ();
+        mbox.activity <- None
+
+end (* module Mbox *)
+
+
+module MboxManager : sig
+    type t
+    val create: unit -> t
+    val mboxes: t -> Mbox.t list
+    val make_mbox: t -> Eterm.e_pid -> Mbox.t
+    val destroy_mbox: t -> Mbox.t -> unit
+    val register: t -> Mbox.t -> string -> unit
+    val unregister: t -> Mbox.t -> unit
+    val find_by_name: t -> string -> Mbox.t
+    val find_by_pid: t -> Eterm.e_pid -> Mbox.t
+end = struct
+
+    (* TODO all that surely need to be thread safe ... *)
+    type t = {
+        pidMboxMap: (Eterm.e_pid, Mbox.t) Hashtbl.t;
+        nameMboxMap: (string, Mbox.t) Hashtbl.t;
+    }
+
+    let create () = {
+        pidMboxMap = Hashtbl.create 10;
+        nameMboxMap = Hashtbl.create 10;
+    }
+
+    let mboxes self =
+        Hashtbl.fold
+            (fun k v acc -> v::acc)
+            self.pidMboxMap
+            []
+
+    let make_mbox self pid =
+        let mbox = Mbox._create pid in
+        Hashtbl.add self.pidMboxMap pid mbox;
+        mbox
+
+    let destroy_mbox self mbox =
+        begin
+        match Mbox.name mbox with
+        | Some name ->
+            Hashtbl.remove self.nameMboxMap name
+        | None -> ()
+        end;
+        Hashtbl.remove self.pidMboxMap (Mbox.e_pid mbox);
+        Mbox.stop_activity mbox
+
+    let register self mbox name =
+        let _ = Mbox._set_name mbox name in
+        Hashtbl.add self.nameMboxMap name mbox
+
+    let unregister self mbox =
+        match Mbox.name mbox with
+        | Some name ->
+            Hashtbl.remove self.nameMboxMap name
+        | _ ->
+            ()
+
+    let find_by_name self name =
+        Hashtbl.find self.nameMboxMap name
+
+    let find_by_pid self pid =
+        Hashtbl.find self.pidMboxMap pid
+
+end (* module MboxManager *)
+
+
+module PidManager : sig
+    type t
+    val create: string -> t
+    val init: t -> int -> bool
+    val reset: t -> unit
+    val is_initialized: t -> bool
+    val make_pid: t -> Eterm.e_pid
+end = struct
+
+    type t = {
+        node: string;
+        mutable creation: int option;
+        mutable serial: int;
+        mutable count: int;
+    }
+
+    let create nodeName = {
+        node = nodeName;
+        creation = None;
+        serial = 0;
+        count = 0;
+    }
+    
+    let init self creation =
+        self.creation <- Some creation;
+        self.serial <- 0;
+        self.count <- 0;
+        true
+
+    let reset self =
+        self.creation <- None
+    
+    let is_initialized self =
+        match self.creation with
+        | None -> false
+        | Some _ -> true
+
+    let _increment pids =
+        pids.count <- pids.count + 1;
+        if pids.count > 0x7fff then (
+            pids.count <- 0;
+            pids.serial <- pids.serial + 1;
+            if pids.serial > 0x07 then pids.serial <- 0
+        )
+
+    let make_pid self =
+        match self.creation with
+        | None ->
+            failwith "cannot create pid: creation ID not defined"
+        | Some n ->
+            let pid = (self.node, self.count, self.serial, n) in
+            let _ = _increment self in
+            pid
+
+end (* module PidManager *)
+
+
+
+type t = {
+    name: string;
+    cookie: string;
+    epmc: Epmc.t;
+    connections: Econn.t;
+    pids: PidManager.t;
+    mboxes: MboxManager.t;
+}
+
+
+(* MBoxes. *)
+
+let create_mbox node =
+    let pid = PidManager.make_pid node.pids in
+    let mbox = MboxManager.make_mbox node.mboxes pid in
+    mbox
+
+let destroy_mbox node mbox =
+    MboxManager.destroy_mbox node.mboxes mbox
+
+let register_mbox node mbox name =
+    MboxManager.register node.mboxes mbox name
+
+
+(* Node in/out messages dispatch. *)
+
+let tag_link           = 1l  (* {1, FromPid, ToPid}                          *)
+let tag_send           = 2l  (* {2, Cookie, ToPid}                 + message *)
+let tag_exit           = 3l  (* {3, FromPid, ToPid, Reason}                  *)
+let tag_unlink         = 4l  (* {4, FromPid, ToPid}                          *)
+let tag_node_link      = 5l  (* {5}                                          *)
+let tag_reg_send       = 6l  (* {6, FromPid, Cookie, ToName}       + message *)
+let tag_group_leader   = 7l  (* {7, FromPid, ToPid}                          *)
+let tag_exit2          = 8l  (* {8, FromPid, ToPid, Reason}                  *)
+let tag_send_tt        = 12l (* {12, Cookie, ToPid, TraceToken}    + message *)
+let tag_exit_tt        = 13l (* {13, FromPid, ToPid, TraceToken, Reason}     *) 
+let tag_reg_send_tt    = 16l (* {16, FromPid, Cookie, ToName, TraceToken}    *)
+let tag_exit2_tt       = 18l (* {18, FromPid, ToPid, TraceToken, Reason}     *)
+(* shall not be used (only for Erlang node, not hidden node) *)
+let tag_monitor_p      = 19l
+let tag_demonitor_p    = 20l
+let tag_monitor_p_exit = 21l
+
+let _send_to finder msg =
+    try
+        let mbox = finder () in
+        Mbox._new_message mbox msg
+    with
+        Not_found ->
+            Trace.inf
+                "Enode"
+                "failed to dispatch message: destination not found.\n"
+
+let _send_to_name node name msg =
+    _send_to
+        (fun () -> MboxManager.find_by_name node.mboxes name)
+        msg
+
+let _send_to_pid node pid msg =
+    _send_to
+        (fun () -> MboxManager.find_by_pid node.mboxes pid)
+        msg
+
+let _receive node ectrl arg =
+    match ectrl, arg with
+    | (Eterm.ET_tuple [|
+        Eterm.ET_int tag_reg_send;
+        _;
+        _;
+        Eterm.ET_atom name;
+    |], Some msg) ->
+        _send_to_name node name msg
+    | (Eterm.ET_tuple [|
+        Eterm.ET_int tag_send;
+        _;
+        Eterm.ET_pid pid;
+    |], Some msg) ->
+        _send_to_pid node pid msg
+    | _ ->
+        Trace.dbg "Enode"
+            "not implemented: ignore control message: %s\n"
+            (Eterm.to_string ectrl)
+
+let send node dest msg =
+    match dest with
+    | Eterm.ET_pid _ ->
+        let ctrl = Eterm.ET_tuple [|
+            Eterm.ET_int tag_send;
+            Eterm.ET_atom "cookie"; (* TODO cookie ... *)
+            dest;
+        |] in
+        let arg = Some msg in
+        let name = Eterm.et_pid_node_name dest in
+        Econn.send node.connections name ctrl arg
+    | _ ->
+        failwith "Enode.send: dest is not valid"
+
+
+
+(* Internal node state. *)
+
+let _create_net_kernel node =
+    let mbox = create_mbox node in
+    let _ = register_mbox node mbox "net_kernel" in
+    let recvCB = fun msg ->
+        match msg with
+            | Eterm.ET_tuple [|
+                Eterm.ET_atom "$gen_call";
+                Eterm.ET_tuple [| toPid; ref; |];
+                Eterm.ET_tuple [| Eterm.ET_atom "is_auth"; _; |]
+            |] ->
+                let rsp = Eterm.ET_tuple [|ref; Eterm.ET_atom "yes"|] in
+                send node toPid rsp    
+            | _ ->
+                (* TODO kernel probably do other things *)
+                let s = Eterm.to_string msg in
+                failwith ("unexpected msg: " ^ s)
+    in
+    Mbox.create_activity mbox recvCB
+
+let _is_published node =
+    PidManager.is_initialized node.pids
+
+let _publish node =
+    if _is_published node
+    then
+        failwith "node already published"
+    else
+        let port = Econn.listen_port node.connections in
+        match Epmc.connect node.epmc node.name port with
+        | Some creation ->
+            PidManager.init node.pids creation
+        | None ->
+            failwith "failed to publish node"
+    
+let _unpublish node =
+    let _ = Epmc.disconnect node.epmc in
+    PidManager.reset node.pids
+
+
+(* Construction and connectivity. *)
+
+let create ?(cookie="") nodeName =
+    let name =
+        if String.contains nodeName '@'
+        then nodeName
+        else String.concat "@" [nodeName; Unix.gethostname ();]
+    in
+    Trace.inf "Enode" "Making node '%s'\n" name;
+    let epmc = Epmc.create () in
+    let connections = Econn.create () in
+    let pids = PidManager.create name in
+    let mboxes = MboxManager.create () in
+    {
+        name = name;
+        cookie = cookie;
+        epmc = epmc;
+        connections = connections;
+        pids = pids;
+        mboxes = mboxes;
+    }
+
+let start node =
+    let _ = _publish node in (* may fail *)
+    let _ = _create_net_kernel node in
+    Econn.start node.connections node.name node.cookie (_receive node)
+
+let run ?(cookie="") nodeName =
+    let n = create nodeName ~cookie:cookie in
+    start n;
+    n
+
+let stop node =
+    Trace.dbg "Enode" "Node '%s' is stopping\n" node.name;
+    let mboxes = MboxManager.mboxes node.mboxes in
+    let _ = List.iter Mbox.stop_activity mboxes in
+    let _ = List.iter
+        (MboxManager.unregister node.mboxes)
+        mboxes
+    in
+    Econn.stop node.connections;
+    _unpublish node;
+    Trace.flush ();
+    ()

lib/ocamerl/ocamerl/enode.mli

+(** Erlang node and mbox (equivalent of Erlang process message queue).
+
+See Enode interface for creation of Mbox.
+*)
+
+module Mbox : sig
+    
+    type t
+
+    (** Return name of the mbox; None if mbox is not registered*)
+    val name: t -> string option
+
+    (** Return pid term. *)
+    val pid: t -> Eterm.t
+
+    (** Start a thread wich call the callback function for each incoming message. *)
+    val create_activity: t -> (Eterm.t -> unit) -> unit
+
+    (** Stop the thread created by start, if any. *)
+    val stop_activity: t -> unit
+end
+
+type t
+
+(** Create a new Mbox (equivalent of erlang process message queue). *)
+val create_mbox: t -> Mbox.t
+
+(** Remove an existing Mbox from node. *)
+val destroy_mbox: t -> Mbox.t -> unit
+
+(** Register an mbox. *)
+val register_mbox: t -> Mbox.t -> string -> unit
+
+(** [Enode.send n dest msg] Send message [msg] to destination [dest]. *)
+val send: t -> Eterm.t -> Eterm.t -> unit
+
+(** [Enode.create cookie name] Create a node named [name] and with cookie [cookie]. *)
+val create: ?cookie:string -> string -> t
+
+(** Start the node: register it, start kernel processing, and listen for connections. *)
+val start: t -> unit
+
+(** [Enode.run name ~cookie:cookie] Create a node named [name] with cookie [cookie] and start it. *)
+val run: ?cookie:string -> string -> t
+
+(** Stop the node. *)
+val stop: t -> unit

lib/ocamerl/ocamerl/epmc.ml

+module Packing = struct
+
+    let tag_alive2_req = '\120'
+    let tag_alive2_rsp = '\121'
+    let tag_port2_req  = '\122'
+    let tag_port2_rsp  = '\119'
+
+    let node_type_normal = '\077'
+    let node_type_hidden = '\072'
+
+    let distr_vsn_range = (5, 5)
+
+    type node_desc_t =
+          int         (* node server port *)
+        * char        (* node type: hidden, normal *)
+        * int         (* protocol: 0 => tcp/ip-v4 *)
+        * (int * int) (* distribution version range *)
+        * string      (* node name *)
+        * string      (* extra *)
+
+    type message =
+        | Msg_alive2_req of
+              node_desc_t
+        | Msg_alive2_rsp of
+              int         (* result *)
+            * int         (* creation *)
+        | Msg_port2_req of
+              string      (* node name *)
+        | Msg_port2_rsp of
+              node_desc_t option
+
+    let rec message_of_stream =
+        parser [< 'tag; msg = tag_parse tag >] -> msg
+    and tag_parse tag =
+        match tag with
+            | n when n = tag_alive2_rsp -> parse_alive2_rsp
+            | n when n = tag_port2_rsp -> parse_port2_rsp
+            | _ ->
+                failwith "message tag not recognized"
+    and parse_alive2_rsp =
+        parser [<
+            result = Tools.eint_n 1;
+            creation = Tools.eint_n 2
+        >] ->
+            Msg_alive2_rsp (result, creation)
+    and parse_port2_rsp =
+        parser [<
+            result = Tools.eint_n 1;
+            stream
+        >] ->
+            print_endline "";
+            let p = match result with
+                | 0 -> parse_port2_rsp_info
+                | _ -> begin parser [< >] -> Msg_port2_rsp None end
+            in
+            p stream
+    and parse_port2_rsp_info =
+        parser [<
+            port = Tools.eint_n 2;
+            'nodeType;
+            prot = Tools.eint_n 1;
+            vsn1 = Tools.eint_n 2;
+            vsn2 = Tools.eint_n 2;
+            nLen = Tools.eint_n 2;
+            name = Tools.string_n nLen;
+            'lsb; (* if LSB is 0, there is no more data *)
+            extr = parse_port2_rsp_info_extra lsb
+        >] ->
+            Msg_port2_rsp (Some (
+                port,
+                nodeType,
+                prot,
+                (vsn1, vsn2),
+                name,
+                extr
+            ))
+    and parse_port2_rsp_info_extra lsb =
+        match int_of_char lsb with
+            | 0 -> begin parser [< >] -> "" end
+            | _ -> begin parser [<
+                'usb;
+                extr = Tools.string_n (Tools.int_of_chars [lsb; usb])
+            >] -> extr end
+
+    let message_to_string msg = match msg with
+        | Msg_alive2_rsp (result, creation) ->
+            Printf.sprintf
+                "Alive2Rsp(%s, creation=%i)"
+                (match result with
+                    | 0 -> "OK"
+                    | n -> Printf.sprintf "ERROR(%i)" n
+                )
+                creation
+        | Msg_port2_rsp (Some _) ->
+            "Port2Rsp(OK, ...)"
+        | Msg_port2_rsp (None) ->
+            "Port2Rsp(None)"
+        | _ -> failwith "message_to_string not implemented for this message"
+
+    let _message_to_chars msg = match msg with
+        | Msg_alive2_req (
+            nodePort,
+            nodeType,
+            protocol,
+            (distrRangeMin, distrRangeMax),
+            nodeName,
+            extra
+            ) ->
+               tag_alive2_req
+            :: (Tools.chars_of_int (nodePort) 2)
+            @  nodeType
+            :: char_of_int protocol
+            :: (Tools.chars_of_int (distrRangeMin) 2)
+            @  (Tools.chars_of_int (distrRangeMax) 2)
+            @  (Tools.chars_of_int (String.length nodeName) 2)
+            @  (Tools.explode nodeName)
+            @  (Tools.chars_of_int (String.length extra) 2)
+            @  (Tools.explode extra)
+        | Msg_port2_req nodeName ->
+               tag_port2_req
+            :: (Tools.explode nodeName)
+        | _ -> failwith "_message_to_chars not implemented for this message"
+
+    let pack_msg msg =
+        let chars = _message_to_chars msg in
+        let len = List.length chars in
+        let head = Tools.chars_of_int (len) 2 in
+        let bin = Tools.implode (head @ chars) in
+        bin
+
+end (* module Packing *)
+
+
+type host_t = {
+    name: string;
+    port: int;
+}
+
+type conn_t = {
+    input: in_channel;
+    output: out_channel;
+}
+
+type t = {
+    epmd: host_t;
+    mutable node: host_t option;
+    mutable conn: conn_t option;
+}
+
+
+(* Tools to manage TCP connection *)
+
+let make_host name port =
+    {name = name; port = port}
+
+let node_name name =
+    try
+        let len = String.index name '@' in
+        String.sub name 0 len
+    with
+        Not_found -> name
+
+let _close_connection conn =
+    (match conn with
+    | Some conn ->
+        Unix.shutdown_connection conn.input;
+        Trace.dbg "Epmc" "End of connection to EPMD\n"
+    | None ->
+        ()
+    )
+    
+let _open_connection epmd =
+    let ic, oc = (
+        try
+            let addr = (Unix.gethostbyname(epmd.name)).Unix.h_addr_list.(0) in
+            let sock_addr = Unix.ADDR_INET(addr, epmd.port) in
+            Unix.open_connection sock_addr
+        with
+            Unix.Unix_error(_, _, _) ->
+                Trace.dbg "Epmc" "Cannot open connection to EPMD\n";
+                failwith "Unix error cause end of thread"
+    ) in
+    {input=ic; output=oc;}
+
+let send_recv conn req =
+    let bin = Packing.pack_msg req in
+    output_string conn.output bin;
+    flush conn.output;
+    let istream = Stream.of_channel conn.input in
+    Packing.message_of_stream istream (*TODO may raise Stream.failure *)
+
+
+(* Exchanges req/rsp defined in EPMD protocol *)
+
+let _register conn node =
+    (* send alive2 req *)
+    let req = Packing.Msg_alive2_req (
+        node.port,
+        Packing.node_type_hidden,
+        0,
+        Packing.distr_vsn_range,
+        node_name node.name,
+        "" (* extra *)
+    ) in
+    (* recv alive2 rsp *)
+    match send_recv conn req with
+        | Packing.Msg_alive2_rsp (0, creation) ->
+            Trace.dbg "Epmc" "Registered\n";
+            Some creation
+        | _ ->
+            failwith "Epmc: Registration of node failed!"
+
+let _node_info conn name =
+    (* send port2 req *)
+    let req = Packing.Msg_port2_req name in
+    (* recv port2 rsp *)
+    match send_recv conn req with
+        | Packing.Msg_port2_rsp info ->
+            Trace.dbg "Epmc" "Retrieved node info from EPMD\n";
+            info
+        | _ ->
+            failwith "Epmc: Lookup of node failed!"
+
+
+(* Visible API *)
+
+let create ?(epmdName="localhost") ?(epmdPort=4369) () = {
+    epmd = make_host epmdName epmdPort;
+    conn = None;
+    node = None;
+}
+
+let connect epmc nodeName nodePort =
+    _close_connection epmc.conn;
+    let conn = _open_connection epmc.epmd in
+    let node = make_host nodeName nodePort in
+    epmc.node <- Some node;
+    epmc.conn <- Some conn;
+    _register conn node
+
+let disconnect epmc =
+    _close_connection epmc.conn;
+    epmc.node <- None;
+    epmc.conn <- None
+
+let node_port epmc name =
+    let conn = _open_connection epmc.epmd in
+    let res = match _node_info conn name with
+        | Some (port, _, _, _, _, _) -> Some port
+        | _ -> None
+    in
+    _close_connection (Some conn);
+    res

lib/ocamerl/ocamerl/epmc.mli

+(** EPMD protocol/transport. *)
+
+type t
+
+(** Create a EPMD client. *)
+val create : ?epmdName:string -> ?epmdPort:int -> unit -> t
+
+(** Connect client to EPMD server and register node name and port; return PID creation identifier. *)
+val connect : t -> string -> int -> int option
+
+(** Disonnect client from EPMD server (thus unregister the node). *)
+val disconnect : t -> unit
+
+(** Query EPMD to get port of registered node. *)
+val node_port: t -> string -> int option

lib/ocamerl/ocamerl/eterm.ml

+let magic_version      = '\131'
+
+let magic_small_int     = '\097'
+let magic_large_int     = '\098'
+let magic_float         = '\099'
+let magic_atom_or_bool  = '\100'
+let magic_pid           = '\103'
+let magic_small_tuple   = '\104'
+let magic_large_tuple   = '\105'
+let magic_nil           = '\106'
+let magic_string        = '\107'
+let magic_list          = '\108'
+let magic_binary        = '\109'
+let magic_new_reference = '\114'
+
+
+(* Mapping of Erlang term to Ocaml type *)
+
+type t =
+    | ET_int    of Int32.t
+    | ET_float  of float
+    | ET_atom   of string
+    | ET_bool   of bool
+    | ET_string of string
+    | ET_bin    of char array
+    | ET_tuple  of t array
+    | ET_list   of t list
+    | ET_improper_list of t list * t
+    | ET_pid    of e_pid
+    | ET_ref    of e_ref
+and e_pid = 
+        string (* node name *)
+        * int  (* pid number *)
+        * int  (* serial number *)
+        * int  (* node creation ID *)
+and e_ref =
+          string
+        * Int32.t list
+        * int          (* creation *)
+
+
+(* Specialized ETerm manipulation *)
+
+let et_pid_node_name pid = match pid with
+    | ET_pid (name, _, _, _) -> name
+    | _ -> failwith "ETerm.et_pid_node_name: eterm is not a PID"
+
+let e_pid_to_string pid = match pid with
+    (nodeName, pidNum, pidSerial, nodeCreation) ->
+        Printf.sprintf
+            "PID(%s, %i, %i, %i)"
+            nodeName
+            pidNum
+            pidSerial
+            nodeCreation
+
+
+(* ETerm API *)
+
+let rec to_string t = match t with
+    | ET_int n   -> Int32.to_string n
+    | ET_float n -> string_of_float n
+    | ET_atom a  -> a
+    | ET_bool b  -> string_of_bool b
+    | ET_bin arr ->
+        Array.fold_left (fun s e -> s ^ (String.make 1 e) ^ ",") "<<" arr ^ ">>"
+    | ET_tuple arr ->
+        Array.fold_left (fun s e -> s ^ (to_string e) ^ ",") "{" arr ^ "}"
+    | ET_string s -> "\"" ^ s ^ "\""
+    | ET_list s ->
+        (List.fold_left (fun acc e -> acc ^ (to_string e) ^ ",") "[" s) ^ "]"
+    | ET_improper_list (head, tail) ->
+        (List.fold_left (fun acc e -> acc ^ to_string e) "[" head) ^ (to_string tail) ^ "]"
+    | ET_pid pid -> e_pid_to_string pid
+    | ET_ref (node, idList, creation) ->
+        Printf.sprintf
+            "Ref(%s, %s, %i)"
+            node
+            (List.fold_left
+                (fun acc e -> Printf.sprintf "%s%lu" acc e)
+                ""
+                idList
+            )
+            creation
+
+(* TODO can probably be optimized ... at least look at buffer! *)
+let rec _to_chars t = match t with
+    | ET_int n when n < 256l ->
+        magic_small_int :: [char_of_int (Int32.to_int n)]
+    | ET_int n ->
+        magic_large_int :: (Tools.chars_of_int32 n 4)
+    | ET_float f ->
+        let s = Printf.sprintf "%.20e" f in
+        let pad = String.make (31 - (String.length s)) '\000' in
+        magic_float :: (Tools.explode s) @ (Tools.explode pad)
+    | ET_atom a  ->
+        magic_atom_or_bool
+        :: (Tools.chars_of_int (String.length a) 2)
+        @ (Tools.explode a)
+    | ET_bool b ->
+        _to_chars (ET_atom (string_of_bool b))
+    | ET_tuple arr ->
+        let acc0 = if (Array.length arr) < 256
+            then magic_small_tuple :: (char_of_int (Array.length arr)) :: []
+            else magic_large_tuple :: (Tools.chars_of_int (Array.length arr) 4)
+        in
+        Array.fold_left
+            (fun acc e -> acc @ (_to_chars e))
+            acc0
+            arr
+    | ET_string s ->
+        magic_string
+        :: (Tools.chars_of_int (String.length s) 2)
+        @ (Tools.explode s)
+    | ET_bin b ->
+        magic_binary
+        :: (Tools.chars_of_int (Array.length b) 4)
+        @ (Array.to_list b) (* TODO must be better to do than array to list to string ;) *)
+    | ET_list l ->
+        magic_list
+        :: (Tools.chars_of_int (List.length l) 4)
+        @ (List.fold_left (fun acc e -> acc @ (_to_chars e)) [] l)
+        @ [magic_nil]
+    | ET_improper_list (l, tail) ->
+        magic_list
+        :: (Tools.chars_of_int (List.length l) 4)
+        @ (List.fold_left (fun acc e -> _to_chars e) [] l)
+        @ (_to_chars tail)
+    | ET_pid (node, id, serial, creation) ->
+        magic_pid
+        :: (_to_chars (ET_atom node))
+        @  (Tools.chars_of_int id 4)
+        @  (Tools.chars_of_int serial 4)
+        @  (Tools.chars_of_int creation 1)
+    | ET_ref (node, ids, creation) ->
+        magic_new_reference
+        :: (Tools.chars_of_int (List.length ids) 2)
+        @  (_to_chars (ET_atom node))
+        @  (Tools.chars_of_int creation 1)
+        @  (List.fold_left
+            (fun acc e -> (acc @ Tools.chars_of_int32 e 4))
+            []
+            ids
+        )
+
+
+let to_chars t =
+    magic_version :: (_to_chars t)
+
+let to_binary t =
+    Tools.implode (to_chars t)
+
+
+let rec of_stream =
+    parser [< 'i ; stream >] ->
+            match i with
+                | n when n = magic_version ->
+                    (parser [< t = term >] -> t) stream
+                | _ -> failwith "magic version not recognized"
+
+and term =
+    parser [< 'magic; r = magic_parse magic >] -> r
+
+and terms n l =
+    match n with
+        | 0 -> begin parser [< >] -> List.rev l end
+        | _ -> begin parser [< t = term ; r = terms (n - 1) (t::l) >] -> r end
+
+and magic_parse tag =
+    match tag with
+        | n when n = magic_small_int     -> parse_small_int
+        | n when n = magic_large_int     -> parse_large_int
+        | n when n = magic_float         -> parse_float
+        | n when n = magic_atom_or_bool  -> parse_atom_or_bool
+        | n when n = magic_small_tuple   -> parse_small_tuple
+        | n when n = magic_large_tuple   -> parse_large_tuple
+        (* TODO reference *)
+        | n when n = magic_new_reference -> parse_new_reference
+        (* TODO port *)
+        | n when n = magic_pid      -> parse_pid
+        | n when n = magic_nil     -> parse_nil
+        | n when n = magic_string  -> parse_string
+        | n when n = magic_list    -> parse_list
+        | n when n = magic_binary    -> parse_binary
+        (* TODO small big *)
+        (* TODO large big *)
+        (* TODO new cache *)
+        (* TODO cached atom *)
+        (* TODO fun *)
+        | n -> failwith (Printf.sprintf 
+            "Eterm.of_stream: term magic tag not recognized: %c"
+            n
+        )
+
+and parse_small_int =
+    parser [< i = Tools.eint32_n 1 >] ->
+        ET_int i
+
+and parse_large_int =
+    parser [< n = Tools.eint32_n 4 >] ->
+        ET_int n
+
+and parse_float =
+    parser [< s = Tools.string_n 31 >] ->
+        ET_float (Tools.float_of_padded_string s)
+
+and parse_atom_or_bool =
+    parser [< n = Tools.eint_n 2; atom = Tools.string_n n >] ->
+        match atom with
+            | "true"  -> ET_bool true
+            | "false" -> ET_bool false
+            | str     -> ET_atom str 
+
+and parse_small_tuple =
+    parser [< n = Tools.eint_n 1; a = terms n [] >] ->
+        ET_tuple (Array.of_list a)
+
+and parse_large_tuple =
+    parser [< n = Tools.eint_n 4; a = terms n [] >] ->
+        ET_tuple (Array.of_list a)
+
+and parse_string =
+    parser [< n = Tools.eint_n 2; data = Tools.string_n n >] ->
+        ET_string data
+
+and parse_binary =
+    parser [< n = Tools.eint_n 4; data = Tools.next_n n >] ->
+        ET_bin (Array.of_list data)
+
+and parse_nil =
+    parser [< >] ->
+        ET_list []
+
+and parse_list =
+    parser [< n = Tools.eint_n 4; head = terms n []; tail = term >] ->
+        match tail with
+            | ET_list [] -> ET_list head
+            | _ -> ET_improper_list (head, tail)
+
+and parse_pid =
+    parser [<
+        t = term; (* TODO specify atom? *)
+        idBytes = Tools.next_n 4;
+        serial = Tools.eint_n 4;
+        creationBytes = Tools.next_n 1
+    >] ->
+        match t with
+            | ET_atom node ->
+                let id = Tools.int_x_of_chars 28 idBytes in
+                let creation = Tools.int_x_of_chars 2 creationBytes in
+                ET_pid (node, id, serial, creation)
+            | _ ->
+                failwith "Eterm.of_stream: bad pid construct"
+
+and parse_new_reference =
+    parser [<
+        idLen = Tools.eint_n 2;
+        t = term; (* TODO specify atom? *)
+        creationBytes = Tools.next_n 1;
+        id0Bytes = Tools.next_n 4;
+        stream
+    >] ->
+        match t with
+            | ET_atom node ->
+                let creation = Tools.int_x_of_chars 2 creationBytes in
+                let id0 = Int32.of_int (Tools.int_x_of_chars 18 id0Bytes) in
+                let ids = parse_ids (idLen - 1) [id0] stream in
+                ET_ref (node, ids, creation)
+            | _ ->
+                failwith "Eterm.of_stream: bad new reference construct"
+
+and parse_ids n acc stream =
+    match n with
+        | 0 ->
+            List.rev acc
+        | n when n > 0 ->
+            begin
+            parser [< id = Tools.eint32_n 4; s >] -> (* TODO no 18 bits mask? *)
+                parse_ids (n-1) (id :: acc) s
+            end
+            stream
+        | _ ->
+            failwith "Eterm.of_stream: bad id list construct"

lib/ocamerl/ocamerl/eterm.mli

+(** Erlang external term format. *)
+
+(** {5 Main Interface} *)
+
+(** Ocaml type corresponding to Erlang type.*)
+type t =
+    ET_int of Int32.t
+  | ET_float of float
+  | ET_atom of string
+  | ET_bool of bool
+  | ET_string of string
+  | ET_bin of char array
+  | ET_tuple of t array
+  | ET_list of t list
+  | ET_improper_list of t list * t (** Internal use only. *)
+  | ET_pid of e_pid
+  | ET_ref of e_ref
+and e_pid = string * int * int * int
+and e_ref = string * Int32.t list * int
+
+(** Return a representation of an Eterm. *)
+val to_string : t -> string
+
+(** Return the encoded binary of an Eterm. *)
+val to_binary : t -> string
+
+(** Return the encoded binary of an Eterm as byte sequence. *)
+val to_chars : t -> char list
+
+(** Parse one Eterm from binary form. *)
+val of_stream : char Stream.t -> t
+
+
+(** {5 Specialized functions} *)
+
+(** {6 PID management} *)
+
+(** Return node name part of a PID Eterm. *)
+val et_pid_node_name : t -> string
+