Commits

jus...@basho.com  committed 7054c90

reduce periodically and at finish, not per-input-list

  • Participants
  • Parent commits 7f2e3ab

Comments (0)

Files changed (1)

File src/riak_reduce_phase_fsm.erl

 
 -export([wait/2]). 
 
--record(state, {done,qterm,next_fsm,coord,acc,reduced}).
+-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input}).
 
 start_link(_Ring,QTerm,NextFSM,Coordinator) ->
     gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
 init([QTerm,NextFSM,Coordinator]) ->
     {_,_,_,Acc} = QTerm,
     riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
-    {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,
+    {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
                     coord=Coordinator,acc=Acc,reduced=[]}}.
 
-wait(done, StateData=#state{acc=Acc,next_fsm=NextFSM,coord=Coord,
-                            reduced=Reduced}) ->
-    case NextFSM of
-        final -> nop;
-        _ -> 
-            gen_fsm:send_event(NextFSM, {input, Reduced}),
-            gen_fsm:send_event(NextFSM, done)
+wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done,
+                               acc=Acc,fresh_input=Fresh,
+                               qterm={reduce,FunTerm,Arg,_Acc},
+                               coord=Coord,reduced=Reduced}) ->
+    {Res,Red} = case Fresh of
+        false ->
+            {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
+        true ->
+            try
+                NewReduced = case FunTerm of
+                                 {qfun,F} -> F(Reduced,Arg);
+                                 {modfun,M,F} -> M:F(Reduced,Arg)
+                             end,
+                {{next_state, wait, StateData#state{reduced=NewReduced}},
+                 NewReduced}
+            catch C:R ->
+                    Reason = {C, R, erlang:get_stacktrace()},
+                    case NextFSM of
+                        final -> nop;
+                        _ -> gen_fsm:send_event(NextFSM, die)
+                    end,
+                    gen_fsm:send_event(Coord, {error, self(), Reason}),
+                    {{stop,normal,StateData},Reduced}
+            end
     end,
-    case Acc of
-        false -> nop;
-        true -> gen_fsm:send_event(Coord, {acc, {list, Reduced}})
-    end,
-    gen_fsm:send_event(Coord, {done, self()}),
-    riak_eventer:notify(riak_reduce_phase_fsm, done, self()),
-    {stop,normal,StateData};
-wait({input,Inputs}, StateData=#state{next_fsm=NextFSM,
-                                      qterm={reduce,FunTerm,Arg,_Acc},
-                                      coord=Coord,reduced=Reduced}) ->
-    NewInputs = Inputs ++ Reduced,
-    try
-        NewReduced = case FunTerm of
-            {qfun,F} -> F(NewInputs,Arg);
-            {modfun,M,F} -> M:F(NewInputs,Arg)
-        end,
-        {next_state, wait, StateData#state{reduced=NewReduced}}
-    catch C:R ->
-         Reason = {C, R, erlang:get_stacktrace()},
-         case NextFSM of
-             final -> nop;
-             _ -> gen_fsm:send_event(NextFSM, die)
-         end,
-         gen_fsm:send_event(Coord, {error, self(), Reason}),
-         {stop,normal,StateData}
+    case Done of
+        false -> Res;
+        true ->
+            case NextFSM of
+                final -> nop;
+                _ -> 
+                    gen_fsm:send_event(NextFSM, {input, Red}),
+                    gen_fsm:send_event(NextFSM, done)
+            end,
+            case Acc of
+                false -> nop;
+                true -> gen_fsm:send_event(Coord, {acc, {list, Red}})
+            end,
+            gen_fsm:send_event(Coord, {done, self()}),
+            {stop,normal,StateData}
     end;
+wait(done, StateData) ->
+    {next_state, wait, StateData#state{done=true}, 1};
+wait({input,Inputs}, StateData=#state{reduced=Reduced}) ->
+    {next_state, wait,
+     StateData#state{reduced=Inputs ++ Reduced, fresh_input=true}, 100};
 wait(die, StateData=#state{next_fsm=NextFSM}) ->
     riak_eventer:notify(riak_reduce_phase_fsm, die, die),
     case NextFSM of