Anonymous avatar Anonymous committed fba3d1f

Getting 'rebar eunit' to work

Comments (0)

Files changed (25)

 {application, luke,
  [{description,  "Map/Reduce Framework"},
   {vsn,          "0.2.0"},
-  {modules,      [luke, luke_flow, luke_flow_sup, luke_phase, luke_phase_sup, luke_phases, luke_sup]},
+  {modules,      [luke, luke_flow, luke_flow_sup, luke_phase, luke_phase_sup,
+                  luke_flow_cache, luke_phases, luke_sup]},
   {registered,   [luke_flow_sup, luke_phase_sup, luke_sup]},
   {applications, [kernel, stdlib, sasl]},
   {mod, {luke, []}}]}.
-{erl_opts, [debug_info, fail_on_warning]}.
+{erl_opts, [debug_info, fail_on_warning]}.
+
+{suite, luke_suite}.

src/luke_flow.erl

 -export([start_link/5,
          add_inputs/2,
          finish_inputs/1,
-         collect_output/2,
-         check_cache/2,
-         cache_value/3]).
+         collect_output/2]).
 
 %% FSM states
 -export([get_phases/1,
 collect_output(FlowId, Timeout) ->
     collect_output(FlowId, Timeout, dict:new()).
 
-%% @doc Cache value for the duration of the flow
-%% @spec cache_value(pid(), term(), term()) -> ok
-cache_value(_FlowPid, _Key, _Value) ->
-    ok.
-
-%% @doc Check flow cache for entry
-%% @spec check_cache(pid(), term()) -> not_found | term()
-check_cache(_FlowPid, _Key) ->
-    not_found.
-
 %% @doc Retrieve configured timeout for flow
 %% @spec get_timeout(pid()) -> integer()
 get_timeout(FlowPid) ->
 
 init([Client, FlowId, FlowDesc, FlowTransformer, Timeout]) ->
     process_flag(trap_exit, true),
+    {ok, CachePid} = luke_flow_cache:start_link(),
     Tref = case Timeout of
                infinity ->
                    undefined;
                    {ok, T} = timer:send_after(Timeout, flow_timeout),
                    T
            end,
-    case start_phases(FlowDesc, Timeout) of
+    case start_phases(FlowDesc, CachePid, Timeout) of
         {ok, FSMs} ->
             {ok, executing, #state{fsms=FSMs, flow_id=FlowId, flow_timeout=Timeout, client=Client, xformer=FlowTransformer, tref=Tref}};
         Error ->
     {ok, StateName, State}.
 
 %% Internal functions
-start_phases(FlowDesc, Timeout) ->
-    start_phases(lists:reverse(FlowDesc), length(FlowDesc) - 1, Timeout, []).
+start_phases(FlowDesc, CachePid, Timeout) ->
+    start_phases(lists:reverse(FlowDesc), length(FlowDesc) - 1, CachePid, Timeout, []).
 
-start_phases([], _Id, _Timeout, Accum) ->
+start_phases([], _Id, _CachePid, _Timeout, Accum) ->
     {ok, Accum};
-start_phases([{PhaseMod, Behaviors, Args}|T], Id, Timeout, Accum) ->
+start_phases([{PhaseMod, Behaviors, Args}|T], Id, CachePid, Timeout, Accum) ->
     NextFSM = next_fsm(Accum),
     case proplists:get_value(converge, Behaviors) of
         undefined ->
-            case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
+            case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, self(),
+                                          CachePid, Timeout, Args) of
                 {ok, Pid} ->
                     erlang:link(Pid),
-                    start_phases(T, Id - 1, Timeout, [Pid|Accum]);
+                    start_phases(T, Id - 1, CachePid, Timeout, [Pid|Accum]);
                 Error ->
                     Error
             end;
         InstanceCount ->
-            Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args, InstanceCount),
-            start_phases(T, Id - 1, Timeout, [Pids|Accum])
+            Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, self(), CachePid,
+                                           Timeout, Args, InstanceCount),
+            start_phases(T, Id - 1, CachePid, Timeout, [Pids|Accum])
     end.
 
 collect_output(FlowId, Timeout, Accum) ->
          end
  end.
 
-start_converging_phases(Id, PhaseMod, Behaviors0, NextFSM, Flow, Timeout, Args, Count) ->
+start_converging_phases(Id, PhaseMod, Behaviors0, NextFSM, Flow, CachePid,
+                        Timeout, Args, Count) ->
     Behaviors = [normalize_behavior(B) || B <- Behaviors0],
-    Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, []),
+    Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, CachePid,
+                                   Timeout, Args, Count, []),
     [Leader|_] = Pids,
     lists:foreach(fun(P) -> luke_phase:partners(P, Leader, Pids) end, Pids),
     Pids.
 
-start_converging_phases(_Id, _PhaseMod, _Behaviors, _NextFSM, _Flow, _Timeout, _Args, 0, Accum) ->
+start_converging_phases(_Id, _PhaseMod, _Behaviors, _NextFSM, _Flow, _CachePid,
+                        _Timeout, _Args, 0, Accum) ->
     Accum;
-start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, Accum) ->
-    case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args) of
+start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, CachePid,
+                        Timeout, Args, Count, Accum) ->
+    case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, Flow, CachePid,
+                                  Timeout, Args) of
         {ok, Pid} ->
             erlang:link(Pid),
-            start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count - 1, [Pid|Accum]);
+            start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, CachePid,
+                                    Timeout, Args, Count - 1, [Pid|Accum]);
         Error ->
             throw(Error)
     end.

src/luke_phase.erl

 
 -behaviour(gen_fsm).
 
+-include_lib("eunit/include/eunit.hrl").
+
 %% API
--export([start_link/7,
+-export([start_link/8,
          complete/0,
          partners/3,
          cache_value/2,
 behaviour_info(_) ->
     undefined.
 
-start_link(PhaseMod, Id, Behaviors, NextPhases, Flow, Timeout, PhaseArgs) ->
-    gen_fsm:start_link(?MODULE, [Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs], []).
+start_link(PhaseMod, Id, Behaviors, NextPhases, Flow, CachePid,
+           Timeout, PhaseArgs) ->
+    gen_fsm:start_link(?MODULE, [Id, PhaseMod, Behaviors, NextPhases, Flow, CachePid,
+                                 Timeout, PhaseArgs], []).
 
 complete() ->
     gen_fsm:send_event(self(), complete).
     gen_fsm:send_event(PhasePid, {partners, Leader, Partners}).
 
 cache_value(Key, Value) ->
-    case erlang:get(luke_flow_pid) of
+    case erlang:get(flow_cache_pid) of
         undefined ->
-            {error, no_flow_pid};
-        FlowPid ->
-            luke_flow:cache_value(FlowPid, Key, Value)
+            ok;
+        CachePid ->
+            luke_flow_cache:cache_value(CachePid, Key, Value)
     end.
 
 check_cache(Key) ->
-    case erlang:get(luke_flow_pid) of
+    case erlang:get(flow_cache_pid) of
         undefined ->
-            {error, no_flow_pid};
-        FlowPid ->
-            luke_flow:check_cache(FlowPid, Key)
+            not_found;
+        CachePid ->
+            luke_flow_cache:check_cache(CachePid, Key)
     end.
 
-init([Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
-    erlang:put(luke_flow_pid, Flow),
+init([Id, PhaseMod, Behaviors, NextPhases, Flow, CachePid, Timeout, PhaseArgs]) ->
+    erlang:put(flow_cache_pid, CachePid),
     case PhaseMod:init(PhaseArgs) of
         {ok, ModState} ->
             Accumulate = lists:member(accumulate, Behaviors),

src/luke_phase_sup.erl

 
 %% API
 -export([start_link/0,
-         new_phase/7]).
+         new_phase/8]).
 
 %% Supervisor callbacks
 -export([init/1]).
 
-new_phase(Id, PhaseMod, Behavior, NextPhases, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
-                                                                             is_list(PhaseArgs) ->
-    start_child(PhaseMod, [Id, Behavior, NextPhases, Flow, Timeout, PhaseArgs]).
+new_phase(Id, PhaseMod, Behavior, NextPhases, Flow, CachePid,
+          Timeout, PhaseArgs) when is_atom(PhaseMod),
+                                   is_list(PhaseArgs) ->
+    start_child(PhaseMod, [Id, Behavior, NextPhases, Flow, CachePid, Timeout, PhaseArgs]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+{"*", [warn_obsolete_guard, warn_unused_import,
+       warn_shadow_vars, warn_export_vars, debug_info,
+       {i, "../include"},
+       {pa, "../ebin"},
+       {outdir, "../tests_ebin"}]}.

test/async_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(async_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, State, _Timeout) ->
+    {output, Inputs, State}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {no_output, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

test/lifecycle_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(lifecycle_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+setup_teardown_test_() ->
+    [fun() ->
+             %% Startup/teardown, no input
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, infinity timeout, no input
+             {ok, Pid} = luke:new_flow(self(), make_ref(), ?TWO_PHASE_FLOW, infinity),
+             Phases = test_util:verify_phases(Pid, 2),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/no end
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/finish
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             luke_flow:add_inputs(Pid, [200]),
+             luke_flow:finish_inputs(Pid),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end].

test/luke_suite.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_suite).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all_test_() ->
+    [{module, lifecycle_tests},
+     {module, results_tests},
+     {module, cache_tests}].

test/map_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(map_phase).
+
+-behaviour(luke_phase).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {workers=[], done=false}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{workers=Workers}=State, _Timeout) ->
+    Worker = worker(self(), length(Inputs)),
+    {no_output, State#state{workers=[Worker|Workers]}}.
+
+handle_input_done(#state{workers=[]}=State) ->
+    luke_phase:complete(),
+    {no_output, State};
+handle_input_done(State) ->
+    {no_output, State#state{done=true}}.
+
+handle_event({mapexec_results, Worker, Data}, #state{done=Done, workers=Workers0}=State) ->
+    Workers = lists:delete(Worker, Workers0),
+    case Done =:= true andalso length(Workers) == 0 of
+        true ->
+            luke_phase:complete();
+        false ->
+            ok
+    end,
+    {output, Data, State#state{workers=Workers}};
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+worker(Phase, Size) ->
+    spawn(fun() ->
+                  Data = generate_data(Size),
+                  gen_fsm:send_event(Phase, {mapexec_results, self(), Data}) end).
+
+generate_data(Size) ->
+    generate_data(Size, []).
+
+generate_data(0, Accum) ->
+    Accum;
+generate_data(Size, Accum) ->
+    generate_data(Size - 1, [5|Accum]).

test/reduce_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(reduce_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {reduced=0}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{reduced=Reduced}=State, _Timeout) ->
+    {no_output, State#state{reduced=lists:sum(Inputs ++ [Reduced])}}.
+
+handle_input_done(#state{reduced=Reduced}=State) ->
+    luke_phase:complete(),
+    {output, [Reduced], State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

test/results_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(results_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+all_test_() ->
+    [fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             test_util:verify_results(FlowId, none),
+             exit(Pid, kill),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             luke_flow:finish_inputs(Pid),
+             {ok, Results} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, done),
+             ?assertMatch(["hello"], Results),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_ASYNC_FLOW),
+             luke_flow:add_inputs(Pid, "testing"),
+             {ok, "testing"} = test_util:verify_results(FlowId, results),
+             luke_flow:add_inputs(Pid, [100, 200]),
+             {ok, [100, 200]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, none),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_DBL_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [[5,5,5,5],[5,5,5,5]]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 500)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW1),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_EMPTY),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, []}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end].

test/simple_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(simple_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {inputs=[]}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{inputs=Inputs0}=State, _Timeout) ->
+    {no_output, State#state{inputs=Inputs0 ++ Inputs}}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {output, State#state.inputs, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

test/test_util.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_util).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([start_flow/1, verify_phases/2, verify_results/2, assertDead/1]).
+
+start_flow(FlowDesc) ->
+    FlowId = make_ref(),
+    {ok, Pid} = luke:new_flow(FlowId, FlowDesc),
+    Phases = test_util:verify_phases(Pid, length(FlowDesc)),
+    {FlowId, Pid, Phases}.
+
+
+verify_phases(Pid, Size) ->
+    Phases = luke_flow:get_phases(Pid),
+    ?assertEqual(Size, length(Phases)),
+    Phases.
+
+verify_results(FlowId, none) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, _PhaseId, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            ok
+    end;
+verify_results(FlowId, results) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, _PhaseId, FlowId, Results} ->
+            {ok, Results}
+    after 100 ->
+            throw({error, no_results})
+    end;
+verify_results(FlowId, done) ->
+    receive
+        {flow_results, FlowId, done} ->
+            ok;
+        {flow_results, _PhaseId, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            throw({error, no_results})
+    end.
+
+assertDead(Pids)->
+    timer:sleep(25),
+    assertDead0(Pids).
+
+assertDead0([]) ->
+    ok;
+assertDead0([H|T]) when is_list(H) ->
+    ok = assertDead0(H),
+    assertDead0(T);
+assertDead0([H|T]) when is_pid(H) ->
+    ?assertMatch(false, erlang:is_process_alive(H)),
+    assertDead0(T).
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-define(TWO_PHASE_FLOW, [{simple_phase, [], []},
+                         {simple_phase, [accumulate], []}]).
+
+-define(TWO_ASYNC_FLOW, [{async_phase, [], []},
+                         {async_phase, [accumulate], []}]).
+
+-define(MAP_FLOW, [{map_phase, [accumulate], []}]).
+
+-define(MAP_DBL_FLOW, [{map_phase, [accumulate], []},
+                       {map_phase, [accumulate], []}]).
+
+-define(MAPRED_FLOW, [{map_phase, [], []},
+                      {reduce_phase, [{converge, 3}], []},
+                      {reduce_phase, [accumulate], []}]).
+
+-define(MAPRED_FLOW1, [{map_phase, [], []},
+                       {reduce_phase, [{converge, 1}, accumulate], []}]).
+
+-define(MAPRED_EMPTY, [{map_phase, [], []},
+                       {reduce_phase, [{converge, 1}], []}]).

tests/Emakefile

-{"*", [warn_obsolete_guard, warn_unused_import,
-       warn_shadow_vars, warn_export_vars, debug_info,
-       {i, "../include"},
-       {pa, "../ebin"},
-       {outdir, "../tests_ebin"}]}.

tests/async_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(async_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, State, _Timeout) ->
-    {output, Inputs, State}.
-
-handle_input_done(State) ->
-    luke_phase:complete(),
-    {no_output, State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

tests/lifecycle_tests.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(lifecycle_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include("tests.hrl").
-
-setup_teardown_test_() ->
-    [fun() ->
-             %% Startup/teardown, no input
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, infinity timeout, no input
-             {ok, Pid} = luke:new_flow(self(), make_ref(), ?TWO_PHASE_FLOW, infinity),
-             Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, input w/no end
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             luke_flow:add_inputs(Pid, [100]),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, input w/finish
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             luke_flow:add_inputs(Pid, [100]),
-             luke_flow:add_inputs(Pid, [200]),
-             luke_flow:finish_inputs(Pid),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end].

tests/map_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(map_phase).
-
--behaviour(luke_phase).
-
--include_lib("eunit/include/eunit.hrl").
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {workers=[], done=false}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{workers=Workers}=State, _Timeout) ->
-    Worker = worker(self(), length(Inputs)),
-    {no_output, State#state{workers=[Worker|Workers]}}.
-
-handle_input_done(#state{workers=[]}=State) ->
-    luke_phase:complete(),
-    {no_output, State};
-handle_input_done(State) ->
-    {no_output, State#state{done=true}}.
-
-handle_event({mapexec_results, Worker, Data}, #state{done=Done, workers=Workers0}=State) ->
-    Workers = lists:delete(Worker, Workers0),
-    case Done =:= true andalso length(Workers) == 0 of
-        true ->
-            luke_phase:complete();
-        false ->
-            ok
-    end,
-    {output, Data, State#state{workers=Workers}};
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-worker(Phase, Size) ->
-    spawn(fun() ->
-                  Data = generate_data(Size),
-                  gen_fsm:send_event(Phase, {mapexec_results, self(), Data}) end).
-
-generate_data(Size) ->
-    generate_data(Size, []).
-
-generate_data(0, Accum) ->
-    Accum;
-generate_data(Size, Accum) ->
-    generate_data(Size - 1, [5|Accum]).

tests/reduce_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(reduce_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {reduced=0}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{reduced=Reduced}=State, _Timeout) ->
-    {no_output, State#state{reduced=lists:sum(Inputs ++ [Reduced])}}.
-
-handle_input_done(#state{reduced=Reduced}=State) ->
-    luke_phase:complete(),
-    {output, [Reduced], State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

tests/results_tests.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(results_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include("tests.hrl").
-
-all_test_() ->
-    [fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
-             luke_flow:add_inputs(Pid, ["hello"]),
-             test_util:verify_results(FlowId, none),
-             exit(Pid, kill),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
-             luke_flow:add_inputs(Pid, ["hello"]),
-             luke_flow:finish_inputs(Pid),
-             {ok, Results} = test_util:verify_results(FlowId, results),
-             test_util:verify_results(FlowId, done),
-             ?assertMatch(["hello"], Results),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?THREE_PHASE_NESTED_FLOW),
-             luke_flow:add_inputs(Pid, ["hello", "goodbye"]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [["hello", "goodbye"], ["hello", "goodbye"], ["hello", "goodbye"]]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_ASYNC_FLOW),
-             luke_flow:add_inputs(Pid, "testing"),
-             {ok, "testing"} = test_util:verify_results(FlowId, results),
-             luke_flow:add_inputs(Pid, [100, 200]),
-             {ok, [100, 200]} = test_util:verify_results(FlowId, results),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             {ok, [5,5]} = test_util:verify_results(FlowId, results),
-             test_util:verify_results(FlowId, none),
-             luke_flow:add_inputs(Pid, [a,b]),
-             {ok, [5,5]} = test_util:verify_results(FlowId, results),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             ?assertMatch({ok, [5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_DBL_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             ?assertMatch({ok, [[5,5,5,5],[5,5,5,5]]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 500)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW1),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_EMPTY),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, []}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end].

tests/simple_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(simple_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {inputs=[]}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{inputs=Inputs0}=State, _Timeout) ->
-    {no_output, State#state{inputs=Inputs0 ++ Inputs}}.
-
-handle_input_done(State) ->
-    luke_phase:complete(),
-    {output, State#state.inputs, State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

tests/test_suite.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(test_suite).
-
--include_lib("eunit/include/eunit.hrl").
-
-all_test_() ->
-    [{module, lifecycle_tests},
-     {module, results_tests}].

tests/test_util.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(test_util).
-
--include_lib("eunit/include/eunit.hrl").
-
--export([start_flow/1, verify_phases/2, verify_results/2, assertDead/1]).
-
-start_flow(FlowDesc) ->
-    FlowId = make_ref(),
-    {ok, Pid} = luke:new_flow(FlowId, FlowDesc),
-    Phases = test_util:verify_phases(Pid, length(FlowDesc)),
-    {FlowId, Pid, Phases}.
-
-
-verify_phases(Pid, Size) ->
-    Phases = luke_flow:get_phases(Pid),
-    ?assertEqual(Size, length(Phases)),
-    Phases.
-
-verify_results(FlowId, none) ->
-    receive
-        {flow_results, FlowId, done} ->
-            throw({error, unexpected_done});
-        {flow_results, _PhaseId, FlowId, Results} ->
-            throw({error, unexpected_results, Results})
-    after 100 ->
-            ok
-    end;
-verify_results(FlowId, results) ->
-    receive
-        {flow_results, FlowId, done} ->
-            throw({error, unexpected_done});
-        {flow_results, _PhaseId, FlowId, Results} ->
-            {ok, Results}
-    after 100 ->
-            throw({error, no_results})
-    end;
-verify_results(FlowId, done) ->
-    receive
-        {flow_results, FlowId, done} ->
-            ok;
-        {flow_results, _PhaseId, FlowId, Results} ->
-            throw({error, unexpected_results, Results})
-    after 100 ->
-            throw({error, no_results})
-    end.
-
-assertDead(Pids)->
-    timer:sleep(25),
-    assertDead0(Pids).
-
-assertDead0([]) ->
-    ok;
-assertDead0([H|T]) when is_list(H) ->
-    ok = assertDead0(H),
-    assertDead0(T);
-assertDead0([H|T]) when is_pid(H) ->
-    ?assertMatch(false, erlang:is_process_alive(H)),
-    assertDead0(T).

tests/tests.hrl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--define(TWO_PHASE_FLOW, [{simple_phase, [], []},
-                         {simple_phase, [accumulate], []}]).
-
--define(THREE_PHASE_NESTED_FLOW, [{nested_phase, [accumulate], []},
-                                  {simple_phase, [accumulate], []},
-                                  {simple_phase, [accumulate], []}]).
-
-
--define(TWO_ASYNC_FLOW, [{async_phase, [], []},
-                         {async_phase, [accumulate], []}]).
-
--define(MAP_FLOW, [{map_phase, [accumulate], []}]).
-
--define(MAP_DBL_FLOW, [{map_phase, [accumulate], []},
-                       {map_phase, [accumulate], []}]).
-
--define(MAPRED_FLOW, [{map_phase, [], []},
-                      {reduce_phase, [{converge, 3}], []},
-                      {reduce_phase, [accumulate], []}]).
-
--define(MAPRED_FLOW1, [{map_phase, [], []},
-                       {reduce_phase, [{converge, 1}, accumulate], []}]).
-
--define(MAPRED_EMPTY, [{map_phase, [], []},
-                       {reduce_phase, [{converge, 1}], []}]).
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.