Commits

Anonymous committed a07a3d9

[mq]: map phase consolidation

  • Participants
  • Parent commits 5dfaf37

Comments (0)

Files changed (5)

File src/riak_map_executor.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
--module(riak_map_executor).
--behaviour(gen_fsm).
-
--export([start_link/4]).
--export([init/1, handle_event/3, handle_sync_event/4,
-         handle_info/3, terminate/3, code_change/4]).
-
--export([wait/2]). 
-
--record(state, {bkey,qterm,phase_pid,vnodes,keydata,ring}).
-
-% {link, Bucket, Tag, Acc}
-% {map, FunTerm, Arg, Acc}
-
-% where FunTerm is one of:
-% {modfun, Mod, Fun} : Mod and Fun are both atoms -> Mod:Fun(Obj,Keydata,Arg)
-% {qfun, Fun} : Fun is an actual fun -> Fun(Obj,Keydata,Arg)
-
-% all map funs (and link funs) must return a list of values,
-% but that is not enforced at this layer
-
-start_link(Ring,Input,QTerm,PhasePid) ->
-    gen_fsm:start_link(?MODULE, [Ring,Input,QTerm,PhasePid], []).
-%% @private
-init([Ring,{{Bucket,Key},KeyData},QTerm0,PhasePid]) ->
-    riak_eventer:notify(riak_map_executor, mapexec_start, start),
-    DocIdx = chash:key_of({Bucket,Key}),
-    BucketProps = riak_bucket:get_bucket(Bucket, Ring),
-    LinkFun = case QTerm0 of
-        {link,_,_,_} -> proplists:get_value(linkfun, BucketProps);
-        _ -> nop
-    end,
-    case LinkFun of
-        linkfun_unset ->
-            gen_fsm:send_event(PhasePid, {mapexec_error, self(),
-                            io_lib:format("linkfun unset for ~s",[Bucket])}),
-            {stop,no_linkfun};
-        _ ->
-            QTerm = case QTerm0 of
-                        {map, _, _, _} -> QTerm0;
-                        {link, LB, LT, LAcc} -> {map, LinkFun, {LB, LT}, LAcc}
-                    end,
-            N = proplists:get_value(n_val,BucketProps),
-            Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
-            {Targets, _} = lists:split(N, Preflist),
-            VNodes = try_vnode(QTerm, {Bucket,Key}, KeyData, Targets),
-            {ok,wait,
-             #state{bkey={Bucket,Key},qterm=QTerm,phase_pid=PhasePid,
-                    vnodes=VNodes,keydata=KeyData,ring=Ring},
-             1000}
-    end.
-
-try_vnode(QTerm, BKey, KeyData, [{P,VN}|VNs]) ->
-    riak_eventer:notify(riak_map_executor, try_vnode, {QTerm,BKey,P,VN}),
-    gen_server:cast({riak_vnode_master, VN},
-                    {vnode_map, {P,node()},
-                     {self(),QTerm,BKey,KeyData}}),
-    VNs.
-    
-wait(timeout, StateData=#state{phase_pid=PhasePid,vnodes=[]}) ->
-    gen_fsm:send_event(PhasePid, {mapexec_error, self(), "all nodes failed"}),
-    {stop,normal,StateData};
-wait(timeout, StateData=
-     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
-    {next_state, wait, StateData#state{
-                         vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
-     1000};
-wait({mapexec_error, VN, ErrMsg},
-     StateData=#state{phase_pid=PhasePid,vnodes=[]}) ->
-    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),    
-    gen_fsm:send_event(PhasePid, {mapexec_error, self(), "all nodes failed"}),
-    {stop,normal,StateData};
-wait({mapexec_error, VN, ErrMsg},StateData=
-     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
-    riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),    
-    {next_state, wait, StateData#state{
-                         vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
-     1000};
-wait({mapexec_reply, RetVal, _VN}, StateData=#state{phase_pid=PhasePid}) ->
-    gen_fsm:send_event(PhasePid, {mapexec_reply, RetVal, self()}),
-    {stop,normal,StateData}.
-
-%% @private    
-handle_event(_Event, _StateName, StateData) ->
-    {stop,badmsg,StateData}.
-
-%% @private
-handle_sync_event(_Event, _From, _StateName, StateData) ->
-    {stop,badmsg,StateData}.
-
-%% @private
-handle_info(_Info, _StateName, StateData) ->
-    {stop,badmsg,StateData}.
-
-%% @private
-terminate(Reason, _StateName, _State) ->
-    riak_eventer:notify(riak_map_executor, mapexec_end, Reason),
-    Reason.
-
-%% @private
-code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
-

File src/riak_map_phase_fsm.erl

 -module(riak_map_phase_fsm).
 -behaviour(gen_fsm).
 
--export([start_link/4]).
+-export([start_link/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]). 
 
--record(state, {done,qterm,next_fsm,coord,acc,map_fsms,ring}).
+-record(state, {done,qterm,next_qterm,next_fsm,coord,acc,local_fsms,
+                ring,pending}).
 
-start_link(Ring,QTerm,NextFSM,Coordinator) ->
-    gen_fsm:start_link(?MODULE, [Ring,QTerm,NextFSM,Coordinator], []).
+% done: whether we believe that we are no longer receiving input
+% qterm: the m/r query term for this phase
+% next_qterm: the m/r query term for the phase after this one
+% next_fsm: the pid for the phase after this one
+% coord: the pid for the mapreduce_fsm coordinating the overall request
+% acc: true/false, whether or not the client wants this phase's results
+% local_fsms: the vnode-local workers for this phase in the form
+%             [{Partition, LocalPhasePid}]
+% ring: a riak_ring structure
+% pending: items we're waiting on, each with its untried preflist entries:
+%             [{BKData,[{Partition,VNode}]}]
+
+start_link(Ring,QTerm,NextFSM,NextQTerm,Coordinator) ->
+    gen_fsm:start_link(?MODULE, [Ring,QTerm,NextFSM,NextQTerm,Coordinator], []).
 %% @private
-init([Ring,QTerm,NextFSM,Coordinator]) ->
+init([Ring,QTerm,NextFSM,NextQTerm,Coordinator]) ->
     {_,_,_,Acc} = QTerm,
     riak_eventer:notify(riak_map_phase_fsm, map_start, start),
-    {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,
-                    coord=Coordinator,acc=Acc,map_fsms=[],ring=Ring}}.
+    {ok,wait,#state{qterm=QTerm,next_fsm=NextFSM,next_qterm=NextQTerm,
+      ring=Ring,coord=Coordinator,acc=Acc,local_fsms=[],pending=[],done=false}}.
 
-wait({mapexec_reply,Reply,MapFSM}, StateData=
-     #state{done=Done,next_fsm=NextFSM,coord=Coord,acc=Acc,map_fsms=FSMs0}) ->
-    FSMs = lists:delete(MapFSM,FSMs0),
+wait({localphase_reply,Results,BKData}, StateData=#state{
+       pending=Pending0,done=Done,next_fsm=NextFSM,coord=Coord,acc=Acc}) ->
+    Pending = lists:foldl(fun(BKD,P) -> lists:keydelete(BKD,1,P) end,
+                          Pending0, BKData),
     case NextFSM of
         final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, {input, Reply})
+        _ -> gen_fsm:send_event(NextFSM, {input, Results})
     end,
     case Acc of
         false -> nop;
-        true -> gen_fsm:send_event(Coord, {acc, {list, Reply}})
+        true -> gen_fsm:send_event(Coord, {acc, {list, Results}})
     end,
-    case FSMs of
+    case Pending of
         [] -> 
             case Done of
                 true -> finish(StateData);
-                _ -> nop
+                _ -> {next_state, wait, StateData#state{pending=Pending}}
             end;
-        _ -> nop
-    end,
-    {next_state, wait, StateData#state{map_fsms=FSMs}};
-wait({mapexec_error, _ErrFSM, ErrMsg}, StateData=
-     #state{next_fsm=NextFSM,coord=Coord}) ->
-    riak_eventer:notify(riak_map_phase_fsm, error, ErrMsg),
-    gen_fsm:send_event(Coord, {error, self(), ErrMsg}),
-    case NextFSM of
-        final -> nop;
-        _ -> gen_fsm:send_event(NextFSM, die)
-    end,
-    {stop,normal,StateData};
-wait(done, StateData=#state{map_fsms=FSMs}) ->
+        _ -> {next_state, wait, StateData#state{pending=Pending}}
+    end;
+wait({mapexec_error, ErrFSM, {BKey,KeyData}}, StateData= 
+     #state{next_fsm=NextFSM,coord=Coord,pending=Pending0}) ->
+    % one input failed in its local phase: retry it on the next preflist entry
+    riak_eventer:notify(riak_map_phase_fsm, mapexec_error,
+                        {ErrFSM,BKey,KeyData}),
+    BKData = {BKey,KeyData},
+    {BKData, PrefList} = lists:keyfind(BKData, 1, Pending0),
+    case PrefList of
+        [] -> % out of local phase locations to try, time to die
+            gen_fsm:send_event(Coord, {error, self(), "too many nodes failed"}),
+            case NextFSM of
+                final -> nop;
+                _ -> gen_fsm:send_event(NextFSM, die)
+            end,
+            {stop,normal,StateData};
+        [{Partition,VNode}|RestPList] ->
+            NewSD = send_to_localphase({Partition,VNode,[BKData]},StateData),
+            Pending = lists:keyreplace(BKData, 1, Pending0, {BKData,RestPList}),
+            {next_state, wait, NewSD#state{pending=Pending}}
+    end;
+wait({localphase_finish,Partition}, StateData=#state{local_fsms=LocalFSMs}) ->
+    {next_state, wait,
+     StateData#state{local_fsms=lists:keydelete(Partition, 1, LocalFSMs)}};
+wait(done, StateData=#state{pending=Pending}) ->
     riak_eventer:notify(riak_map_phase_fsm, done_inputs, done_inputs),
-    case FSMs of
+    case Pending of
         [] -> finish(StateData);
         _ -> {next_state, wait, StateData#state{done=true}}
     end;
-wait({input,Inputs0}, StateData=#state{qterm=QTerm,map_fsms=FSMs0,ring=Ring}) ->
-    Inputs = [convert_input(I) || I <- Inputs0],
-    NewFSMs = [FSM ||
-               {ok,FSM} <- [riak_map_executor:start_link(Ring,Input,QTerm,self()) ||
-                  Input <- Inputs]],
-    FSMs = NewFSMs ++ FSMs0,
-    {next_state, wait, StateData#state{map_fsms=FSMs}};
+wait({input,Inputs}, StateData0=#state{pending=Pending,ring=Ring}) ->
+    {NewPend,GInputs} = group_inputs(Inputs,Ring),
+    StateData = lists:foldl(fun send_to_localphase/2, StateData0, GInputs),
+    {next_state, wait, StateData#state{pending=Pending ++ NewPend}};
 wait(die, StateData=#state{next_fsm=NextFSM}) ->
     % there is a very slight possibility of a 'die' message arriving
     %  at an unintended process, due to multiple die messages being sent.
     end,
     {stop,normal,StateData}.
 
-finish(StateData=#state{next_fsm=NextFSM,coord=Coord}) ->
+group_inputs(Inputs,Ring) ->
+    % 'pending' is [{BKData,[{Partition,VNode}]}]  (basically a preflist).
+    % NewPend has that same form, minus each input's first preflist entry;
+    % GInputs is [{Partition,VNode,[BKData]}], grouping inputs by that first entry.
+    {NewPend, Partitioned} = lists:unzip([{{I,tl(Targets)},{I,hd(Targets)}} ||
+        {I,Targets} <- [make_single_input(I0,Ring) || I0 <- Inputs]]),
+    % Partitioned is [{BKData,{Partition,VNode}}]
+    GInputs0 = lists:foldl(fun add_to_group/2, [], Partitioned),
+    GInputs = [{P,V,L} || {{P,V},L} <- GInputs0],
+    {NewPend,GInputs}.
+make_single_input(I0,Ring) ->
+    % given either {B,K} or {{B,K},D}, produce:
+    % { {{B,K},D}, [{Partition,VNode}]  }
+    I = convert_input(I0),
+    {{Bucket,Key},_KeyData} = I,
+    BucketProps = riak_bucket:get_bucket(Bucket, Ring),
+    N = proplists:get_value(n_val,BucketProps),
+    Preflist = riak_ring:filtered_preflist(chash:key_of({Bucket,Key}), Ring, N),
+    {Targets, _} = lists:split(N, Preflist), % Targets is [{Partition,VNode}]
+    {I,Targets}.
+add_to_group({BKD,{P,V}}, GI) ->
+    case lists:keyfind({P,V}, 1, GI) of
+        false -> [{{P,V},[BKD]}|GI];
+        {{P,V},BKD_L} -> lists:keyreplace({P,V}, 1, GI, {{P,V},[BKD|BKD_L]})
+    end.
+    
+send_to_localphase({Partition,VNode,Inputs},
+                   StateData=#state{local_fsms=LocalFSMs,
+                        qterm=QTerm,next_qterm=NextQTerm}) ->
+    {FSM, LFSMs} = 
+    case lists:keyfind(Partition, 1, LocalFSMs) of
+        {Partition, TheFSM} -> {TheFSM, LocalFSMs};
+        _ ->
+            {ok, TheFSM} = riak_map_localphase:start_link(
+                             QTerm,NextQTerm,VNode,self(),Partition),
+            {TheFSM,[{Partition,TheFSM}|LocalFSMs]}
+    end,
+    gen_fsm:send_event(FSM,{input,Inputs}),
+    StateData#state{local_fsms=LFSMs}.
+
+finish(StateData=#state{next_fsm=NextFSM,local_fsms=LocalFSMs,coord=Coord}) ->
+    [gen_fsm:send_event(LFSM, done) || {_P,LFSM} <- LocalFSMs],
     case NextFSM of
         final -> nop;
         _ -> gen_fsm:send_event(NextFSM, done)
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
+%% @private
 convert_input(I={{_B,_K},_D})
   when is_atom(_B) andalso (is_list(_K) orelse is_binary(_K)) -> I;
 convert_input(I={_B,_K})

File src/riak_mapreduce_fsm.erl

 check_query_syntax([BadQTerm|_]) -> {bad_qterm,BadQTerm}.
 
 make_phase_fsms(Query, Ring) -> 
-    make_phase_fsms(lists:reverse(Query),final,[], Ring).
-make_phase_fsms([], _NextFSM, FSMs, _Ring) -> FSMs;
-make_phase_fsms([QTerm|Rest], NextFSM, FSMs, Ring) -> 
+    make_phase_fsms(lists:reverse(Query),final,final,[], Ring).
+make_phase_fsms([], _NextFSM, _NextQTerm, FSMs, _Ring) -> FSMs;
+make_phase_fsms([QTerm|Rest], NextFSM, NextQTerm, FSMs, Ring) -> 
     PhaseMod = case QTerm of
         {reduce, _, _, _} -> riak_reduce_phase_fsm;
         {map, _, _, _} -> riak_map_phase_fsm;
         {link, _, _, _} -> riak_map_phase_fsm
     end,
-    {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
-    make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
+    {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, NextQTerm, self()),
+    make_phase_fsms(Rest,Pid,QTerm,[Pid|FSMs], Ring).
 
 wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqID,
                                   endtime=End,fsms=FSMs0}) ->

File src/riak_reduce_phase_fsm.erl

 -module(riak_reduce_phase_fsm).
 -behaviour(gen_fsm).
 
--export([start_link/4]).
+-export([start_link/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 
 -record(state, {done,qterm,next_fsm,coord,acc,reduced}).
 
-start_link(_Ring,QTerm,NextFSM,Coordinator) ->
+start_link(_Ring,QTerm,NextFSM,_NextQTerm,Coordinator) ->
     gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
 %% @private
 init([QTerm,NextFSM,Coordinator]) ->

File src/riak_vnode.erl

             end
     end.
 
+do_map(ClientPid,{link,LBucket,LTag,LAcc},
+       BKey,KeyData,Cache,Mod,ModState,VNode) ->
+    {ok, Ring} = riak_ring_manager:get_my_ring(),
+    {Bucket,_Key} = BKey,
+    BucketProps = riak_bucket:get_bucket(Bucket, Ring),
+    LinkFun = proplists:get_value(linkfun, BucketProps),
+    case LinkFun of
+        linkfun_unset ->
+            {mapexec_error, {BKey,KeyData}, self(), "no linkfun"};
+        _ ->
+            do_map(ClientPid,{map,LinkFun,{LBucket,LTag},LAcc},
+                   BKey,KeyData,Cache,Mod,ModState,VNode)
+    end;
 do_map(ClientPid,{map,FunTerm,Arg,_Acc},
        BKey,KeyData,Cache,Mod,ModState,VNode) ->
     riak_eventer:notify(riak_vnode, map_start, {FunTerm,Arg,BKey}),
              uncached_map(BKey,Mod,ModState,FunTerm,Arg,KeyData,VNode);
         CV ->
              riak_eventer:notify(riak_vnode,cached_map,{FunTerm,Arg,BKey}),
-             {mapexec_reply, CV, self()}
+             {mapexec_reply, {BKey,KeyData}, CV, self()}
     end,
     riak_eventer:notify(riak_vnode, map_reply, {FunTerm,Arg,BKey}),
     gen_fsm:send_event(ClientPid, RetVal).
             uncached_map1(V,FunTerm,Arg,BKey,KeyData,VNode);
         {error, notfound} ->
             uncached_map1({error, notfound},FunTerm,Arg,BKey,KeyData,VNode);
-        X -> {mapexec_error, self(), X}
+        X -> {mapexec_error, {BKey,KeyData}, self(), X}
     end.
 
 uncached_map1(V,FunTerm,Arg,BKey,KeyData,VNode) ->
                                  {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
                 MF_Res
         end,
-        {mapexec_reply, MapVal, self()}
+        {mapexec_reply, {BKey,KeyData}, MapVal, self()}
     catch C:R ->
          Reason = {C, R, erlang:get_stacktrace()},
-         {mapexec_error, self(), Reason}
+         {mapexec_error, {BKey,KeyData}, self(), Reason}
     end.
 
 do_merkle(RemoteVN,RemoteMerkle,