Commits

Anonymous committed 2927b76

Getting reduce phase errors to propagate up the FSM chain

  • Participants
  • Parent commits 53e2b79

Comments (0)

Files changed (2)

File apps/riak/src/mapred_resource.erl

 format_error({error, Error}) when is_list(Error) ->
     mochijson2:encode({struct, Error});
 format_error(_Error) ->
-    io:format("_Error: ~p~n", [_Error]),
     mochijson2:encode({struct, [{error, map_reduce_error}]}).
 
 stream_mapred_results(RD, ReqId, State) ->

File apps/riak/src/riak_reduce_phase_fsm.erl

     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
                     coord=Coordinator,acc=Acc,reduced=[]}}.
 
-wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done,
-                               acc=Acc,fresh_input=Fresh,
-                               qterm={Lang,{reduce,FunTerm,Arg,_Acc}},
-                               coord=Coord,reduced=Reduced}) ->
-    {Res,Red} = case Fresh of
-        false ->
-            {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
-        true ->
-            try
-                NewReduced = case {Lang, FunTerm} of
-                                 {erlang, {qfun,F}} ->
-                                     F(Reduced,Arg);
-                                 {erlang, {modfun,M,F}} ->
-                                     M:F(Reduced,Arg);
-                                 {javascript, QTerm} ->
-                                     case js_reduce(QTerm, Reduced, Arg) of
-                                         {ok, Result} ->
-                                             Result;
-                                         {error, Error} ->
-                                             error_logger:error_msg("Javascript reduce error: ~p~n", [Error]),
-                                             Reduced
-                                     end
-                             end,
-                {{next_state, wait, StateData#state{reduced=NewReduced}}, NewReduced}
-            catch C:R ->
-                    Reason = {C, R, erlang:get_stacktrace()},
+wait(timeout, StateData=#state{next_fsm=NextFSM, done=Done, acc=Acc, coord=Coord}) ->
+    case perform_reduce(StateData) of
+        {HasUpdates, NewStateData} when HasUpdates =:= true;
+                                    HasUpdates =:= false ->
+            case Done of
+                false ->
+                    {next_state, wait, NewStateData};
+                true ->
                     case NextFSM of
-                        final -> nop;
-                        _ -> riak_phase_proto:die(NextFSM)
+                        final ->
+                            nop;
+                        _ ->
+                            riak_phase_proto:send_inputs(NextFSM, NewStateData#state.reduced),
+                            riak_phase_proto:done(NextFSM)
                     end,
-                    riak_phase_proto:error(Coord, Reason),
-                    {{stop,normal,StateData},Reduced}
-            end
-    end,
-    case Done of
-        false -> Res;
-        true ->
-            case NextFSM of
-                final -> nop;
-                _ ->
-                    riak_phase_proto:send_inputs(NextFSM, Red),
-                    riak_phase_proto:done(NextFSM)
-            end,
-            case Acc of
-                false -> nop;
-                true -> riak_phase_proto:phase_results(Coord, Red)
-            end,
-            riak_phase_proto:phase_done(Coord),
-            {stop,normal,StateData}
+                    case Acc of
+                        false ->
+                            nop;
+                        true ->
+                            riak_phase_proto:phase_results(Coord, NewStateData#state.reduced),
+                            {stop, normal, NewStateData}
+                    end
+            end;
+        {error, NewStateData} ->
+            #state{reduced=Reason}=NewStateData,
+            riak_phase_proto:error(Coord, Reason),
+            {stop, normal, NewStateData}
     end;
 wait(done, StateData) ->
     {next_state, wait, StateData#state{done=true}, 1};
     end,
     {stop,normal,StateData}.
 
+perform_reduce(#state{fresh_input=false}=State) ->
+    {false, State};
+perform_reduce(#state{reduced=Reduced,
+                      qterm={Lang,{reduce,FunTerm,Arg,_Acc}}}=State) ->
+    try
+        case {Lang, FunTerm} of
+            {erlang, {qfun,F}} ->
+                {true, State#state{reduced=F(Reduced,Arg)}};
+            {erlang, {modfun,M,F}} ->
+                {true, State#state{reduced=M:F(Reduced,Arg)}};
+            {javascript, QTerm} ->
+                case js_reduce(QTerm, Reduced, Arg) of
+                    {ok, Result} ->
+                        {true, State#state{reduced=Result}};
+                    Error ->
+                        {error, State#state{reduced=Error}}
+                end
+        end
+    catch C:R ->
+            Reason = {C, R, erlang:get_stacktrace()},
+            {error, State#state{reduced=Reason}}
+    end.
+
 js_reduce(QTerm, Reduced, Arg) ->
     case riak_js_manager:blocking_dispatch({QTerm, Reduced, Arg}) of
         {ok, Result} ->