Commits

Arkadi Shishlov committed ba769d0 Draft

Distributed GREP

Comments (0)

Files changed (8)

+.*\.iml
+\.idea
+ebin
+_.*
+reloader.erl

class/data/test.txt

+before abc@def.com after
+abc1@def.com

class/data/test2.txt

+before newabc@def.com after
+abc1@def.com

class/src/grep.app.src

+{application, grep,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { grep, []}},
+  {env, []}
+ ]}.

class/src/grep.erl

+-module(grep).
+-behaviour(application).
+
+-export([start/2, stop/1, match_emails/1, email_match_from_dir/1, email_match_from_dir/2]).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+start(_StartType, _StartArgs) -> % application behaviour callback; both arguments are ignored
+    ok. % NOTE(review): returns bare ok, not {ok, Pid} - confirm this is accepted by the application controller
+
+stop(_State) -> % application behaviour callback; there is nothing to clean up
+    ok.
+
+%% Returns the sorted, de-duplicated list of e-mail-like substrings of Text.
+%% A candidate is a run of word characters, dots or dashes on each side of "@".
+match_emails(Text) ->
+    Options = [{capture, all, list}, global],
+    case re:run(Text, "[\\w\\.-]+@[\\w\\.-]+", Options) of
+        nomatch ->
+            [];
+        {match, Groups} ->
+            %% Every global match arrives as its own list; join them one level deep.
+            ordsets:from_list(lists:append(Groups))
+    end.
+
+%% Reads the whole file Name and returns its contents as a list of bytes.
+%% Throws {error, Name, Reason} when the file cannot be read.
+read_file(Name) ->
+    Outcome = file:read_file(Name),
+    file_contents(Name, Outcome).
+
+%% Turns the result of file:read_file/1 into the contents or a throw.
+file_contents(_Name, {ok, Bytes}) -> binary_to_list(Bytes);
+file_contents(Name, {error, Reason}) -> throw({error, Name, Reason}).
+
+%list_files(Path) ->
+%    filelib:fold_files(Path, "", true, fun(File_name, Acc)-> [File_name|Acc] end, []).
+
+
+%% Recursively walks Path, extracts the e-mail addresses from every file whose
+%% name matches the regular expression Pattern, and returns them sorted and
+%% de-duplicated.
+email_match_from_dir(Path, Pattern) ->
+    Collect = fun(File, Found) -> [match_emails(read_file(File)) | Found] end,
+    PerFile = filelib:fold_files(Path, Pattern, true, Collect, []),
+    lists:usort(lists:append(PerFile)).
+
+%% Same as email_match_from_dir/2 with an empty pattern, i.e. every file name matches.
+email_match_from_dir(Path) ->
+    email_match_from_dir(Path, "").
+
+%% Starts distribution as node test (short names) with cookie dojo, pings
+%% bootstrap@dojo, waits one second, and returns the connected nodes whose name
+%% starts with "worker".
+get_worker_nodes() ->
+    net_kernel:start([test, shortnames]),
+    erlang:set_cookie(node(), dojo),
+    net_adm:ping(bootstrap@dojo),
+    %% Give the node list a moment to fill in after the ping.
+    timer:sleep(1000),
+    [Node || Node <- nodes(), lists:prefix("worker", atom_to_list(Node))].
+
+-ifdef(TEST).
+
+-define(EMAIL, "abc@def.com").
+-define(EMAIL1, "abc1@def.com").
+-define(EMAIL2, "newabc@def.com").
+-define(TESTFILE1, "../data/test.txt").
+-define(TESTFILE2, "../data/test2.txt").
+-define(DIR, "../data").
+-define(DIR2, "/usr/share/doc/").
+
+%% A single address embedded in text is found.
+match_email_test() ->
+    Text = "before " ++ ?EMAIL ++ " after",
+    ?assertEqual([?EMAIL], match_emails(Text)).
+
+%% Two different addresses on separate lines are both found, in sorted order.
+match_emails_test() ->
+    Text = "before " ++ ?EMAIL ++ " after\n new line " ++ ?EMAIL1,
+    %% assertEqual reports both values on failure; ?assert(A == B) only reports false.
+    ?assertEqual(lists:usort([?EMAIL, ?EMAIL1]), match_emails(Text)).
+
+%% An address that occurs twice is reported once.
+match_unique_email_test() ->
+    Text = "before " ++ ?EMAIL ++ " after\n new line " ++ ?EMAIL,
+    ?assertEqual([?EMAIL], match_emails(Text)).
+
+%% read_file/1 returns the fixture file's contents as a string.
+file_read_test() ->
+    Text = read_file(?TESTFILE1),
+    %% assertEqual reports both values on failure; ?assert(A == B) only reports false.
+    ?assertEqual("before abc@def.com after\nabc1@def.com\n", Text).
+
+%% Both addresses in the first fixture file are extracted.
+email_match_from_file_test() ->
+    File = read_file(?TESTFILE1),
+    ?assertEqual(lists:usort([?EMAIL, ?EMAIL1]), match_emails(File)).
+
+%% Addresses from every file in the fixture directory are merged and de-duplicated.
+email_match_from_dir_test() ->
+    ?assertEqual(lists:usort([?EMAIL, ?EMAIL1, ?EMAIL2]), email_match_from_dir(?DIR)).
+
+
+%% Checks that get_worker_nodes/0 reports exactly the four worker nodes of the
+%% dojo/hal9011 cluster; the bootstrap nodes must be filtered out.
+%% Requires that live cluster to be reachable from the machine running the test.
+tralala_test() ->
+    Expected = lists:usort([worker@dojo, worker2@dojo, worker@hal9011, worker2@hal9011]),
+    %% assertEqual reports both node lists on failure; ?assert(A == B) only reports false.
+    ?assertEqual(Expected, lists:sort(get_worker_nodes())).
+
+-endif.

sensei/rebar.config

+{cover_enabled, true}.

sensei/src/grep.app.src

+{application, grep,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { grep, []}},
+  {env, []}
+ ]}.

sensei/src/grep.erl

+-module(grep).
+
+-behaviour(application).
+
+%% Module API
+-export([start/0, stop/0, grep/1, local_grep/1, maintain_workers/0, start_worker/0]).
+
+%% OTP application behaviour callbacks
+-export([start/2, stop/1]).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% OTP application behaviour callbacks
+
+%% Application start callback: makes sure the cluster can be contacted, then
+%% starts the process that keeps worker nodes initialised and returns its pid.
+%% Crashes with badmatch if ensure_contact/0 does not return ok.
+start(_StartType, _StartArgs) ->
+    ok = ensure_contact(),
+    %% Link at spawn time so the maintainer is never running unlinked from its starter.
+    Pid = spawn_link(fun maintain_workers/0),
+    {ok, Pid}.
+
+%% Application stop callback: there is nothing to release.
+stop(_State) ->
+    ok.
+
+% convenience API: start and stop this module's OTP application
+start() -> application:start(?MODULE). % returns the result of application:start/1 unchanged
+stop() -> application:stop(?MODULE). % returns the result of application:stop/1 unchanged
+
+%% Clustered search. Reads every file that scan_dir/1 finds under Dir on this
+%% node, sends each file's contents to the grep_worker process of a worker node
+%% (round-robin over the connected workers) and returns the sorted,
+%% de-duplicated addresses from the replies that recv/1 collects.
+%% Raises error(no_worker_nodes) when no worker node is connected; the previous
+%% code failed with badarith (division by zero) in that situation.
+grep(Dir) ->
+    Files = scan_dir(Dir),
+    case worker_nodes() of
+        [] ->
+            error(no_worker_nodes);
+        Nodes ->
+            ok = dispatch(Files, Nodes, Nodes),
+            lists:usort(lists:append(recv(length(Files))))
+    end.
+
+%% Sends one file per worker, starting over at the first worker whenever the
+%% queue of workers runs out. Nodes must be non-empty.
+dispatch([], _Queue, _Nodes) ->
+    ok;
+dispatch(Files, [], Nodes) ->
+    dispatch(Files, Nodes, Nodes);
+dispatch([FileName | Files], [Worker | Queue], Nodes) ->
+    {grep_worker, Worker} ! {self(), read_file(FileName)},
+    dispatch(Files, Queue, Nodes).
+
+%% Collects up to N worker replies and returns them as a list of address lists.
+%% The mailbox is flushed first so messages queued earlier are not counted.
+recv(N) ->
+    c:flush(),
+    recv(N, []).
+
+%% Stops after N replies, or as soon as one second passes without a reply, in
+%% which case the replies gathered so far are returned.
+recv(0, Acc) ->
+    Acc;
+recv(N, Acc) ->
+    receive
+        %% Workers answer with a list of addresses; anything else is not a reply
+        %% and must not be counted or handed to lists:append/1 by the caller.
+        Emails when is_list(Emails) -> recv(N - 1, [Emails | Acc])
+    after 1000 ->
+        Acc
+    end.
+
+%% Returns the currently connected nodes whose name starts with "worker".
+worker_nodes() ->
+    [Node || Node <- nodes(), lists:prefix("worker", atom_to_list(Node))].
+
+%% Entry point of the maintainer process: starts with no known workers.
+maintain_workers() ->
+    maintain_workers([]).
+
+%% Endless loop: once per second, initialises every connected worker node that
+%% is not yet in Known and adds it to the list. Nodes are never removed from it.
+maintain_workers(Known) ->
+    Fresh = worker_nodes() -- Known,
+    Started = init_workers(Fresh),
+    timer:sleep(1000),
+    maintain_workers(Known ++ Started).
+
+%% Initialises each node in turn and returns the values init_worker/1 produced.
+init_workers(Nodes) ->
+    [init_worker(Node) || Node <- Nodes].
+
+%% Pushes this module's object code to Node, loads it there and starts a linked
+%% worker process on Node. Returns Node.
+%% Crashes with badmatch when the object code is not available locally or when
+%% Node does not answer {module, Mod} to the load request.
+init_worker(Node) ->
+    io:fwrite("init_worker initializing ~p~n", [Node]),
+    {Mod, Bin, File} = code:get_object_code(?MODULE),
+    rpc:call(Node, code, purge, [Mod]),
+    %% Load through the remote code server rather than calling erlang:load_module/2,
+    %% and check the answer instead of spawning into a module that failed to load.
+    {module, Mod} = rpc:call(Node, code, load_binary, [Mod, File, Bin]),
+    spawn_link(Node, ?MODULE, start_worker, []),
+    io:fwrite("init_worker initialized ~p~n", [Node]),
+    Node.
+
+%% Entry point of a worker process: registers itself as grep_worker, so it can
+%% be addressed as {grep_worker, Node}, and then serves requests forever.
+start_worker() ->
+    true = register(grep_worker, self()),
+    worker().
+
+%% Serves one message, then loops.
+worker() ->
+    receive
+        Message -> handle_message(Message)
+    end,
+    worker().
+
+%% {From, Content}: reply to From with the addresses found in Content.
+%% Anything else is reported on standard output and dropped.
+handle_message({From, Content}) ->
+    From ! match_emails(Content);
+handle_message(Whatever) ->
+    io:fwrite("worker unexpectedly received ~p~n", [Whatever]).
+
+
+% ripped from Erlang and OTP in Action
+
+%% Pings the hard-coded contact node and then gives further nodes up to one
+%% second to appear. Returns ok; crashes with badmatch when ensure_contact/1
+%% reports that no contact node answered.
+ensure_contact() ->
+    %% Add bootstrap@hal9011 to this list to use a second contact node.
+    ContactNodes = [bootstrap@dojo],
+    ok = ensure_contact(ContactNodes),
+    %% wait_for_nodes/2 compares the connected-node count with its first
+    %% argument, so pass a number: a list always compares greater than an
+    %% integer, which made the wait run its full duration every time.
+    wait_for_nodes(length(ContactNodes), 1000).
+
+%% Pings every node in ContactNodes. Returns {error, no_contact_nodes_reachable}
+%% when none answers; otherwise waits up to one second for more nodes than the
+%% number that answered to be connected, and returns ok.
+ensure_contact(ContactNodes) ->
+    IsUp = fun(Node) -> net_adm:ping(Node) =:= pong end,
+    case lists:filter(IsUp, ContactNodes) of
+        [] ->
+            {error, no_contact_nodes_reachable};
+        Reachable ->
+            wait_for_nodes(length(Reachable), 1000)
+    end.
+
+%% Waits until more than MinNodes nodes are connected, checking ten times
+%% spread over WaitTime milliseconds. Always returns ok.
+wait_for_nodes(MinNodes, WaitTime) ->
+    Slices = 10,
+    wait_for_nodes(MinNodes, round(WaitTime / Slices), Slices).
+
+%% Polls the connected-node count, sleeping SliceTime milliseconds between
+%% checks, until the count exceeds MinNodes or Iterations reaches zero.
+wait_for_nodes(_MinNodes, _SliceTime, 0) ->
+    ok;
+wait_for_nodes(MinNodes, SliceTime, Iterations) ->
+    Connected = length(nodes()),
+    if
+        Connected > MinNodes ->
+            ok;
+        true ->
+            timer:sleep(SliceTime),
+            wait_for_nodes(MinNodes, SliceTime, Iterations - 1)
+    end.
+
+
+% regexp and directory walking
+
+%% Returns every e-mail-like substring of Str in order of appearance,
+%% duplicates included. A candidate is a run of word characters, dots,
+%% underscores or dashes on each side of "@".
+-spec match_emails(string()) -> [string()].
+match_emails(Str) when is_list(Str) ->
+    Found = re:run(Str, "[\\w\\._-]+@[\\w\\._-]+", [{capture, all, list}, global]),
+    flatten_matches(Found).
+
+%% Joins the per-match lists returned by re:run/3 into one flat list.
+flatten_matches(nomatch) -> [];
+flatten_matches({match, Emails}) -> lists:append(Emails).
+
+%% Recursively lists the files under Dir whose name consists only of word
+%% characters (so names containing a dot, for example, are skipped).
+scan_dir(Dir) ->
+    Prepend = fun(FileName, Acc) -> [FileName | Acc] end,
+    filelib:fold_files(Dir, "^\\w+$", true, Prepend, []).
+
+%% Reads the whole file FileName and returns its contents as a list of bytes.
+%% Throws {error, Reason} when the file cannot be read.
+read_file(FileName) ->
+    Outcome = file:read_file(FileName),
+    case Outcome of
+        {error, Reason} -> throw({error, Reason});
+        {ok, Binary} -> binary_to_list(Binary)
+    end.
+
+% local search
+%% Extracts the addresses from every file scan_dir/1 finds under Dir, on this
+%% node only, and returns them sorted and de-duplicated.
+local_grep(Dir) ->
+    PerFile = [match_emails(read_file(FileName)) || FileName <- scan_dir(Dir)],
+    lists:usort(lists:append(PerFile)).
+
+
+-ifdef(TEST).
+
+-define(EMAIL1, "azx@b.com").
+-define(EMAIL2, "d@efg.net").
+-define(TEXT, "some text " ++ ?EMAIL1 ++ " around\n" ++ ?EMAIL2 ++ " on second line").
+
+cluster_search_test() -> % integration test: runs a clustered grep and expects at least one address
+    net_kernel:start([eunit, shortnames]), % become distributed node eunit; the result is not checked
+    erlang:set_cookie(node(),dojo), % same cookie the class module uses for this cluster
+    ok = application:start(?MODULE), % fails the test here when the application cannot start
+    timer:sleep(1000), % presumably gives the maintainer time to initialise workers - TODO confirm
+    Emails = grep("/usr/share/doc/python-gi"), % NOTE(review): depends on this directory existing on the test machine
+    ?assert(length(Emails) > 0).
+
+%% Both addresses in the sample text are returned, in order of appearance.
+match_emails_test() ->
+    Emails = match_emails(?TEXT),
+    %% assertEqual reports both values on failure; ?assert(A == B) only reports false.
+    ?assertEqual([?EMAIL1, ?EMAIL2], Emails).
+
+%% Text without any address yields an empty list.
+match_noemails_test() ->
+    Emails = match_emails("test no emails"),
+    ?assertEqual([], Emails).
+
+random() -> integer_to_list(random:uniform(1000000000)). % decimal string of a pseudo-random integer in 1..1000000000; NOTE(review): the random module is deprecated in favour of rand
+
+%% Returns the directory named by the TMPDIR environment variable, or "/tmp"
+%% when that variable is not set.
+os_tmp_dir() ->
+    os_tmp_dir(os:getenv("TMPDIR")).
+
+os_tmp_dir(false) -> "/tmp";
+os_tmp_dir(Dir) -> Dir.
+
+%% Creates a randomly named directory inside os_tmp_dir() and returns its path.
+%% Throws {error, Reason} when the directory cannot be created.
+tmp_dir() ->
+    Name = lists:append([os_tmp_dir(), "/", random()]),
+    Created = file:make_dir(Name),
+    case Created of
+        {error, Reason} -> throw({error, Reason});
+        ok -> Name
+    end.
+
+%% Opens File for writing and returns the I/O device.
+%% Throws {error, Reason} when the file cannot be opened.
+tmp_file(File) ->
+    case file:open(File, [write]) of
+        {ok, Dev} -> Dev;
+        {error, Reason} -> throw({error, Reason})
+    end.
+
+%% Writes Content followed by a newline to File, replacing previous contents.
+create_file(File, Content) -> create_file(File, "~s~n", [Content]).
+
+%% Writes io:fwrite(Format, Args) output to File and returns io:fwrite's result.
+%% The device is closed even when formatting fails, so no descriptor is leaked.
+create_file(File, Format, Args) ->
+    Dev = tmp_file(File),
+    try
+        io:fwrite(Dev, Format, Args)
+    after
+        file:close(Dev)
+    end.
+
+%% Creates two files with known content in a fresh temporary directory and
+%% checks that scan_dir/1 finds both and local_grep/1 returns both addresses.
+local_grep_test() ->
+    random:seed(now()),
+    Dir = tmp_dir(),
+    Name1 = Dir ++ "/" ++ random(),
+    Name2 = Dir ++ "/" ++ random(),
+    try
+        create_file(Name1, ?TEXT),
+        create_file(Name2, ?TEXT),
+
+        %% assertEqual reports both values on failure; ?assert(A == B) only reports false.
+        ?assertEqual(lists:sort([Name1, Name2]), lists:sort(scan_dir(Dir))),
+        ?assertEqual(lists:sort([?EMAIL1, ?EMAIL2]), local_grep(Dir))
+    after
+        %% Remove the fixtures even when an assertion above fails.
+        file:delete(Name1),
+        file:delete(Name2),
+        file:del_dir(Dir)
+    end.
+
+-endif.