Anonymous avatar Anonymous committed b5e911c

replace riak_vnode and sidekick with single FSM

Comments (0)

Files changed (4)

src/riak_backup.erl

     [backup_vnode(VNode) ||  VNode <- VNodes].
     
 backup_vnode(_VNode = {_Index, VNodePid}) ->
-    List = gen_server2:call(VNodePid,list),
+    {ok, List} = gen_fsm:sync_send_event(VNodePid, list),
     [backup_key(VNodePid, Bucket, Key) || {Bucket, Key} <- List].
 
 backup_key(VNodePid, Bucket, Key) ->
-    {ok, B} = gen_server2:call(VNodePid, {get_binary, {Bucket, Key}}),
+    {ok, B} = gen_fsm:sync_send_event(VNodePid, {get_binary, {Bucket, Key}}),
     ok = dets:insert(?TABLE, [{{Bucket,Key}, B}]).
 
 
-
 %%% RESTORE %%%
 
 %% @doc

src/riak_vnode.erl

 %% specific language governing permissions and limitations
 %% under the License.    
 
-%% @doc gen_server process per partition-store
+-module(riak_vnode).
+-behaviour(gen_fsm).
 
--module(riak_vnode).
+-export([start/1]).
+-export([init/1, handle_event/3, handle_sync_event/4,
+         handle_info/3, terminate/3, code_change/4]).
+-export([active/2,merk_waiting/2,waiting_diffobjs/2]).
+-export([active/3,merk_waiting/3,waiting_diffobjs/3]).
 
--behaviour(gen_server2).
--export([start_link/1,start/1]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-	 terminate/2, code_change/3]).
--record(state, {idx,sidekick,mapcache,active,mod,modstate}).
+-define(TIMEOUT, 60000).
 
-%% @spec start_link(integer()) -> {ok, Pid}
-%% @doc Start the vnode with index VNodeIndex
-start_link(VNodeIndex) ->
-    gen_server2:start_link(?MODULE, [VNodeIndex], []).
+-record(state, {idx,mapcache,mod,modstate,waiting_diffobjs}).
 
-%% @spec start(integer()) -> {ok, Pid}
-%% @doc Start the vnode with index VNodeIndex
-start(VNodeIndex) ->
-    gen_server2:start(?MODULE, [VNodeIndex], []).
+start(Idx) ->
+    gen_fsm:start(?MODULE, [Idx], []).
+init([VNodeIndex]) ->
+    Mod = riak:get_app_env(storage_backend),
+    {ok, ModState} = Mod:start(VNodeIndex),
+    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState},
+    {next_state, StateName, StateData, Timeout} = hometest(StateData0),
+    {ok, StateName, StateData, Timeout}.
 
 %% @private
-init([VNodeIndex]) ->
-    riak_eventer:notify(riak_vnode, init, VNodeIndex),
-    Mod = riak:get_app_env(storage_backend),
-    {ok, ModState} = Mod:start(VNodeIndex),
-    {ok, Sidekick} = riak_vnode_sidekick:start(self(), VNodeIndex),
-    Cache = dict:new(),
-    {ok, #state{idx=VNodeIndex,active=true,
-                sidekick=Sidekick,mapcache=Cache,mod=Mod,modstate=ModState}}.
+hometest(StateData0=#state{idx=Idx}) ->
+    StateData = StateData0#state{mapcache=orddict:new()},
+    {ok, MyRing} = riak_ring_manager:get_my_ring(),
+    Me = node(),
+    case riak_ring:index_owner(MyRing, Idx) of
+        Me ->
+            {next_state,active,StateData,?TIMEOUT};
+        TargetNode ->
+            case net_adm:ping(TargetNode) of
+                pang -> {next_state,active,StateData,?TIMEOUT};
+                pong -> build_and_send_merkle(TargetNode, StateData)
+            end
+    end.
 
 %% @private
-handle_cast({mapcache,BKey,{M,F,Arg,KeyData},MF_Res},
-            State=#state{mapcache=Cache}) ->
-    KeyCache0 = case dict:find(BKey, Cache) of
-        error -> dict:new();
+build_and_send_merkle(TargetNode,
+                      StateData=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
+    ObjList = Mod:list(ModState),
+    Merk = make_merk(StateData, ObjList),
+    gen_server:cast({riak_vnode_master, TargetNode},
+                    {vnode_merkle, {self(),Idx,Merk,ObjList}}),
+    {next_state,merk_waiting,
+     StateData#state{waiting_diffobjs=ObjList},?TIMEOUT}.
+
+%% @private
+make_merk(StateData,ObjList) ->
+    Merk = merkerl:build_tree([]),
+    make_merk(StateData,ObjList,Merk).
+make_merk(_StateData,[],Merk) -> Merk;
+make_merk(StateData=#state{mod=Mod,modstate=ModState},
+          [BKey|Objlist],Merk) ->
+    V = Mod:get(ModState,BKey), % normally, V = {ok,BinObj}
+    make_merk(StateData,Objlist,merkerl:insert({BKey,erlang:phash2(V)},Merk)).
+
+%% @private
+send_diff_objs(TargetNode,DiffList,
+               StateData=#state{mod=Mod,modstate=ModState}) ->
+    % send each obj (BKey) in difflist to targetnode
+    % return a state with waiting_diffobjs populated
+    [send_diff_obj(TargetNode,BKey,Mod,ModState) || BKey <- DiffList],
+    StateData#state{waiting_diffobjs=DiffList}.
+send_diff_obj(TargetNode,BKey,Mod,ModState) ->
+    {ok,BinObj} = Mod:get(ModState,BKey),
+    gen_fsm:send_event(TargetNode, {diffobj,{BKey,BinObj,self()}}).
+
+%%%%%%%%%% in merk_waiting state, we have sent a merkle tree to the
+%%%%%%%%%% home vnode, and are waiting for a list of different objects
+merk_waiting({get_binary,_BKey}, _From, StateData) ->
+    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
+merk_waiting(list, _From, StateData) ->
+    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
+merk_waiting(timeout, StateData) ->
+    % didn't get a response to our merkle tree, switch back to active mode
+    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
+merk_waiting(merk_nodiff, StateData0=#state{waiting_diffobjs=WD,
+                                            mod=Mod,modstate=ModState}) ->
+    % the far side is home and has all of the objects, cleanup time
+    StateData = StateData0#state{waiting_diffobjs=[]},
+    [Mod:delete(ModState, BKey) || BKey <- WD],
+    case Mod:list(ModState) of
+        [] -> 
+            Mod:stop(ModState),
+            {stop,normal,StateData};
+        _ ->
+            hometest(StateData)
+    end;
+merk_waiting({merk_diff,TargetVNode,DiffList}, StateData0) ->
+    StateData = send_diff_objs(TargetVNode,DiffList,StateData0),
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
+merk_waiting({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
+    hometest(StateData);
+merk_waiting({map, ClientPid, QTerm, BKey, KeyData},
+             StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
+    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
+    {next_state,merk_waiting,StateData,?TIMEOUT};
+merk_waiting({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
+             StateData=#state{idx=Idx}) ->
+    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
+    {next_state,merk_waiting,StateData,?TIMEOUT};
+merk_waiting({get, FSM_pid, BKey, ReqID}, StateData) ->
+    do_get(FSM_pid, BKey, ReqID, StateData),
+    {next_state,merk_waiting,StateData,?TIMEOUT};
+merk_waiting({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
+    hometest(StateData);
+merk_waiting({list_bucket, FSM_pid, Bucket, ReqID},
+             StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
+    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
+    {next_state,merk_waiting,StateData,?TIMEOUT};
+merk_waiting({delete, From, BKey, ReqID}, StateData=#state{mapcache=Cache}) ->
+    do_delete(From, BKey, ReqID, StateData),
+    {next_state,
+     merk_waiting,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+merk_waiting(_OtherMessage,StateData) ->
+    {next_state,merk_waiting,StateData,?TIMEOUT}.
+
+%%%%%%%%%% in waiting_diffobjs state, we have sent a list of diff objs to the
+%%%%%%%%%% home vnode, and are waiting to hear that they've been handled
+waiting_diffobjs({get_binary,_BKey}, _From, StateData) ->
+    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
+waiting_diffobjs(list, _From, StateData) ->
+    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
+waiting_diffobjs(timeout, StateData) ->
+    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
+waiting_diffobjs({resolved_diffobj,K},
+         StateData0=#state{waiting_diffobjs=WD0,mod=Mod,modstate=ModState})->
+    WD = lists:delete(K,WD0),
+    Mod:delete(ModState, K),
+    StateData = StateData0#state{waiting_diffobjs=WD},
+    case WD of
+        [] -> % resolved all the intended diff objects
+            hometest(StateData);
+        _ -> % some left, keep waiting
+            {next_state,waiting_diffobjs,StateData,?TIMEOUT}
+    end;
+waiting_diffobjs(merk_nodiff, StateData) ->
+    % got merkle reply at a very strange time
+    % jump into active mode to handle some requests before trying again
+    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
+waiting_diffobjs({merk_diff,_TargetNode,_DiffList}, StateData) ->
+    % got merkle reply at a very strange time
+    % jump into active mode to handle some requests before trying again
+    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
+waiting_diffobjs({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
+    hometest(StateData);
+waiting_diffobjs({map, ClientPid, QTerm, BKey, KeyData},
+                 StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
+    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
+waiting_diffobjs({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
+                 StateData=#state{idx=Idx}) ->
+    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
+waiting_diffobjs({get, FSM_pid, BKey, ReqID}, StateData) ->
+    do_get(FSM_pid, BKey, ReqID, StateData),
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
+waiting_diffobjs({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
+    hometest(StateData);
+waiting_diffobjs({list_bucket, FSM_pid, Bucket, ReqID},
+                 StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
+    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
+waiting_diffobjs({delete, From, BKey, ReqID},
+                 StateData=#state{mapcache=Cache}) ->
+    do_delete(From, BKey, ReqID, StateData),
+    {next_state,waiting_diffobjs,
+     StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+waiting_diffobjs(_OtherMessage,StateData) ->
+    {next_state,waiting_diffobjs,StateData,?TIMEOUT}.
+
+%%%%%%%%%% in active state, we process normal client requests
+active({get_binary,BKey}, _From, StateData=#state{mod=Mod,modstate=ModState}) ->
+    {reply,Mod:get(ModState,BKey),active,StateData,?TIMEOUT};
+active(list, _From, StateData=#state{mod=Mod,modstate=ModState}) ->
+    {reply,{ok, Mod:list(ModState)},active,StateData,?TIMEOUT}.
+active(timeout, StateData) ->
+    hometest(StateData);
+active({diffobj,{BKey,BinObj,FromVN}}, StateData) ->
+    do_diffobj_put(BKey, binary_to_term(BinObj), StateData),
+    gen_fsm:send_event(FromVN,{resolved_diffobj,BKey}),
+    {next_state,active,StateData,?TIMEOUT};
+active({map, ClientPid, QTerm, BKey, KeyData},
+       StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
+    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
+    {next_state,active,StateData,?TIMEOUT};
+active({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
+       StateData=#state{idx=Idx,mapcache=Cache}) ->
+    gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
+    do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, StateData),
+    {next_state,
+     active,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+active({get, FSM_pid, BKey, ReqID}, StateData) ->
+    do_get(FSM_pid, BKey, ReqID, StateData),
+    {next_state,active,StateData,?TIMEOUT};
+active({vnode_merkle, {RemoteVN,Merkle,ObjList}}, StateData) ->
+    Me = self(),
+    spawn(fun() -> do_merkle(Me,RemoteVN,Merkle,ObjList,StateData) end),
+    {next_state,active,StateData,?TIMEOUT};
+active({list_bucket, FSM_pid, Bucket, ReqID},
+       StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
+    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
+    {next_state,active,StateData,?TIMEOUT};
+active({delete, From, BKey, ReqID}, StateData=#state{mapcache=Cache}) ->
+    do_delete(From, BKey, ReqID, StateData),
+    {next_state,
+     active,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+active({mapcache, BKey,{M,F,Arg,KeyData},MF_Res},
+       StateData=#state{mapcache=Cache}) ->
+    KeyCache0 = case orddict:find(BKey, Cache) of
+        error -> orddict:new();
         {ok,CDict} -> CDict
     end,
-    KeyCache = dict:store({M,F,Arg,KeyData},MF_Res,KeyCache0),
-    {noreply, State#state{mapcache=dict:store(BKey,KeyCache,Cache)}};
-handle_cast(cache_purge, State=#state{idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, cache_purge, Idx),
-    {noreply, State#state{mapcache=dict:new()}};
-handle_cast(activate, State=#state{idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, activate, Idx),
-    {noreply, State#state{active=true}};
-handle_cast(deactivate, State=#state{idx=Idx}) ->  % when transferring to home
-    riak_eventer:notify(riak_vnode, deactivate, Idx),
-    {noreply, State#state{active=false}};
-handle_cast({rexit, From}, State=#state{idx=Idx,sidekick=Sidekick}) ->
-    riak_eventer:notify(riak_vnode, shutdown, {Idx,From}),
-    Sidekick ! vnode_shutdown,
-    {stop, normal, State};
-handle_cast({vnode_merkle, {RemoteVN,Merkle}}, State) ->
-    spawn(fun() -> do_merkle(RemoteVN,Merkle,State) end),
-    {noreply, State};
-handle_cast(_, State=#state{active=false}) -> % below here requires "active"
-    {noreply, State};
-handle_cast({map, ClientPid, QTerm, BKey, KeyData},
-            State=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
-    riak_eventer:notify(riak_vnode, map, QTerm),
-    VNode = self(),
-    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,VNode),
-    {noreply, State};
-handle_cast({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
-            State=#state{mapcache=Cache,idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, put, {ReqID, Idx}),
-    gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
-    do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, State),
-    {noreply, State#state{mapcache=dict:erase(BKey,Cache)}};
-handle_cast({get, FSM_pid, BKey, ReqID}, State=#state{idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, get, {ReqID, Idx}),
-    do_get(FSM_pid, BKey, ReqID, State),
-    {noreply, State};
-handle_cast({delete, Client, BKey, ReqID}, State=#state{idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, delete, {ReqID, Idx}),
-    do_delete(Client, BKey, ReqID, State),
-    {noreply, State};
-handle_cast({list_bucket, Client, Bucket, ReqID},
-            State=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
-    riak_eventer:notify(riak_vnode, list_bucket, {ReqID, Idx}),
-    do_list_bucket(Client,ReqID,Bucket,Mod,ModState,Idx),
-    {noreply, State}.
+    KeyCache = orddict:store({M,F,Arg,KeyData},MF_Res,KeyCache0),
+    {next_state,active,
+     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT};
+active(merk_nodiff, StateData) ->
+    hometest(StateData);
+active({merk_diff,_TargetNode,_DiffList}, StateData) ->
+    hometest(StateData);
+active({resolved_diffobj,_K}, StateData) ->
+    hometest(StateData).
 
 %% @private
-handle_call(is_backup_node,_From,State) ->
-    {reply, riak:get_app_env(backup, false), State};
-handle_call({get_binary,BKey},
-            From,State=#state{mod=Mod,modstate=ModState}) ->
-    async_get_binary(From,BKey,Mod,ModState),
-    {noreply, State};
-handle_call(list,From,State=#state{mod=Mod,modstate=ModState}) ->
-    async_do_list(From,Mod,ModState),
-    {noreply, State}.
-
 do_get(FSM_pid, BKey, ReqID,
        _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
     RetVal = case do_get_binary(BKey, Mod, ModState) of
         {ok, Binary} -> {ok, binary_to_term(Binary)};
         X -> X
     end,
-    riak_eventer:notify(riak_vnode, get_reply, ReqID),
     gen_fsm:send_event(FSM_pid, {r, RetVal, Idx, ReqID}).
 
-async_get_binary(From,BKey,Mod,ModState) ->
-    spawn(fun() ->
-                  RetVal = do_get_binary(BKey,Mod,ModState),
-                  gen_server2:reply(From, RetVal)
-          end).
-
-async_do_list(From,Mod,ModState) ->
-    spawn(fun() ->
-                  RetVal = Mod:list(ModState),
-                  gen_server2:reply(From, RetVal)
-          end).
-
+%% @private
 do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx) ->
     RetVal = Mod:list_bucket(ModState,Bucket),
     riak_eventer:notify(riak_vnode, keys_reply, {ReqID, FSM_pid}),
     gen_fsm:send_event(FSM_pid, {kl, RetVal,Idx,ReqID}).
 
+%% @private
 do_get_binary(BKey, Mod, ModState) ->
     Mod:get(ModState,BKey).
 
+%% @private
 do_delete(Client, BKey, ReqID,
           _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
     case Mod:delete(ModState, BKey) of
         ok ->
-            riak_eventer:notify(riak_vnode,delete_reply,ReqID),
             gen_server2:reply(Client, {del, Idx, ReqID});
-        {error, Reason} ->
-            riak_eventer:notify(riak_vnode,delete_fail,{ReqID,Reason}),
+        {error, _Reason} ->
             gen_server2:reply(Client, {fail, Idx, ReqID})
     end.
 
-simple_binary_put(BKey, Val, Mod, ModState) ->
-    Mod:put(ModState, BKey, Val).
+%% @private
+% upon receipt of a handoff datum, there is no client FSM
+do_diffobj_put(BKey, DiffObj, 
+       _StateData=#state{mod=Mod,modstate=ModState}) ->
+    ReqID = erlang:phash2(erlang:now()),
+    case syntactic_put_merge(Mod, ModState, BKey, DiffObj, ReqID) of
+        {newobj, NewObj} ->
+            Val = term_to_binary(NewObj),
+            Mod:put(ModState, BKey, Val);
+        _ -> nop
+    end.
 
+%% @private
+% upon receipt of a client-initiated put
 do_put(FSM_pid, BKey, RObj, ReqID, PruneTime, 
        _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
     {ok,Ring} = riak_ring_manager:get_my_ring(),    
     BProps = riak_bucket:get_bucket(Bucket, Ring),
     case syntactic_put_merge(Mod, ModState, BKey, RObj, ReqID) of
         oldobj -> 
-            riak_eventer:notify(riak_vnode,put_reply,ReqID),
             gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
         {newobj, NewObj} ->
             VC = riak_object:vclock(NewObj),
             ObjToStore = riak_object:set_vclock(NewObj,
                                            vclock:prune(VC,PruneTime,BProps)),
             Val = term_to_binary(ObjToStore),
-            case simple_binary_put(BKey, Val, Mod, ModState) of
+            case Mod:put(ModState, BKey, Val) of
                 ok ->
-                    riak_eventer:notify(riak_vnode,put_reply,ReqID),
                     gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
-                {error, Reason} ->
-                    riak_eventer:notify(riak_vnode,put_fail,{ReqID,Reason}),
+                {error, _Reason} ->
                     gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID})
             end
     end.
 
+%% @private
 do_map(ClientPid,{map,FunTerm,Arg,_Acc},
        BKey,KeyData,Cache,Mod,ModState,VNode) ->
-    riak_eventer:notify(riak_vnode, map_start, {FunTerm,Arg,BKey}),
     CacheVal = case FunTerm of
         {qfun,_} -> not_cached; % live funs are not cached
         {modfun,CMod,CFun} ->
-            case dict:find(BKey, Cache) of
+            case orddict:find(BKey, Cache) of
                 error -> not_cached;
                 {ok,CDict} ->
-                    case dict:find({CMod,CFun,Arg,KeyData},CDict) of
+                    case orddict:find({CMod,CFun,Arg,KeyData},CDict) of
                         error -> not_cached;
                         {ok,CVal} -> CVal
                     end
         not_cached ->
              uncached_map(BKey,Mod,ModState,FunTerm,Arg,KeyData,VNode);
         CV ->
-             riak_eventer:notify(riak_vnode,cached_map,{FunTerm,Arg,BKey}),
              {mapexec_reply, CV, self()}
     end,
-    riak_eventer:notify(riak_vnode, map_reply, {FunTerm,Arg,BKey}),
     gen_fsm:send_event(ClientPid, RetVal).
-
 uncached_map(BKey,Mod,ModState,FunTerm,Arg,KeyData,VNode) ->
     riak_eventer:notify(riak_vnode, uncached_map, {FunTerm,Arg,BKey}),
     case do_get_binary(BKey, Mod, ModState) of
             uncached_map1({error, notfound},FunTerm,Arg,BKey,KeyData,VNode);
         X -> {mapexec_error, self(), X}
     end.
-
 uncached_map1(V,FunTerm,Arg,BKey,KeyData,VNode) ->
     try
         MapVal = case FunTerm of
             {qfun,F} -> (F)(V,KeyData,Arg);
             {modfun,M,F} ->
                 MF_Res = M:F(V,KeyData,Arg),
-                gen_server2:cast(VNode,
-                                 {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
+                gen_fsm:send_event(VNode,
+                                   {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
                 MF_Res
         end,
         {mapexec_reply, MapVal, self()}
          {mapexec_error, self(), Reason}
     end.
 
-do_merkle(RemoteVN,RemoteMerkle,
-          _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
-    % called upon receipt of merkle tree from RemoteVN
-    % this function is quite computationally intensive if either side has
-    %  a significant volume of data
-    % note that the hashed values are of term {ok,BinVal} not BinVal
-    %  (this is intentional)
-    riak_eventer:notify(riak_vnode, merkle_start, RemoteVN),
-    MyMerkle = merkerl:build_tree([{K,crypto:sha(V)} || {K,{ok,V}} <- 
-                [{K,Mod:get(ModState,K)} || K <- Mod:list(ModState)]]),
+%% @private
+do_merkle(Me,RemoteVN,RemoteMerkle,ObjList,StateData) ->
+    % given a RemoteMerkle over the ObjList from RemoteVN
+    % determine which elements in ObjList we differ on
+    MyMerkle = make_merk(StateData,ObjList),
     case merkerl:diff(MyMerkle,RemoteMerkle) of
-        [] -> nop;
-        MerkDiff ->
-            RemoteResults = [{K,gen_server2:call(RemoteVN, {get_binary,K})} ||
-                                K <- MerkDiff],
-            RemoteObjs = [{K,binary_to_term(V)} || {K,{ok,V}} <- RemoteResults],
-            IsBackup = gen_server2:call(RemoteVN,is_backup_node) ,
-            [local_reconcile(K,V,Mod,ModState,IsBackup) || {K,V} <- RemoteObjs]
-    end,
-    riak_eventer:notify(riak_vnode, merkle_end, RemoteVN),
-    % the following cast is because the remote side has now finished handoff
-    gen_server2:cast(RemoteVN, {rexit, {peer, Idx}}).
-
-local_reconcile(K,RemObj,Mod,ModState,IsBackup) ->
-    FinalObj = case IsBackup of
-        true -> RemObj;
-        false ->
-            case Mod:get(ModState,K) of
-                {ok,V} -> riak_object:reconcile([binary_to_term(V),RemObj],false);
-                _ -> RemObj
-            end
-    end,
-    case Mod:put(ModState, K, term_to_binary(FinalObj, [compressed])) of
-        ok ->
-            riak_eventer:notify(riak_vnode, stored_handoff, K);
-        {error, Reason} ->
-            riak_eventer:notify(riak_vnode, stored_handoff_fail, {K,Reason})
+        [] -> gen_fsm:send_event(RemoteVN,merk_nodiff);
+        DiffList -> gen_fsm:send_event(RemoteVN,{merk_diff,Me,DiffList})
     end.
 
 %% @private
-handle_info(_Msg, State) -> {noreply, State}.
-
-%% @private
-terminate(_Reason, _State) -> ok.
-
-%% @private
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%% @private
 syntactic_put_merge(Mod, ModState, BKey, Obj1, ReqId) ->
     case Mod:get(ModState, BKey) of
         {error, notfound} -> {newobj, Obj1};
             end    
     end.
 
+%% @private
+code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
+
+%% @private
+handle_event(_Event, _StateName, StateData) ->
+    {stop,badmsg,StateData}.
+
+%% @private
+handle_sync_event(_Event, _From, _StateName, StateData) ->
+    {stop,badmsg,StateData}.
+
+%% @private
+handle_info(vnode_shutdown, _StateName, StateData) ->
+    {stop,normal,StateData}.
+
+%% @private
+terminate(_Reason, _StateName, _State) -> ok.
+

src/riak_vnode_master.erl

-%% This file is provided to you under the Apache License,
+ %% This file is provided to you under the Apache License,
 %% Version 2.0 (the "License"); you may not use this file
 %% except in compliance with the License.  You may obtain
 %% a copy of the License at
 handle_cast({vnode_map, {Partition,_Node},
              {ClientPid,QTerm,BKey,KeyData}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {map, ClientPid, QTerm, BKey, KeyData}),
-    % (obligation done, now the problem of the vnodes)
+    gen_fsm:send_event(Pid, {map, ClientPid, QTerm, BKey, KeyData}),
     {noreply, State};
 handle_cast({vnode_put, {Partition,_Node},
              {FSM_pid,BKey,RObj,ReqID,FSMTime}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}),
-    % (obligation done, now the problem of the vnodes)
+    gen_fsm:send_event(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}),
     {noreply, State};
 handle_cast({vnode_get, {Partition,_Node},
              {FSM_pid,BKey,ReqID}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {get, FSM_pid, BKey, ReqID}),
-    % (obligation done, now the problem of the vnodes)
+    gen_fsm:send_event(Pid, {get, FSM_pid, BKey, ReqID}),
     {noreply, State};
-handle_cast({vnode_merkle, {RemoteVN,Partition,Merkle}}, State) ->
+handle_cast({vnode_merkle, {RemoteVN,Partition,Merkle,ObjList}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {vnode_merkle, {RemoteVN,Merkle}}),
-    % (obligation done, now the problem of the vnodes)
+    gen_fsm:send_event(Pid, {vnode_merkle, {RemoteVN,Merkle,ObjList}}),
     {noreply, State};
 handle_cast({vnode_list_bucket, {Partition,_Node},
             {FSM_pid, Bucket, ReqID}}, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {list_bucket, FSM_pid, Bucket, ReqID}),
+    gen_fsm:send_event(Pid, {list_bucket, FSM_pid, Bucket, ReqID}),
     {noreply, State}.
 
-
 %% @private
 handle_call(all_possible_vnodes, _From, State) ->
     {reply, make_all_active(State), State};
 handle_call({vnode_del, {Partition,_Node},
              {BKey,ReqID}}, From, State) ->
     Pid = get_vnode(Partition, State),
-    gen_server2:cast(Pid, {delete, From, BKey, ReqID}),
-    % (obligation done, now the problem of the vnodes)
+    gen_fsm:send_event(Pid, {delete, From, BKey, ReqID}),
     {noreply, State}.
 
 %% @private

src/riak_vnode_sidekick.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
--module(riak_vnode_sidekick).
--behaviour(gen_fsm).
-
--export([start/2]).
--export([init/1, handle_event/3, handle_sync_event/4,
-         handle_info/3, terminate/3, code_change/4]).
--export([home/2,not_home/2]).
-
--define(TIMEOUT, 60000).     % poll time for normal operation
--define(LONGTIMEOUT, 120000). % if a vnode-merkle exchange in progress
-
--record(state, {vnode,idx,counter}).
-
-% The vnode sidekick notices when a vnode is not "home"
-% (when it is serving as an imperfect recipient of puts)
-% and if it is not home and the home node is reachable,
-% the sidekick initiates handoff.
-
-start(VNode,Idx) ->
-    gen_fsm:start(?MODULE, [VNode,Idx], []).
-init([VNode,Idx]) ->
-    StateData = #state{vnode=VNode,idx=Idx,counter=0},
-    {ok,home,StateData,?TIMEOUT}.
-
-home(timeout, StateData=#state{vnode=VNode,idx=Idx,counter=Count}) ->
-    {ok, MyRing} = riak_ring_manager:get_my_ring(),
-    Me = node(),
-    case riak_ring:index_owner(MyRing, Idx) of
-        Me ->
-            gen_server2:cast(VNode, activate),
-            case Count > 9 of
-                true ->
-                    gen_server2:cast(VNode, cache_purge),
-                    {next_state,home,StateData#state{counter=0},?TIMEOUT};
-                false ->
-                    {next_state,home,StateData#state{counter=Count+1},?TIMEOUT}
-            end;
-        _ ->
-            {next_state,not_home,StateData,1}
-    end.
-
-not_home(timeout, StateData=#state{vnode=VNode,idx=Idx}) ->
-    {ok, MyRing} = riak_ring_manager:get_my_ring(),
-    TargetNode = riak_ring:index_owner(MyRing, Idx),
-    Me = node(),
-    case TargetNode of % just in case we took ownership
-        Me -> {next_state,home,StateData,?TIMEOUT}; 
-        _ -> 
-            case net_adm:ping(TargetNode) of
-                pang -> {next_state,not_home,StateData,?TIMEOUT};
-                pong ->
-                    ObjList = gen_server2:call(VNode, list, 60000),
-                    case ObjList of
-                        [] ->
-                            gen_server2:cast(VNode, {rexit, "sidekick"}),
-                            {next_state,not_home,StateData,?LONGTIMEOUT};
-                        _ -> 
-                            gen_server2:cast(VNode, deactivate),
-                            Merk = make_merk(VNode, Idx, ObjList),
-                            gen_server:cast({riak_vnode_master, TargetNode},
-                                            {vnode_merkle, {VNode,Idx,Merk}}),
-                            {next_state,not_home,StateData,?LONGTIMEOUT}
-                    end
-            end
-    end.
-
-make_merk(VNode, Idx, ObjList) ->
-    riak_eventer:notify(riak_vnode_sidekick, merkle_prep, Idx),
-    merkerl:build_tree([{K,crypto:sha(V)} || {K,{ok,V}} <- 
-                        [{K,gen_server2:call(VNode, {get_binary, K})} ||
-                           K <- ObjList]]).
-
-%% @private
-code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
-
-%% @private
-handle_event(_Event, _StateName, StateData) ->
-    {stop,badmsg,StateData}.
-
-%% @private
-handle_sync_event(_Event, _From, _StateName, StateData) ->
-    {stop,badmsg,StateData}.
-
-%% @private
-handle_info(vnode_shutdown, _StateName, StateData) ->
-    {stop,normal,StateData}.
-
-%% @private
-terminate(_Reason, _StateName, _State) -> ok.
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.