Commits

Kevin Smith  committed 5507dff

Moving message construction and sending (via gen_fsm) into riak_phase_proto

  • Participants
  • Parent commits cab26cb

Comments (0)

Files changed (6)

File apps/js_data/src/jsd_generator.erl

         true ->
             nop
     end,
-    %M = fun(Obj, _, _) ->
-    %            Values = riak_object:get_value(Obj), [proplists:get_value(avg_sales, V) || V <- Values] end,
-    %R = fun(Values, _) ->
-    %            lists:sum(Values) / length(Values) end,
-    M = <<"function(values, key_data, arg) { return values.map(function(value) { return value[\"avg_sales\"]; });};">>,
-    R = <<"function(values, arg) { var accum = 0; values.map(function(v) { accum = accum + v;}); return accum / values.length; };">>,
-    {ok, _} = Client:mapred([{<<"customers">>, <<"customer_list">>}], [{map, {jsanon, M}, none, false},
-                                                                       {reduce, {jsanon, R}, none, true}]),
+    M = fun(Obj, _, _) ->
+                Values = riak_object:get_value(Obj), [proplists:get_value(avg_sales, V) || V <- Values] end,
+    R = fun(Values, _) ->
+                lists:sum(Values) / length(Values) end,
+    %M = <<"function(values, key_data, arg) { return values.map(function(value) { return value[\"avg_sales\"]; });};">>,
+    %R = <<"function(values, arg) { var accum = 0; values.map(function(v) { accum = accum + v;}); return accum / values.length; };">>,
+    {ok, _} = Client:mapred([{<<"customers">>, <<"customer_list">>}], [{map, {qfun, M}, none, false},
+                                                                       {reduce, {qfun, R}, none, true}]),
     stress(Client, Count - 1).
 
 

File apps/riak/ebin/riak.app

 			 riak_multi_backend,
              riak_object,
              riak_osmos_backend,
+             riak_phase_proto,
              riak_phase_sup,
              riak_put_fsm,
              riak_reduce_phase_fsm,

File apps/riak/src/riak_map_executor.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 -module(riak_map_executor).
 -behaviour(gen_fsm).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
--export([wait/2]). 
+-export([wait/2]).
 
 -record(state, {bkey,qterm,phase_pid,vnodes,keydata,ring}).
 
     end,
     case LinkFun of
         linkfun_unset ->
-            gen_fsm:send_event(PhasePid, {mapexec_error, self(),
-                            io_lib:format("linkfun unset for ~s",[Bucket])}),
+            riak_phase_proto:mapexec_error(PhasePid,
+                                            io_lib:format("linkfun unset for ~s",[Bucket])),
             {stop,no_linkfun};
         _ ->
             QTerm = case QTerm0 of
                     {vnode_map, {P,node()},
                      {self(),QTerm,BKey,KeyData}}),
     VNs.
-    
+
 wait(timeout, StateData=#state{phase_pid=PhasePid,vnodes=[]}) ->
-    gen_fsm:send_event(PhasePid, {mapexec_error, self(), "all nodes failed"}),
+    riak_phase_proto:mapexec_error(PhasePid, "all nodes failed"),
     {stop,normal,StateData};
 wait(timeout, StateData=
      #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
      1000};
 wait({mapexec_error, VN, ErrMsg},
      StateData=#state{phase_pid=PhasePid,vnodes=[]}) ->
-    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),    
-    gen_fsm:send_event(PhasePid, {mapexec_error, self(), "all nodes failed"}),
+    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),
+    riak_phase_proto:mapexec_error(PhasePid, "all nodes failed"),
     {stop,normal,StateData};
 wait({mapexec_error, VN, ErrMsg},StateData=
      #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
-    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),    
+    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),
     {next_state, wait, StateData#state{
                          vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
      1000};
 wait({mapexec_reply, RetVal, _VN}, StateData=#state{phase_pid=PhasePid}) ->
-    gen_fsm:send_event(PhasePid, {mapexec_reply, RetVal, self()}),
+    riak_phase_proto:mapexec_result(PhasePid, RetVal),
     {stop,normal,StateData}.
 
-%% @private    
+%% @private
 handle_event(_Event, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
 
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
-

File apps/riak/src/riak_map_phase_fsm.erl

     FSMs = lists:delete(MapFSM,FSMs0),
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, {input, Reply})
+        _ -> riak_phase_proto:send_inputs(NextFSM, Reply)
     end,
     case Acc of
         false -> nop;
-        true -> gen_fsm:send_event(Coord, {acc, {list, Reply}})
+        true -> riak_phase_proto:phase_results(Coord, Reply)
     end,
     case FSMs =:= [] andalso Done =:= true of
         true ->
 wait({mapexec_error, _ErrFSM, ErrMsg}, StateData=
      #state{next_fsm=NextFSM,coord=Coord}) ->
     riak_eventer:notify(riak_map_phase_fsm, error, ErrMsg),
-    gen_fsm:send_event(Coord, {error, self(), ErrMsg}),
+    riak_phase_proto:error(Coord, ErrMsg),
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, die)
+        _ -> riak_phase_proto:die(NextFSM)
     end,
     {stop,normal,StateData};
 wait(done, StateData=#state{map_fsms=FSMs}) ->
     riak_eventer:notify(riak_map_phase_fsm, map_die, die),
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, die)
+        _ -> riak_phase_proto:die(NextFSM)
     end,
     {stop,normal,StateData}.
 
 finish(StateData=#state{next_fsm=NextFSM,coord=Coord}) ->
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, done)
+        _ -> riak_phase_proto:done(NextFSM)
     end,
-    gen_fsm:send_event(Coord, {done, self()}),
+    riak_phase_proto:phase_done(Coord),
     riak_eventer:notify(riak_map_phase_fsm, map_done, done),
     {stop,normal,StateData}.
 

File apps/riak/src/riak_mapreduce_fsm.erl

      StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_got_input,
                         {ReqId, length(Inputs)}),
-    gen_fsm:send_event(hd(FSMs), {input, Inputs}),
+    riak_phase_proto:send_inputs(hd(FSMs), Inputs),
     {next_state, wait, StateData, Timeout};
 wait(input_done, StateData=#state{reqid=ReqId,fsms=FSMs}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_done_input, {ReqId}),
-    gen_fsm:send_event(hd(FSMs), done),
+    riak_phase_proto:done(hd(FSMs)),
     maybe_finish(StateData#state{input_done=true});
 wait({done,FSM}, StateData=#state{fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
 wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
-    [gen_fsm:send_event(FSM, die) || FSM <- FSMs],
+    riak_phase_proto:die_all(FSMs),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
     Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};

File apps/riak/src/riak_reduce_phase_fsm.erl

                     Reason = {C, R, erlang:get_stacktrace()},
                     case NextFSM of
                         final -> nop;
-                        _ -> gen_fsm:send_event(NextFSM, die)
+                        _ -> riak_phase_proto:die(NextFSM)
                     end,
-                    gen_fsm:send_event(Coord, {error, self(), Reason}),
+                    riak_phase_proto:error(Coord, Reason),
                     {{stop,normal,StateData},Reduced}
             end
     end,
             case NextFSM of
                 final -> nop;
                 _ ->
-                    gen_fsm:send_event(NextFSM, {input, Red}),
-                    gen_fsm:send_event(NextFSM, done)
+                    riak_phase_proto:input(NextFSM, Red),
+                    riak_phase_proto:done(NextFSM)
             end,
             case Acc of
                 false -> nop;
-                true -> gen_fsm:send_event(Coord, {acc, {list, Red}})
+                true -> riak_phase_proto:phase_results(Coord, Red)
             end,
-            gen_fsm:send_event(Coord, {done, self()}),
+            riak_phase_proto:phase_done(Coord),
             {stop,normal,StateData}
     end;
 wait(done, StateData) ->
     riak_eventer:notify(riak_reduce_phase_fsm, die, die),
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, die)
+        _ -> riak_phase_proto:die(NextFSM)
     end,
     {stop,normal,StateData}.