Commits

kevsmith  committed 7ca119d

Converted adding inputs to a blocking call (for flow only);Added per-flow cache

  • Participants
  • Parent commits e50eb5a

Comments (0)

Files changed (3)

File src/luke_flow.erl

                 flow_timeout,
                 tref,
                 xformer,
+                cache=orddict:new(),
                 results=[]}).
 
 %% @doc Add inputs to the flow. Inputs will be sent to the
 %%      first phase
 %% @spec add_inputs(pid(), any()) -> ok
 add_inputs(FlowPid, Inputs) ->
-    gen_fsm:send_event(FlowPid, {inputs, Inputs}).
+    gen_fsm:sync_send_event(FlowPid, {inputs, Inputs}).
 
 %% @doc Informs the phases all inputs are complete.
 %% @spec finish_inputs(pid()) -> ok
 collect_output(FlowId, Timeout) ->
     collect_output(FlowId, Timeout, dict:new()).
 
+%% @doc Cache value for the duration of the flow
+%% @spec cache_value(pid(), term(), term()) -> ok
+cache_value(FlowPid, Key, Value) ->
+    gen_fsm:sync_send_event(FlowPid, {cache_value, Key, Value}).
+
+%% @doc Check flow cache for entry
+%% @spec check_cache(pid(), term()) -> not_found | term()
+check_cache(FlowPid, Key) ->
+    gen_fsm:sync_send_event(FlowPid, {check_cache, Key}).
+
 %% @doc Returns the pids for each phase. Intended for
 %%      testing only
 %% @spec get_phases(pid()) -> [pid()]
             {stop, Error}
     end.
 
-executing({inputs, Inputs}, #state{fsms=[H|_]}=State) ->
-    luke_phases:send_inputs(H, Inputs),
-    {next_state, executing, State};
 executing(inputs_done, #state{fsms=[H|_]}=State) ->
     luke_phases:send_inputs_done(H),
     {next_state, executing, State};
     Client ! {flow_results, PhaseId, FlowId, Result},
     {next_state, executing, State}.
 
+executing({inputs, Inputs}, _From, #state{fsms=[H|_], flow_timeout=Timeout}=State) ->
+    luke_phases:send_sync_inputs(H, Inputs, Timeout),
+    {reply, ok, executing, State};
+
+executing({cache_value, Key, Value}, _From, #state{cache=Cache0}=State) ->
+    Cache = orddict:store(Key, Value, Cache0),
+    {reply, ok, executing, State#state{cache=Cache}};
+executing({check_cache, Key}, _From, #state{cache=Cache}=State) ->
+    Reply = case orddict:is_key(Key, Cache) of
+                false ->
+                    not_found;
+                true ->
+                    oddict:fetch(Key, Cache)
+            end,
+    {reply, Reply, executing, State};
 executing(get_phases, _From, #state{fsms=FSMs}=State) ->
     {reply, FSMs, executing, State}.
 
 handle_sync_event(_Event, _From, StateName, State) ->
     {reply, ignored, StateName, State}.
 
-handle_info(flow_timeout, _StateName, State) ->
-    {stop, flow_timeout, State};
+handle_info(flow_timeout, _StateName, #state{flow_id=FlowId, client=Client}=State) ->
+    Client ! {flow_error, FlowId, {error, timeout}},
+    {stop, normal, State};
 handle_info({'EXIT', _Pid, normal}, StateName, State) ->
     {next_state, StateName, State};
 handle_info({'EXIT', _Pid, Reason}, _StateName, #state{flow_id=FlowId, client=Client}=State) ->

File src/luke_phase.erl

 %% API
 -export([start_link/7,
          complete/0,
-         partners/3]).
+         partners/3,
+         cache_value/2,
+         check_cache/1]).
 
 %% Behaviour
 -export([behaviour_info/1]).
 
 %% States
--export([executing/2]).
+-export([executing/2,
+         executing/3]).
 
 %% gen_fsm callbacks
 -export([init/1,
 partners(PhasePid, Leader, Partners) ->
     gen_fsm:send_event(PhasePid, {partners, Leader, Partners}).
 
+cache_value(Key, Value) ->
+    case erlang:get(luke_flow_pid) of
+        undefined ->
+            {error, no_flow_pid};
+        FlowPid ->
+            luke_flow:cache_value(FlowPid, Key, Value)
+    end.
+
+check_cache(Key) ->
+    case erlang:get(luke_flow_pid) of
+        undefined ->
+            {error, no_flow_pid};
+        FlowPid ->
+            luke_flow:check_cache(FlowPid, Key)
+    end.
+
 init([Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
+    erlang:put(luke_flow_pid, Flow),
     case PhaseMod:init(PhaseArgs) of
         {ok, ModState} ->
             Accumulate = lists:member(accumulate, Behaviors),
 executing({partners, _, _}, State) ->
     {stop, {error, no_convergence}, State};
 executing({inputs, Input}, #state{mod=PhaseMod, modstate=ModState, flow_timeout=Timeout}=State) ->
-    handle_callback(PhaseMod:handle_input(Input, ModState, Timeout), State);
+    handle_callback(async, PhaseMod:handle_input(Input, ModState, Timeout), State);
 executing(inputs_done, #state{mod=PhaseMod, modstate=ModState, done_count=DoneCount0}=State) ->
     case DoneCount0 - 1 of
         0 ->
-            handle_callback(PhaseMod:handle_input_done(ModState), State#state{done_count=0});
+            handle_callback(async, PhaseMod:handle_input_done(ModState), State#state{done_count=0});
         DoneCount ->
             {next_state, executing, State#state{done_count=DoneCount}}
     end;
     end,
     {stop, normal, State};
 executing(timeout, #state{mod=Mod, modstate=ModState}=State) ->
-    handle_callback(Mod:handle_timeout(ModState), State);
+    handle_callback(async, Mod:handle_timeout(ModState), State);
 executing(timeout, State) ->
     {stop, normal, State};
 executing(Event, #state{mod=PhaseMod, modstate=ModState}=State) ->
-    handle_callback(PhaseMod:handle_event(Event, ModState), State).
+    handle_callback(async, PhaseMod:handle_event(Event, ModState), State).
+
+executing({inputs, Input}, _From, #state{mod=PhaseMod, modstate=ModState, flow_timeout=Timeout}=State) ->
+    handle_callback(sync, PhaseMod:handle_input(Input, ModState, Timeout), State).
 
 handle_event(_Event, StateName, State) ->
     {next_state, StateName, State}.
 handle_sync_event(_Event, _From, StateName, State) ->
     {reply, ignored, StateName, State}.
 handle_info(timeout, executing, #state{mod=Mod, modstate=ModState}=State) ->
-    handle_callback(Mod:handle_timeout(ModState), State);
+    handle_callback(async, Mod:handle_timeout(ModState), State);
 handle_info(Info, _StateName, #state{mod=PhaseMod, modstate=ModState}=State) ->
-    handle_callback(PhaseMod:handle_info(Info, ModState), State).
+    handle_callback(async, PhaseMod:handle_info(Info, ModState), State).
 
 terminate(Reason, _StateName, #state{mod=PhaseMod, modstate=ModState}) ->
     PhaseMod:terminate(Reason, ModState),
 
 %% Internal functions
 %% Handle callback module return values
-handle_callback({no_output, NewModState}, State) ->
-    {next_state, executing, State#state{modstate=NewModState}};
-handle_callback({no_output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
-    {next_state, executing, State#state{modstate=NewModState}, PhaseTimeout};
-handle_callback({output, Output, NewModState}, State) ->
+handle_callback(Type, {no_output, NewModState}, State) ->
+    State1 = State#state{modstate=NewModState},
+    case Type of
+        async ->
+            {next_state, executing, State1};
+        sync ->
+            {reply, ok, executing, State1}
+    end;
+handle_callback(Type, {no_output, NewModState, PhaseTimeout},
+                #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
+    State1 = State#state{modstate=NewModState},
+    case Type of
+        async ->
+            {next_state, executing, State1, PhaseTimeout};
+        sync ->
+            {reply, ok, executin, State1, PhaseTimeout}
+    end;
+handle_callback(Type, {output, Output, NewModState}, State) ->
     State1 = route_output(Output, State),
-    {next_state, executing, State1#state{modstate=NewModState}};
-handle_callback({output, Output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
+    State2 = State1#state{modstate=NewModState},
+    case Type of
+        async ->
+            {next_state, executing, State2};
+        sync ->
+            {reply, ok, executing, State2}
+    end;
+handle_callback(Type, {output, Output, NewModState, PhaseTimeout},
+                #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
     State1 = route_output(Output, State),
-    {next_state, executing, State1#state{modstate=NewModState}, PhaseTimeout};
-handle_callback({stop, Reason, NewModState}, State) ->
+    State2 = State1#state{modstate=NewModState},
+    case Type of
+        async ->
+            {next_state, executing, State2, PhaseTimeout};
+        sync ->
+            {reply, ok, executing, State2, PhaseTimeout}
+    end;
+handle_callback(_Type, {stop, Reason, NewModState}, State) ->
     {stop, Reason, State#state{modstate=NewModState}};
-handle_callback(BadValue, _State) ->
+handle_callback(_Type, BadValue, _State) ->
   throw({error, {bad_return, BadValue}}).
 
 %% Route output to lead when converging

File src/luke_phases.erl

 -module(luke_phases).
 
 -export([send_inputs/2,
+         send_sync_inputs/3,
          send_inputs_done/1,
          send_flow_complete/1]).
 -export([send_flow_results/3]).
 send_inputs(PhasePid, Inputs) when is_pid(PhasePid) ->
     gen_fsm:send_event(PhasePid, {inputs, Inputs}).
 
+%% @doc Sends inputs to a phase process
+%%      NOTE: This call blocks
+%% @spec send_sync_inputs(pid() | [pid()], any(), integer()) -> ok | timeout
+send_sync_inputs(PhasePids, Inputs, Timeout) when is_list(PhasePids) ->
+    [H|T] = PhasePids,
+    send_sync_inputs(H, Inputs, Timeout),
+    T ++ [H];
+send_sync_inputs(PhasePid, Inputs, Timeout) when is_pid(PhasePid) ->
+    gen_fsm:sync_send_event(PhasePid, {inputs, Inputs}, Timeout).
+
+
 %% @doc Signals completion of inputs to a phase
 %%      or a list of phases.
 send_inputs_done(PhasePids) when is_list(PhasePids) ->