Commits

Anonymous committed b193839

Ripping out two way monitors for links & trapping exits in luke_flow

Comments (0)

Files changed (4)

apps/luke/src/luke_flow.erl

     gen_fsm:start_link(?MODULE, [Client, FlowId, FlowDesc, Timeout], []).
 
 init([Client, FlowId, FlowDesc, Timeout]) ->
+    process_flag(trap_exit, true),
     case start_phases(FlowDesc, Timeout) of
         {ok, FSMs} ->
             {ok, executing, #state{fsms=FSMs, flow_id=FlowId, timeout=Timeout, client=Client}, Timeout};
 handle_sync_event(_Event, _From, StateName, State) ->
     {reply, ignored, StateName, State}.
 
-handle_info({'DOWN', _MRef, _Type, Pid, Reason}, StateName, #state{flow_id=FlowId, client=Client,
-                                                                   fsms=FSMs, timeout=Timeout}=State) ->
-    if
-        Reason =:= normal ->
-            {next_state, StateName, State#state{fsms=lists:delete(Pid, FSMs)}, Timeout};
-        true ->
-            Client ! {flow_error, FlowId, Reason},
-            {stop, normal, State}
-    end;
+handle_info({'EXIT', _Pid, normal}, StateName, State) ->
+    {next_state, StateName, State};
+handle_info({'EXIT', _Pid, Reason}, _StateName, #state{flow_id=FlowId, client=Client}=State) ->
+    Client ! {flow_error, FlowId, Reason},
+    {stop, normal, State};
 handle_info(_Info, StateName, State) ->
     {next_state, StateName, State}.
 
         undefined ->
             case luke_phase_sup:new_phase(PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
                 {ok, Pid} ->
-                    erlang:monitor(process, Pid),
+                    erlang:link(Pid),
                     start_phases(T, Timeout, [Pid|Accum]);
                 Error ->
                     Error

apps/luke/src/luke_phase.erl

 init([PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
     case PhaseMod:init(PhaseArgs) of
         {ok, ModState} ->
-            erlang:monitor(process, Flow),
             Accumulate = lists:member(accumulate, Behaviors),
             Converge = lists:member(converge, Behaviors),
             {ok, executing, #state{mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
 
 handle_sync_event(_Event, _From, StateName, #state{timeout=Timeout}=State) ->
     {reply, ignored, StateName, State, Timeout}.
-
-handle_info({'DOWN', _MRef, _Type, Flow, _Info}, _StateName, #state{flow=Flow}=State) ->
-    {stop, normal, State};
 handle_info(timeout, executing, #state{cb_timeout=true, mod=Mod, modstate=ModState}=State) ->
     handle_callback(Mod:handle_timeout(ModState), State#state{cb_timeout=false});
 handle_info(timeout, executing, #state{flow=Flow}=State) ->

apps/luke/tests/lifecycle_tests.erl

              %% Startup/teardown, no input
              {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
              Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, shutdown),
+             exit(Pid, kill),
              timer:sleep(10),
              test_util:assertDead([Pid|Phases]) end,
      fun() ->
              %% Startup/teardown, infinity timeout, no input
              {ok, Pid} = luke:new_flow(self(), make_ref(), ?TWO_PHASE_FLOW, infinity),
              Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, shutdown),
+             exit(Pid, kill),
              timer:sleep(10),
              test_util:assertDead([Pid|Phases]) end,
      fun() ->
              {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
              Phases = test_util:verify_phases(Pid, 2),
              luke_flow:add_inputs(Pid, [100]),
-             exit(Pid, shutdown),
+             exit(Pid, kill),
              timer:sleep(10),
              test_util:assertDead([Pid|Phases]) end,
      fun() ->
              luke_flow:add_inputs(Pid, [100]),
              luke_flow:add_inputs(Pid, [200]),
              luke_flow:finish_inputs(Pid),
-             exit(Pid, shutdown),
+             exit(Pid, kill),
              timer:sleep(10),
              test_util:assertDead([Pid|Phases]) end].

apps/luke/tests/results_tests.erl

              {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
              luke_flow:add_inputs(Pid, ["hello"]),
              test_util:verify_results(FlowId, none),
-             exit(Pid, shutdown),
+             exit(Pid, kill),
              test_util:assertDead([Pid|Phases]) end,
      fun() ->
              {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),