Commits

Anonymous committed e50eb5a

initial import

  • Participants

Comments (0)

Files changed (21)

+all:
+	./rebar compile
+
+clean:
+	rm -rf tests_ebin docs
+	./rebar clean
+
+test: all
+	@mkdir -p tests_ebin
+	@cd tests;erl -make
+	@erl -noshell -boot start_sasl -pa ebin -pa tests_ebin -s luke -eval 'test_suite:test().' -s init stop
+	@rm -f ebin/test_* ebin/*_tests.erl
+
+#docs: all
+#	@mkdir -p docs
+#	@./build_docs.sh

File ebin/luke.app

+% -*- mode: erlang -*-
+{application, luke,
+ [{description,  "Map/Reduce Framework"},
+  {vsn,          "0.2.0"},
+  {modules,      [luke, luke_flow, luke_flow_sup, luke_phase, luke_phase_sup, luke_phases, luke_sup]},
+  {registered,   [luke_flow_sup, luke_phase_sup, luke_sup]},
+  {applications, [kernel, stdlib, sasl]},
+  {mod, {luke, []}}]}.

File rebar

Binary file added.

File rebar.config

+{erl_opts, [debug_info, fail_on_warning]}.

File src/luke.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc The main entry point for Luke. This module is responsible
+%%      for starting Luke as an OTP application and also
+%%      running new process flows.
+
+-module(luke).
+
+-behaviour(application).
+
+-define(DEFAULT_TIMEOUT, 60000).
+
+%% Application callbacks
+-export([start/0,
+         start/2,
+         stop/1]).
+
+%% Public API
+-export([new_flow/2,
+         new_flow/3,
+         new_flow/4,
+         new_flow/5,
+         new_flow/6]).
+
+new_flow(FlowId, FlowDesc) ->
+    new_flow(self(), FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
+
+new_flow(Client, FlowId, FlowDesc) ->
+    new_flow(Client, FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
+
+new_flow(Client, FlowId, FlowDesc, Timeout) ->
+    luke_flow_sup:new_flow(Client, FlowId, FlowDesc, Timeout).
+
+new_flow(Node, Client, FlowId, FlowDesc, Timeout) ->
+    new_flow(Node, Client, FlowId, FlowDesc, undefined, Timeout).
+
+new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
+    luke_flow_sup:new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout).
+
+
+start() ->
+    application:start(luke).
+
+start(_StartType, _StartArgs) ->
+    luke_sup:start_link().
+
+stop(_State) ->
+    ok.

File src/luke_flow.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc Manages the execution of a flow
+-module(luke_flow).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/5,
+         add_inputs/2,
+         finish_inputs/1,
+         collect_output/2]).
+
+%% FSM states
+-export([get_phases/1,
+         executing/2,
+         executing/3]).
+
+%% gen_fsm callbacks
+-export([init/1,
+         handle_event/3,
+         handle_sync_event/4,
+         handle_info/3,
+         terminate/3,
+         code_change/4]).
+
+-record(state, {flow_id,
+                fsms,
+                client,
+                flow_timeout,
+                tref,
+                xformer,
+                results=[]}).
+
+%% @doc Add inputs to the flow. Inputs will be sent to the
+%%      first phase
+%% @spec add_inputs(pid(), any()) -> ok
+add_inputs(FlowPid, Inputs) ->
+    gen_fsm:send_event(FlowPid, {inputs, Inputs}).
+
+%% @doc Informs the phases all inputs are complete.
+%% @spec finish_inputs(pid()) -> ok
+finish_inputs(FlowPid) ->
+    gen_fsm:send_event(FlowPid, inputs_done).
+
+%% @doc Collects flow output. This function will block
+%%      until the flow completes or exceeds the flow_timeout.
+%% @spec collect_output(any(), integer()) -> [any()] | {error, any()}
+collect_output(FlowId, Timeout) ->
+    collect_output(FlowId, Timeout, dict:new()).
+
+%% @doc Returns the pids for each phase. Intended for
+%%      testing only
+%% @spec get_phases(pid()) -> [pid()]
+get_phases(FlowPid) ->
+    gen_fsm:sync_send_event(FlowPid, get_phases).
+
+start_link(Client, FlowId, FlowDesc, FlowTransformer, Timeout) when is_list(FlowDesc),
+                                                                    is_pid(Client) ->
+    gen_fsm:start_link(?MODULE, [Client, FlowId, FlowDesc, FlowTransformer, Timeout], []).
+
+init([Client, FlowId, FlowDesc, FlowTransformer, Timeout]) ->
+    process_flag(trap_exit, true),
+    Tref = case Timeout of
+               infinity ->
+                   undefined;
+               _ ->
+                   {ok, T} = timer:send_after(Timeout, flow_timeout),
+                   T
+           end,
+    case start_phases(FlowDesc, Timeout) of
+        {ok, FSMs} ->
+            {ok, executing, #state{fsms=FSMs, flow_id=FlowId, flow_timeout=Timeout, client=Client, xformer=FlowTransformer, tref=Tref}};
+        Error ->
+            {stop, Error}
+    end.
+
+executing({inputs, Inputs}, #state{fsms=[H|_]}=State) ->
+    luke_phases:send_inputs(H, Inputs),
+    {next_state, executing, State};
+executing(inputs_done, #state{fsms=[H|_]}=State) ->
+    luke_phases:send_inputs_done(H),
+    {next_state, executing, State};
+executing(timeout, #state{client=Client, flow_id=FlowId}=State) ->
+    Client ! {flow_results, FlowId, done},
+    {stop, normal, State};
+executing({results, done}, #state{client=Client, flow_id=FlowId}=State) ->
+    Client ! {flow_results, FlowId, done},
+    {stop, normal, State};
+executing({results, PhaseId, Result0}, #state{client=Client, flow_id=FlowId, xformer=XFormer}=State) ->
+    Result = transform_results(XFormer, Result0),
+    Client ! {flow_results, PhaseId, FlowId, Result},
+    {next_state, executing, State}.
+
+executing(get_phases, _From, #state{fsms=FSMs}=State) ->
+    {reply, FSMs, executing, State}.
+
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+handle_sync_event(_Event, _From, StateName, State) ->
+    {reply, ignored, StateName, State}.
+
+handle_info(flow_timeout, _StateName, State) ->
+    {stop, flow_timeout, State};
+handle_info({'EXIT', _Pid, normal}, StateName, State) ->
+    {next_state, StateName, State};
+handle_info({'EXIT', _Pid, Reason}, _StateName, #state{flow_id=FlowId, client=Client}=State) ->
+    Client ! {flow_error, FlowId, Reason},
+    {stop, normal, State};
+handle_info(_Info, StateName, State) ->
+    {next_state, StateName, State}.
+
+terminate(_Reason, _StateName, #state{tref=Tref}) ->
+    timer:cancel(Tref),
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%% Internal functions
+start_phases(FlowDesc, Timeout) ->
+    start_phases(lists:reverse(FlowDesc), length(FlowDesc) - 1, Timeout, []).
+
+start_phases([], _Id, _Timeout, Accum) ->
+    {ok, Accum};
+start_phases([{PhaseMod, Behaviors, Args}|T], Id, Timeout, Accum) ->
+    NextFSM = next_fsm(Accum),
+    case proplists:get_value(converge, Behaviors) of
+        undefined ->
+            case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
+                {ok, Pid} ->
+                    erlang:link(Pid),
+                    start_phases(T, Id - 1, Timeout, [Pid|Accum]);
+                Error ->
+                    Error
+            end;
+        InstanceCount ->
+            Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args, InstanceCount),
+            start_phases(T, Id - 1, Timeout, [Pids|Accum])
+    end.
+
+collect_output(FlowId, Timeout, Accum) ->
+    receive
+        {flow_results, FlowId, done} ->
+            {ok, finalize_results(Accum)};
+        {flow_results, PhaseId, FlowId, Results} ->
+            collect_output(FlowId, Timeout, accumulate_results(PhaseId, Results, Accum));
+        {flow_error, FlowId, Error} ->
+            Error
+    after Timeout ->
+            case dict:size(Accum) of
+                0 ->
+                    {error, timeout};
+                _ ->
+                    {ok, finalize_results(Accum)}
+            end
+    end.
+
+next_fsm(Accum) ->
+ if
+     length(Accum) == 0 ->
+         undefined;
+     true ->
+         case hd(Accum) of
+             P when is_pid(P) ->
+                 [P];
+             P ->
+                 P
+         end
+ end.
+
+start_converging_phases(Id, PhaseMod, Behaviors0, NextFSM, Flow, Timeout, Args, Count) ->
+    Behaviors = [normalize_behavior(B) || B <- Behaviors0],
+    Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, []),
+    [Leader|_] = Pids,
+    lists:foreach(fun(P) -> luke_phase:partners(P, Leader, Pids) end, Pids),
+    Pids.
+
+start_converging_phases(_Id, _PhaseMod, _Behaviors, _NextFSM, _Flow, _Timeout, _Args, 0, Accum) ->
+    Accum;
+start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, Accum) ->
+    case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args) of
+        {ok, Pid} ->
+            erlang:link(Pid),
+            start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count - 1, [Pid|Accum]);
+        Error ->
+            throw(Error)
+    end.
+
+normalize_behavior({converge, _}) ->
+    converge;
+normalize_behavior(Behavior) ->
+    Behavior.
+
+finalize_results(Accum) ->
+    case [lists:append(R) || {_, R} <- lists:sort(dict:to_list(Accum))] of
+        [R] ->
+            R;
+        R ->
+            R
+    end.
+
+accumulate_results(PhaseId, Results, Accum) ->
+    case dict:find(PhaseId, Accum) of
+        error ->
+            dict:store(PhaseId, [Results], Accum);
+        {ok, PhaseAccum} ->
+            dict:store(PhaseId, [Results|PhaseAccum], Accum)
+    end.
+
+transform_results(undefined, Results) ->
+    Results;
+transform_results(Xformer, Results) when is_list(Results) ->
+    [Xformer(R) || R <- Results];
+transform_results(Xformer, Results) ->
+    Xformer(Results).

File src/luke_flow_sup.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_flow_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0,
+         new_flow/4,
+         new_flow/6]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+new_flow(Client, FlowId, FlowDesc, Timeout) ->
+    start_child(Client, FlowId, FlowDesc, Timeout).
+
+new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
+    start_child(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = {simple_one_for_one, 0, 1},
+    Process = {undefined,
+               {luke_flow, start_link, []},
+               temporary, brutal_kill, worker, dynamic},
+    {ok, {SupFlags, [Process]}}.
+
+%% Internal functions
+start_child(Client, FlowId, FlowDesc, Timeout) ->
+    supervisor:start_child(?MODULE, [Client, FlowId, FlowDesc, undefined, Timeout]).
+start_child(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
+    supervisor:start_child({?MODULE, Node}, [Client, FlowId, FlowDesc, ResultTransformer, Timeout]).

File src/luke_phase.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_phase).
+
+-behaviour(gen_fsm).
+
+%% API
+-export([start_link/7,
+         complete/0,
+         partners/3]).
+
+%% Behaviour
+-export([behaviour_info/1]).
+
+%% States
+-export([executing/2]).
+
+%% gen_fsm callbacks
+-export([init/1,
+         handle_event/3,
+         handle_sync_event/4,
+         handle_info/3,
+         terminate/3,
+         code_change/4]).
+
+-record(state, {id,
+                mod,
+                modstate,
+                converge=false,
+                accumulate=false,
+                lead_partner,
+                partners,
+                next_phases,
+                done_count=1,
+                flow,
+                flow_timeout}).
+
+behaviour_info(callbacks) ->
+  [{init, 1},
+   {handle_timeout, 1},
+   {handle_input, 3},
+   {handle_input_done, 1},
+   {handle_event, 2},
+   {handle_info, 2},
+   {terminate, 2}];
+behaviour_info(_) ->
+    undefined.
+
+start_link(PhaseMod, Id, Behaviors, NextPhases, Flow, Timeout, PhaseArgs) ->
+    gen_fsm:start_link(?MODULE, [Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs], []).
+
+complete() ->
+    gen_fsm:send_event(self(), complete).
+
+partners(PhasePid, Leader, Partners) ->
+    gen_fsm:send_event(PhasePid, {partners, Leader, Partners}).
+
+init([Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
+    case PhaseMod:init(PhaseArgs) of
+        {ok, ModState} ->
+            Accumulate = lists:member(accumulate, Behaviors),
+            Converge = lists:member(converge, Behaviors),
+            {ok, executing, #state{id=Id, mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
+                                   flow=Flow, accumulate=Accumulate, converge=Converge, flow_timeout=Timeout}};
+        {stop, Reason} ->
+            {stop, Reason}
+    end.
+
+executing({partners, Lead0, Partners0}, #state{converge=true}=State) when is_list(Partners0) ->
+    Me = self(),
+    Lead = case Lead0 of
+               Me ->
+                   undefined;
+               _ ->
+                   erlang:link(Lead0),
+                   Lead0
+           end,
+    Partners = lists:delete(self(), Partners0),
+    DoneCount = if
+                    Lead =:= undefined ->
+                        length(Partners) + 1;
+                    true ->
+                        1
+                end,
+    {next_state, executing, State#state{lead_partner=Lead, partners=Partners, done_count=DoneCount}};
+executing({partners, _, _}, State) ->
+    {stop, {error, no_convergence}, State};
+executing({inputs, Input}, #state{mod=PhaseMod, modstate=ModState, flow_timeout=Timeout}=State) ->
+    handle_callback(PhaseMod:handle_input(Input, ModState, Timeout), State);
+executing(inputs_done, #state{mod=PhaseMod, modstate=ModState, done_count=DoneCount0}=State) ->
+    case DoneCount0 - 1 of
+        0 ->
+            handle_callback(PhaseMod:handle_input_done(ModState), State#state{done_count=0});
+        DoneCount ->
+            {next_state, executing, State#state{done_count=DoneCount}}
+    end;
+executing(complete, #state{lead_partner=Leader}=State) when is_pid(Leader) ->
+    luke_phases:send_inputs_done(Leader),
+    {stop, normal, State};
+executing(complete, #state{flow=Flow, next_phases=Next}=State) ->
+    case Next of
+        undefined ->
+            luke_phases:send_flow_complete(Flow);
+        _ ->
+            luke_phases:send_inputs_done(Next)
+    end,
+    {stop, normal, State};
+executing(timeout, #state{mod=Mod, modstate=ModState}=State) ->
+    handle_callback(Mod:handle_timeout(ModState), State);
+executing(timeout, State) ->
+    {stop, normal, State};
+executing(Event, #state{mod=PhaseMod, modstate=ModState}=State) ->
+    handle_callback(PhaseMod:handle_event(Event, ModState), State).
+
+handle_event(_Event, StateName, State) ->
+    {next_state, StateName, State}.
+
+handle_sync_event(_Event, _From, StateName, State) ->
+    {reply, ignored, StateName, State}.
+handle_info(timeout, executing, #state{mod=Mod, modstate=ModState}=State) ->
+    handle_callback(Mod:handle_timeout(ModState), State);
+handle_info(Info, _StateName, #state{mod=PhaseMod, modstate=ModState}=State) ->
+    handle_callback(PhaseMod:handle_info(Info, ModState), State).
+
+terminate(Reason, _StateName, #state{mod=PhaseMod, modstate=ModState}) ->
+    PhaseMod:terminate(Reason, ModState),
+    ok.
+
+code_change(_OldVsn, StateName, State, _Extra) ->
+    {ok, StateName, State}.
+
+%% Internal functions
+%% Handle callback module return values
+handle_callback({no_output, NewModState}, State) ->
+    {next_state, executing, State#state{modstate=NewModState}};
+handle_callback({no_output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
+    {next_state, executing, State#state{modstate=NewModState}, PhaseTimeout};
+handle_callback({output, Output, NewModState}, State) ->
+    State1 = route_output(Output, State),
+    {next_state, executing, State1#state{modstate=NewModState}};
+handle_callback({output, Output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
+    State1 = route_output(Output, State),
+    {next_state, executing, State1#state{modstate=NewModState}, PhaseTimeout};
+handle_callback({stop, Reason, NewModState}, State) ->
+    {stop, Reason, State#state{modstate=NewModState}};
+handle_callback(BadValue, _State) ->
+  throw({error, {bad_return, BadValue}}).
+
+%% Route output to lead when converging
+%% Accumulation is ignored for non-leads of converging phases
+%% since all accumulation is performed in the lead process
+route_output(Output, #state{converge=true, lead_partner=Lead}=State) when is_pid(Lead) ->
+    propagate_inputs([Lead], Output),
+    State;
+
+%% Send output to flow for accumulation and propagate as inputs
+%% to the next phase. Accumulation is only true for the lead
+%% process of a converging phase
+route_output(Output, #state{id=Id, converge=true, accumulate=Accumulate, lead_partner=undefined,
+                            flow=Flow, next_phases=Next}=State) ->
+    if
+        Accumulate =:= true ->
+            luke_phases:send_flow_results(Flow, Id, Output);
+        true ->
+            ok
+    end,
+    RotatedNext = propagate_inputs(Next, Output),
+    State#state{next_phases=RotatedNext};
+
+%% Route output to the next phase. Accumulate output
+%% to the flow if accumulation is turned on.
+route_output(Output, #state{id=Id, converge=false, accumulate=Accumulate, flow=Flow, next_phases=Next} = State) ->
+    if
+        Accumulate =:= true ->
+            luke_phases:send_flow_results(Flow, Id, Output);
+        true ->
+            ok
+    end,
+    RotatedNext = propagate_inputs(Next, Output),
+    State#state{next_phases=RotatedNext}.
+
+propagate_inputs(undefined, _Results) ->
+    undefined;
+propagate_inputs(Next, Results) ->
+    luke_phases:send_inputs(Next, Results).

File src/luke_phase_sup.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_phase_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0,
+         new_phase/7]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+new_phase(Id, PhaseMod, Behavior, NextPhases, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
+                                                                             is_list(PhaseArgs) ->
+    start_child(PhaseMod, [Id, Behavior, NextPhases, Flow, Timeout, PhaseArgs]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    SupFlags = {simple_one_for_one, 0, 1},
+    Process = {undefined,
+               {luke_phase, start_link, []},
+               temporary, brutal_kill, worker, dynamic},
+    {ok, {SupFlags, [Process]}}.
+
+%% Internal functions
+start_child(ModName, Args) ->
+  supervisor:start_child(?MODULE, [ModName|Args]).

File src/luke_phases.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc Encapsulates the messaging protocol used during flow processing.
+%%      Some of these functions operate on either a single
+%%      phase pid or a list of phase pids.
+%%
+%%      Multiple pids are encountered when a phase has parallel
+%%      instances running. This is triggerd by the 'converge'
+%%      phase behavior.
+-module(luke_phases).
+
+-export([send_inputs/2,
+         send_inputs_done/1,
+         send_flow_complete/1]).
+-export([send_flow_results/3]).
+
+%% @doc Sends inputs to a phase process
+%%      If a phase has multiple processes, inputs
+%%      will be distributed in a round robin fashion.
+%% @spec send_inputs(pid() | [pid()], any()) -> ok
+send_inputs(PhasePids, Inputs) when is_list(PhasePids) ->
+    [H|T] = PhasePids,
+    send_inputs(H, Inputs),
+    T ++ [H];
+send_inputs(PhasePid, Inputs) when is_pid(PhasePid) ->
+    gen_fsm:send_event(PhasePid, {inputs, Inputs}).
+
+%% @doc Signals completion of inputs to a phase
+%%      or a list of phases.
+send_inputs_done(PhasePids) when is_list(PhasePids) ->
+    lists:foreach(fun(Pid) -> send_inputs_done(Pid) end, PhasePids);
+send_inputs_done(PhasePid) when is_pid(PhasePid) ->
+    gen_fsm:send_event(PhasePid, inputs_done).
+
+%% @doc Signal completion of flow to the flow pid
+%%      This is sent by the last process of the last
+%%      phase in the flow
+%% @spec send_flow_complete(pid()) -> ok
+send_flow_complete(FlowPid) ->
+    gen_fsm:send_event(FlowPid, {results, done}).
+
+%% @doc Sends flow results to the flow pid
+%%      This is sent by phases which are configured
+%%      to accumulate their results
+%% @spec send_flow_results(pid(), any(), any()) -> ok
+send_flow_results(FlowPid, Id, Results) ->
+    gen_fsm:send_event(FlowPid, {results, Id, Results}).

File src/luke_sup.erl

+%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
+
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(luke_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+init([]) ->
+    RestartStrategy = one_for_one,
+    MaxRestarts = 1000,
+    MaxSecondsBetweenRestarts = 3600,
+
+    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+
+    Restart = permanent,
+    Shutdown = infinity,
+    Type = supervisor,
+
+    PhaseSup = {luke_phase_sup, {luke_phase_sup, start_link, []},
+                Restart, Shutdown, Type, [luke_phase_sup]},
+    FlowSup = {luke_flow_sup, {luke_flow_sup, start_link, []},
+                Restart, Shutdown, Type, [luke_flow_sup]},
+
+    {ok, {SupFlags, [FlowSup, PhaseSup]}}.

File tests/Emakefile

+{"*", [warn_obsolete_guard, warn_unused_import,
+       warn_shadow_vars, warn_export_vars, debug_info,
+       {i, "../include"},
+       {pa, "../ebin"},
+       {outdir, "../tests_ebin"}]}.

File tests/async_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(async_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, State, _Timeout) ->
+    {output, Inputs, State}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {no_output, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

File tests/lifecycle_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(lifecycle_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+setup_teardown_test_() ->
+    [fun() ->
+             %% Startup/teardown, no input
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, infinity timeout, no input
+             {ok, Pid} = luke:new_flow(self(), make_ref(), ?TWO_PHASE_FLOW, infinity),
+             Phases = test_util:verify_phases(Pid, 2),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/no end
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             %% Startup/teardown, input w/finish
+             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
+             Phases = test_util:verify_phases(Pid, 2),
+             luke_flow:add_inputs(Pid, [100]),
+             luke_flow:add_inputs(Pid, [200]),
+             luke_flow:finish_inputs(Pid),
+             exit(Pid, kill),
+             timer:sleep(10),
+             test_util:assertDead([Pid|Phases]) end].

File tests/map_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(map_phase).
+
+-behaviour(luke_phase).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {workers=[], done=false}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{workers=Workers}=State, _Timeout) ->
+    Worker = worker(self(), length(Inputs)),
+    {no_output, State#state{workers=[Worker|Workers]}}.
+
+handle_input_done(#state{workers=[]}=State) ->
+    luke_phase:complete(),
+    {no_output, State};
+handle_input_done(State) ->
+    {no_output, State#state{done=true}}.
+
+handle_event({mapexec_results, Worker, Data}, #state{done=Done, workers=Workers0}=State) ->
+    Workers = lists:delete(Worker, Workers0),
+    case Done =:= true andalso length(Workers) == 0 of
+        true ->
+            luke_phase:complete();
+        false ->
+            ok
+    end,
+    {output, Data, State#state{workers=Workers}};
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+worker(Phase, Size) ->
+    spawn(fun() ->
+                  Data = generate_data(Size),
+                  gen_fsm:send_event(Phase, {mapexec_results, self(), Data}) end).
+
+generate_data(Size) ->
+    generate_data(Size, []).
+
+generate_data(0, Accum) ->
+    Accum;
+generate_data(Size, Accum) ->
+    generate_data(Size - 1, [5|Accum]).

File tests/reduce_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(reduce_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {reduced=0}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{reduced=Reduced}=State, _Timeout) ->
+    {no_output, State#state{reduced=lists:sum(Inputs ++ [Reduced])}}.
+
+handle_input_done(#state{reduced=Reduced}=State) ->
+    luke_phase:complete(),
+    {output, [Reduced], State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

File tests/results_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(results_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("tests.hrl").
+
+all_test_() ->
+    [fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             test_util:verify_results(FlowId, none),
+             exit(Pid, kill),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
+             luke_flow:add_inputs(Pid, ["hello"]),
+             luke_flow:finish_inputs(Pid),
+             {ok, Results} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, done),
+             ?assertMatch(["hello"], Results),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?THREE_PHASE_NESTED_FLOW),
+             luke_flow:add_inputs(Pid, ["hello", "goodbye"]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [["hello", "goodbye"], ["hello", "goodbye"], ["hello", "goodbye"]]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_ASYNC_FLOW),
+             luke_flow:add_inputs(Pid, "testing"),
+             {ok, "testing"} = test_util:verify_results(FlowId, results),
+             luke_flow:add_inputs(Pid, [100, 200]),
+             {ok, [100, 200]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             test_util:verify_results(FlowId, none),
+             luke_flow:add_inputs(Pid, [a,b]),
+             {ok, [5,5]} = test_util:verify_results(FlowId, results),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_DBL_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             ?assertMatch({ok, [[5,5,5,5],[5,5,5,5]]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             luke_flow:finish_inputs(Pid),
+             test_util:verify_results(FlowId, done),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 500)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW1),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end,
+     fun() ->
+             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_EMPTY),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:add_inputs(Pid, [a,b]),
+             luke_flow:finish_inputs(Pid),
+             ?assertMatch({ok, []}, luke_flow:collect_output(FlowId, 100)),
+             test_util:verify_results(FlowId, none),
+             test_util:assertDead([Pid|Phases]) end].

File tests/simple_phase.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(simple_phase).
+
+-behaviour(luke_phase).
+
+-export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
+         handle_timeout/1, handle_info/2, terminate/2]).
+
+-record(state, {inputs=[]}).
+
+init([]) ->
+    {ok, #state{}}.
+
+handle_input(Inputs, #state{inputs=Inputs0}=State, _Timeout) ->
+    {no_output, State#state{inputs=Inputs0 ++ Inputs}}.
+
+handle_input_done(State) ->
+    luke_phase:complete(),
+    {output, State#state.inputs, State}.
+
+handle_event(_Event, State) ->
+    {no_output, State}.
+
+handle_timeout(State) ->
+    {no_output, State}.
+
+handle_info(_Info, State) ->
+    {no_output, State}.
+
+terminate(_Reason, _State) ->
+    ok.

File tests/test_suite.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_suite).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all_test_() ->
+    [{module, lifecycle_tests},
+     {module, results_tests}].

File tests/test_util.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(test_util).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([start_flow/1, verify_phases/2, verify_results/2, assertDead/1]).
+
+start_flow(FlowDesc) ->
+    FlowId = make_ref(),
+    {ok, Pid} = luke:new_flow(FlowId, FlowDesc),
+    Phases = test_util:verify_phases(Pid, length(FlowDesc)),
+    {FlowId, Pid, Phases}.
+
+
+verify_phases(Pid, Size) ->
+    Phases = luke_flow:get_phases(Pid),
+    ?assertEqual(Size, length(Phases)),
+    Phases.
+
+verify_results(FlowId, none) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, _PhaseId, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            ok
+    end;
+verify_results(FlowId, results) ->
+    receive
+        {flow_results, FlowId, done} ->
+            throw({error, unexpected_done});
+        {flow_results, _PhaseId, FlowId, Results} ->
+            {ok, Results}
+    after 100 ->
+            throw({error, no_results})
+    end;
+verify_results(FlowId, done) ->
+    receive
+        {flow_results, FlowId, done} ->
+            ok;
+        {flow_results, _PhaseId, FlowId, Results} ->
+            throw({error, unexpected_results, Results})
+    after 100 ->
+            throw({error, no_results})
+    end.
+
+assertDead(Pids)->
+    timer:sleep(25),
+    assertDead0(Pids).
+
+assertDead0([]) ->
+    ok;
+assertDead0([H|T]) when is_list(H) ->
+    ok = assertDead0(H),
+    assertDead0(T);
+assertDead0([H|T]) when is_pid(H) ->
+    ?assertMatch(false, erlang:is_process_alive(H)),
+    assertDead0(T).

File tests/tests.hrl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-define(TWO_PHASE_FLOW, [{simple_phase, [], []},
+                         {simple_phase, [accumulate], []}]).
+
+-define(THREE_PHASE_NESTED_FLOW, [{nested_phase, [accumulate], []},
+                                  {simple_phase, [accumulate], []},
+                                  {simple_phase, [accumulate], []}]).
+
+
+-define(TWO_ASYNC_FLOW, [{async_phase, [], []},
+                         {async_phase, [accumulate], []}]).
+
+-define(MAP_FLOW, [{map_phase, [accumulate], []}]).
+
+-define(MAP_DBL_FLOW, [{map_phase, [accumulate], []},
+                       {map_phase, [accumulate], []}]).
+
+-define(MAPRED_FLOW, [{map_phase, [], []},
+                      {reduce_phase, [{converge, 3}], []},
+                      {reduce_phase, [accumulate], []}]).
+
+-define(MAPRED_FLOW1, [{map_phase, [], []},
+                       {reduce_phase, [{converge, 1}, accumulate], []}]).
+
+-define(MAPRED_EMPTY, [{map_phase, [], []},
+                       {reduce_phase, [{converge, 1}], []}]).