Commits

Anonymous committed d95d876

Adding new dataflow-lite framework to riak

Comments (0)

Files changed (20)

apps/luke/Makefile

+all:
+	./rebar compile
+
+clean:
+	rm -rf tests_ebin docs
+	./rebar clean
+
+test: all
+	@mkdir -p tests_ebin
+	@cd tests;erl -make
+	@erl -noshell -boot start_sasl -pa ebin -pa tests_ebin -s luke -eval 'test_suite:test().' -s init stop
+	@rm -f ebin/test_* ebin/*_tests.erl
+
+#docs: all
+#	@mkdir -p docs
+#	@./build_docs.sh

apps/luke/ebin/luke.app

+% -*- mode: erlang -*-
+{application, luke,
+ [{description,  "Map/Reduce Framework"},
+  {vsn,          "0.1"},
+  {modules,      [luke, luke_flow, luke_flow_sup, luke_phase, luke_phase_sup, luke_phases, luke_sup]},
+  {registered,   [luke_flow_sup, luke_phase_sup, luke_sup]},
+  {applications, [kernel, stdlib, sasl]},
+  {mod, {luke, []}}]}.

Binary file added.

apps/luke/rebar.config

+{lib_dirs, [".."]}.

apps/luke/src/luke.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke).
+
+-behaviour(application).
+
+-define(DEFAULT_TIMEOUT, 60000).
+
+%% Application callbacks
+-export([start/0, start/2, stop/1, new_flow/2, new_flow/3, new_flow/4]).
+
+new_flow(FlowId, FlowDesc) ->
+    new_flow(self(), FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
+
+new_flow(Client, FlowId, FlowDesc) ->
+    new_flow(Client, FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
+
+new_flow(Client, FlowId, FlowDesc, Timeout) ->
+    luke_flow_sup:new_flow(Client, FlowId, FlowDesc, Timeout).
+
+start() ->
+    application:start(luke).
+
+start(_StartType, _StartArgs) ->
+    luke_sup:start_link().
+
+stop(_State) ->
+    ok.

apps/luke/src/luke_flow.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_flow).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/4, add_inputs/2, finish_inputs/1, collect_output/2]).
+
+%% FSM states
+-export([get_phases/1, executing/2, executing/3]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3,
+         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
+
+-record(state, {flow_id, fsms, client, timeout, results=[]}).
+
+add_inputs(FlowPid, Inputs) ->
+    gen_fsm:send_event(FlowPid, {inputs, Inputs}).
+
+finish_inputs(FlowPid) ->
+    gen_fsm:send_event(FlowPid, inputs_done).
+
+collect_output(FlowId, Timeout) ->
+    collect_output(FlowId, Timeout, []).
+
+get_phases(FlowPid) ->
+    gen_fsm:sync_send_event(FlowPid, get_phases).
+
+start_link(Client, FlowId, FlowDesc, Timeout) when is_list(FlowDesc),
+                                                   is_pid(Client) ->
+    gen_fsm:start_link(?MODULE, [Client, FlowId, FlowDesc, Timeout], []).
+
+init([Client, FlowId, FlowDesc, Timeout0]) ->
+    Timeout = erlang:trunc(Timeout0 * 1.1),
+    case start_phases(FlowDesc, Timeout) of
+        {ok, FSMs} ->
+            {ok, executing, #state{fsms=FSMs, flow_id=FlowId, timeout=Timeout, client=Client}, Timeout};
+        Error ->
+            {stop, Error}
+    end.
+
+executing({inputs, Inputs}, #state{fsms=[H|_], timeout=Timeout}=State) ->
+    luke_phases:send_inputs(H, Inputs),
+    {next_state, executing, State, Timeout};
+executing(inputs_done, #state{fsms=[H|_], timeout=Timeout}=State) ->
+    luke_phases:send_inputs_done(H),
+    {next_state, executing, State, Timeout};
+executing(timeout, #state{client=Client, flow_id=FlowId}=State) ->
+    Client ! {flow_results, FlowId, done},
+    {stop, normal, State};
+executing({results, done}, #state{client=Client, flow_id=FlowId}=State) ->
+    Client ! {flow_results, FlowId, done},
+    {stop, normal, State};
+executing({results, Result}, #state{client=Client, flow_id=FlowId, timeout=Timeout}=State) ->
+    Client ! {flow_results, FlowId, Result},
+    {next_state, executing, State, Timeout}.
+
+executing(get_phases, _From, #state{fsms=FSMs}=State) ->
+    {reply, FSMs, executing, State}.
+
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+handle_sync_event(_Event, _From, StateName, State) ->
+    {reply, ignored, StateName, State}.
+
+handle_info({'DOWN', _MRef, _Type, Pid, Reason}, StateName, #state{client=Client, fsms=FSMs, timeout=Timeout}=State) ->
+    case lists:member(Pid, FSMs) of
+        false ->
+            {next_state, StateName, State, Timeout};
+        true ->
+            if
+                Reason =:= normal ->
+                    {next_state, StateName, State#state{fsms=lists:delete(Pid, FSMs)}, Timeout};
+                true ->
+                    Client ! {flow_error, Reason},
+                    {stop, normal, State}
+            end
+    end;
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%% Internal functions
+start_phases(FlowDesc, Timeout) ->
+    PerPhaseTimeout = erlang:trunc(Timeout / length(FlowDesc)),
+    start_phases(lists:reverse(FlowDesc), PerPhaseTimeout, []).
+
+start_phases([], _Timeout, Accum) ->
+    {ok, Accum};
+start_phases([{PhaseMod, Accumulates, Args}|T], Timeout, Accum) ->
+    NextFSM = if
+                  length(Accum) == 0 ->
+                      undefined;
+                  true ->
+                      hd(Accum)
+              end,
+    case luke_phase_sup:new_phase(PhaseMod, Accumulates, NextFSM, self(), Timeout, Args) of
+        {ok, Pid} ->
+            erlang:monitor(process, Pid),
+            start_phases(T, Timeout, [Pid|Accum]);
+        Error ->
+            Error
+    end.
+
+collect_output(FlowId, Timeout, Accum) ->
+    receive
+        {flow_results, FlowId, done} ->
+            {ok, lists:flatten(lists:reverse(Accum))};
+        {flow_results, FlowId, Results} ->
+            collect_output(FlowId, Timeout, [Results|Accum])
+    after Timeout ->
+            if
+                length(Accum) == 0 ->
+                    {error, timeout};
+                true ->
+                    {ok, lists:flatten(lists:reverse(Accum))}
+            end
+    end.

apps/luke/src/luke_flow_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_flow_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, new_flow/4]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+new_flow(Client, FlowId, FlowDesc, Timeout) ->
+    start_child(Client, FlowId, FlowDesc, Timeout).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = {simple_one_for_one, 0, 1},
+    Process = {undefined,
+               {luke_flow, start_link, []},
+               temporary, brutal_kill, worker, dynamic},
+    {ok, {SupFlags, [Process]}}.
+
+%% Internal functions
+start_child(Client, FlowId, FlowDesc, Timeout) ->
+  supervisor:start_child(?MODULE, [Client, FlowId, FlowDesc, Timeout]).

apps/luke/src/luke_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_phase).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/6, complete/0]).
+
+%% Behaviour
+-export([behaviour_info/1]).
+
+%% States
+-export([executing/2]).
+
+%% gen_fsm callbacks
+-export([init/1, handle_event/3,
+         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
+
+-record(state, {mod, modstate, accumulates, next_phase, flow, timeout, cb_timeout=false}).
+
+behaviour_info(callbacks) ->
+  [{init, 1},
+   {handle_timeout, 1},
+   {handle_input, 3},
+   {handle_input_done, 1},
+   {handle_event, 2},
+   {handle_info, 2},
+   {terminate, 2}];
+behaviour_info(_) ->
+    undefined.
+
+start_link(PhaseMod, Accumulates, NextPhase, Flow, Timeout, PhaseArgs) ->
+    gen_fsm:start_link(?MODULE, [PhaseMod, Accumulates, NextPhase, Flow, Timeout, PhaseArgs], []).
+
+complete() ->
+    gen_fsm:send_event(self(), complete).
+
+init([PhaseMod, Accumulates, NextPhase, Flow, Timeout, PhaseArgs]) ->
+    case PhaseMod:init(PhaseArgs) of
+        {ok, ModState} ->
+            erlang:monitor(process, Flow),
+            {ok, executing, #state{mod=PhaseMod, modstate=ModState, next_phase=NextPhase,
+                                   flow=Flow, accumulates=Accumulates, timeout=Timeout}, Timeout};
+        {stop, Reason} ->
+            {stop, Reason}
+    end.
+
+
+executing({inputs, Input}, #state{mod=PhaseMod, modstate=ModState, timeout=Timeout}=State) ->
+    handle_callback(PhaseMod:handle_input(Input, ModState, Timeout), State);
+executing(inputs_done, #state{mod=PhaseMod, modstate=ModState}=State) ->
+    handle_callback(PhaseMod:handle_input_done(ModState), State);
+executing(complete, #state{flow=Flow, next_phase=Next}=State) ->
+    case Next of
+        undefined ->
+            luke_phases:send_flow_complete(Flow);
+        _ ->
+            luke_phases:send_inputs_done(Next)
+    end,
+    {stop, normal, State};
+executing(timeout, #state{cb_timeout=true, mod=Mod, modstate=ModState}=State) ->
+    handle_callback(Mod:handle_timeout(ModState), State#state{cb_timeout=false});
+executing(timeout, #state{cb_timeout=false}=State) ->
+    {stop, normal, State};
+executing(Event, #state{mod=PhaseMod, modstate=ModState}=State) ->
+    handle_callback(PhaseMod:handle_event(Event, ModState), State).
+
+handle_event(_Event, StateName, #state{timeout=Timeout}=State) ->
+    {next_state, StateName, State, Timeout}.
+
+handle_sync_event(_Event, _From, StateName, #state{timeout=Timeout}=State) ->
+    {reply, ignored, StateName, State, Timeout}.
+
+handle_info({'DOWN', _MRef, _Type, Flow, _Info}, _StateName, #state{flow=Flow}=State) ->
+    {stop, normal, State};
+handle_info(timeout, executing, #state{cb_timeout=true, mod=Mod, modstate=ModState}=State) ->
+    handle_callback(Mod:handle_timeout(ModState), State#state{cb_timeout=false});
+handle_info(timeout, executing, #state{flow=Flow}=State) ->
+    luke_phases:send_flow_results(Flow, {error, timeout}),
+    luke_phases:send_flow_complete(Flow),
+    {stop, normal, State};
+handle_info(Info, _StateName, #state{mod=PhaseMod, modstate=ModState}=State) ->
+    handle_callback(PhaseMod:handle_info(Info, ModState), State).
+
+terminate(Reason, _StateName, #state{mod=PhaseMod, modstate=ModState}) ->
+    PhaseMod:terminate(Reason, ModState),
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%% Internal functions
+handle_callback({no_output, NewModState}, #state{timeout=Timeout}=State) ->
+    {next_state, executing, State#state{modstate=NewModState}, Timeout};
+handle_callback({no_output, NewModState, TempTimeout}, #state{timeout=Timeout}=State) when TempTimeout < Timeout ->
+    io:format("TempTimeout: ~p~n", [TempTimeout]),
+    {next_state, executing, State#state{modstate=NewModState, cb_timeout=true}, TempTimeout};
+handle_callback({output, Output, NewModState}, #state{timeout=Timeout}=State) ->
+    route_output(Output, State),
+    {next_state, executing, State#state{modstate=NewModState}, Timeout};
+handle_callback({output, Output, NewModState, TempTimeout}, #state{timeout=Timeout}=State) when TempTimeout < Timeout ->
+    io:format("TempTimeout: ~p~n", [TempTimeout]),
+    route_output(Output, State),
+    {next_state, executing, State#state{modstate=NewModState, cb_timeout=true}, TempTimeout};
+handle_callback({stop, Reason, NewModState}, State) ->
+    {stop, Reason, State#state{modstate=NewModState}}.
+
+route_output(Output, #state{next_phase=Next, flow=Flow, accumulates=Accumulates}) ->
+    propagate_inputs(Next, Output),
+    if
+        Accumulates =:= true ->
+            luke_phases:send_flow_results(Flow, Output);
+        true ->
+            ok
+    end.
+
+propagate_inputs(undefined, _Results) ->
+    ok;
+propagate_inputs(Next, Results) ->
+    luke_phases:send_inputs(Next, Results).

apps/luke/src/luke_phase_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_phase_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, new_phase/6]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+new_phase(PhaseMod, Accumulates, NextPhase, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
+                                                                           is_list(PhaseArgs) ->
+    start_child(PhaseMod, [Accumulates, NextPhase, Flow, Timeout, PhaseArgs]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = {simple_one_for_one, 0, 1},
+    Process = {undefined,
+               {luke_phase, start_link, []},
+               temporary, brutal_kill, worker, dynamic},
+    {ok, {SupFlags, [Process]}}.
+
+%% Internal functions
+start_child(ModName, Args) ->
+  supervisor:start_child(?MODULE, [ModName|Args]).

apps/luke/src/luke_phases.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_phases).
+
+-export([send_inputs/2, send_inputs_done/1, send_flow_complete/1]).
+-export([send_phase_complete/1, send_flow_results/2]).
+
+send_inputs(PhasePid, Inputs) ->
+    gen_fsm:send_event(PhasePid, {inputs, Inputs}).
+
+send_inputs_done(PhasePid) ->
+    gen_fsm:send_event(PhasePid, inputs_done).
+
+send_phase_complete(PhasePid) ->
+    gen_fsm:send_event(PhasePid, phase_complete).
+
+send_flow_complete(FlowPid) ->
+    gen_fsm:send_event(FlowPid, {results, done}).
+
+send_flow_results(FlowPid, Results) ->
+    gen_fsm:send_event(FlowPid, {results, Results}).

apps/luke/src/luke_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+init([]) ->
+    RestartStrategy = one_for_one,
+    MaxRestarts = 1000,
+    MaxSecondsBetweenRestarts = 3600,
+
+    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+
+    Restart = permanent,
+    Shutdown = infinity,
+    Type = supervisor,
+
+    PhaseSup = {luke_phase_sup, {luke_phase_sup, start_link, []},
+                Restart, Shutdown, Type, [luke_phase_sup]},
+    FlowSup = {luke_flow_sup, {luke_flow_sup, start_link, []},
+                Restart, Shutdown, Type, [luke_flow_sup]},
+
+    {ok, {SupFlags, [FlowSup, PhaseSup]}}.

apps/luke/tests/Emakefile

+{"*", [warn_obsolete_guard, warn_unused_import,
+       warn_shadow_vars, warn_export_vars, debug_info,
+       {i, "../include"},
+       {pa, "../ebin"},
+       {outdir, "../tests_ebin"}]}.

apps/luke/tests/async_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(async_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, State, _Timeout) ->
+    {output, Inputs, State}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {no_output, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

apps/luke/tests/lifecycle_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(lifecycle_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+setup_teardown_test_() ->
+    [fun() ->
+             %% Startup/teardown, no input
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             exit(Pid, shutdown),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/no end
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             exit(Pid, shutdown),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/finish
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             luke_flow:add_inputs(Pid, [200]),
+             luke_flow:finish_inputs(Pid),
+             exit(Pid, shutdown),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end].

apps/luke/tests/map_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(map_phase).
+
+-behaviour(luke_phase).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {workers=[], done=false}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{workers=Workers}=State, _Timeout) ->
+    Worker = worker(self(), length(Inputs)),
+    {no_output, State#state{workers=[Worker|Workers]}}.
+
+handle_input_done(#state{workers=[]}=State) ->
+    luke_phase:complete(),
+    {no_output, State};
+handle_input_done(State) ->
+    {no_output, State#state{done=true}}.
+
+handle_event({mapexec_results, Worker, Data}, #state{done=Done, workers=Workers0}=State) ->
+    Workers = lists:delete(Worker, Workers0),
+    case Done =:= true andalso length(Workers) == 0 of
+        true ->
+            luke_phase:complete();
+        false ->
+            ok
+    end,
+    {output, Data, State#state{workers=Workers}};
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+worker(Phase, Size) ->
+    spawn(fun() ->
+                  Data = generate_data(Size),
+                  gen_fsm:send_event(Phase, {mapexec_results, self(), Data}) end).
+
+generate_data(Size) ->
+    generate_data(Size, []).
+
+generate_data(0, Accum) ->
+    Accum;
+generate_data(Size, Accum) ->
+    generate_data(Size - 1, [5|Accum]).

apps/luke/tests/results_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(results_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+all_test_() ->
+    [fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             test_util:verify_results(FlowId, none),
+             exit(Pid, shutdown),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             luke_flow:finish_inputs(Pid),
+             {ok, Results} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, done),
+             ?assertMatch(["hello"], Results),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?TWO_ASYNC_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, "testing"),
+             {ok, "testing"} = test_util:verify_results(FlowId, results),
+             luke_flow:add_inputs(Pid, [100, 200]),
+             {ok, [100, 200]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?MAP_FLOW),
+             Phases = test_util:verify_phases(Pid, 1),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, none),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?MAP_FLOW),
+             Phases = test_util:verify_phases(Pid, 1),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             FlowId = make_ref(),
+             {ok, Pid} = luke:new_flow(FlowId, ?MAP_DBL_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [5,5,5,5,5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end].

apps/luke/tests/simple_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(simple_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {inputs=[]}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{inputs=Inputs0}=State, _Timeout) ->
+    {no_output, State#state{inputs=Inputs0 ++ Inputs}}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {output, State#state.inputs, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

apps/luke/tests/test_suite.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_suite).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all_test_() ->
+    [{module, lifecycle_tests},
+     {module, results_tests}].

apps/luke/tests/test_util.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_util).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([verify_phases/2, verify_results/2, assertDead/1]).
+
+verify_phases(Pid, Size) ->
+    Phases = luke_flow:get_phases(Pid),
+    ?assertEqual(Size, length(Phases)),
+    Phases.
+
+verify_results(FlowId, none) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            ok
+    end;
+verify_results(FlowId, results) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, FlowId, Results} ->
+            {ok, Results}
+    after 100 ->
+            throw({error, no_results})
+    end;
+verify_results(FlowId, done) ->
+    receive
+        {flow_results, FlowId, done} ->
+            ok;
+        {flow_results, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            throw({error, no_results})
+    end.
+
+assertDead(Pids)->
+    timer:sleep(25),
+    assertDead0(Pids).
+
+assertDead0([]) ->
+    ok;
+assertDead0([H|T]) ->
+    ?assertMatch(false, erlang:is_process_alive(H)),
+    assertDead0(T).

apps/luke/tests/tests.hrl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-define(TWO_PHASE_FLOW, [{simple_phase, false, []},
+                         {simple_phase, true, []}]).
+
+-define(TWO_ASYNC_FLOW, [{async_phase, false, []},
+                         {async_phase, true, []}]).
+
+-define(MAP_FLOW, [{map_phase, true, []}]).
+-define(MAP_DBL_FLOW, [{map_phase, true, []},
+                       {map_phase, true, []}]).
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.