Commits

Andy Gross  committed e3c4199 Merge

merge handoff changes

  • Participants
  • Parent commits bc2e38f, 7e40062

Comments (0)

Files changed (14)

File apps/riak/ebin/riak.app

   {modules, [
              bloom,
              chash,
+             gen_nb_server,
              gen_server2,
              jaywalker_resource,
              jiak,
              riak_app,
              riak_backup,
              riak_bucket,
-			 riak_cache_backend,
+	     riak_cache_backend,
              riak_claim,
              riak_client,
              riak_connect,
              riak_fs_backend,
              riak_gb_trees_backend,
              riak_get_fsm,
+             riak_handoff_listener,
+             riak_handoff_receiver,
+             riak_handoff_sender,
              riak_keys_fsm,
              riak_local_logger,
              riak_map_executor,

File apps/riak/rebar.config

 {erl_opts, [debug_info, fail_on_warning]}.
 {lib_dirs, [".."]}.
 
-{erl_first_files, ["src/gen_server2.erl"]}.
+{erl_first_files, ["src/gen_nb_server", "src/gen_server2.erl"]}.

File apps/riak/src/gen_nb_server.erl

+%% Copyright (c) 2009 Hypothetical Labs, Inc.
+ 
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+ 
+-module(gen_nb_server).
+ 
+-author('kevin@hypotheticalabs.com').
+ 
+-behaviour(gen_server).
+ 
+%% API
+-export([start_link/4]).
+ 
+%% Behavior callbacks
+-export([behaviour_info/1]).
+ 
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+ 
+-define(SERVER, ?MODULE).
+ 
+-record(state, {cb,
+                sock,
+                server_state}).
+ 
+%% @hidden
+behaviour_info(callbacks) ->
+  [{init, 1},
+   {handle_call, 3},
+   {handle_cast, 2},
+   {handle_info, 2},
+   {terminate, 2},
+   {sock_opts, 0},
+   {new_connection, 2}];
+ 
+behaviour_info(_) ->
+  undefined.
+ 
+%% @spec start_link(CallbackModule, IpAddr, Port, InitParams) -> Result
+%% CallbackModule = atom()
+%% IpAddr = string()
+%% Port = integer()
+%% InitParams = [any()]
+%% Result = {ok, pid()} | {error, any()}
+%% @doc Start server listening on IpAddr:Port
+start_link(CallbackModule, IpAddr, Port, InitParams) ->
+  gen_server:start_link(?MODULE, [CallbackModule, IpAddr, Port, InitParams], []).
+ 
+%% @hidden
+init([CallbackModule, IpAddr, Port, InitParams]) ->
+  case CallbackModule:init(InitParams) of
+    {ok, ServerState} ->
+      case listen_on(CallbackModule, IpAddr, Port) of
+        {ok, Sock} ->
+          {ok, #state{cb=CallbackModule, sock=Sock, server_state=ServerState}};
+        Error ->
+          CallbackModule:terminate(Error, ServerState),
+          Error
+      end;
+    Err ->
+      Err
+  end.
+ 
+%% @hidden
+handle_call(Request, From, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_call(Request, From, ServerState) of
+    {reply, Reply, NewServerState} ->
+      {reply, Reply, State#state{server_state=NewServerState}};
+    {reply, Reply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {reply, Reply, State#state{server_state=NewServerState}, Arg};
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}};
+    {stop, Reason, Reply, NewServerState} ->
+      {stop, Reason, Reply, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+handle_cast(Msg, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_cast(Msg, ServerState) of
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+handle_info({inet_async, ListSock, _Ref, {ok, CliSocket}}, #state{cb=Callback, server_state=ServerState}=State) ->
+  inet_db:register_socket(CliSocket, inet_tcp),
+  case Callback:new_connection(CliSocket, ServerState) of
+    {ok, NewServerState} ->
+      prim_inet:async_accept(ListSock, -1),
+      {noreply, State#state{server_state=NewServerState}};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end;
+ 
+handle_info(Info, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_info(Info, ServerState) of
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+terminate(Reason, #state{cb=Callback, sock=Sock, server_state=ServerState}) ->
+  gen_tcp:close(Sock),
+  Callback:terminate(Reason, ServerState),
+  ok.
+ 
+%% @hidden
+code_change(_OldVsn, State, _Extra) ->
+  {ok, State}.
+ 
+%% Internal functions
+ 
+%% @hidden
+%% @spec listen_on(CallbackModule, IpAddr, Port) -> Result
+%% CallbackModule = atom()
+%% IpAddr = string()
+%% Port = integer()
+%% Result = {ok, port()} | {error, any()}
+listen_on(CallbackModule, IpAddr, Port) ->
+  SockOpts = [{ip, convert(IpAddr)}|CallbackModule:sock_opts()],
+  case gen_tcp:listen(Port, SockOpts) of
+    {ok, LSock} ->
+      {ok, _Ref} = prim_inet:async_accept(LSock, -1),
+      {ok, LSock};
+    Err ->
+      Err
+  end.
+ 
+%% @hidden
+%% @spec convert_addr(Addr) -> Result
+%% Addr = string()
+%% Result = {integer(), integer(), integer(), integer()}
+%% @doc Converts text IP addresses "0.0.0.0" to tuples {0, 0, 0, 0}
+convert(Addr) ->
+  T = string:tokens(Addr, "."),
+  list_to_tuple([list_to_integer(X) || X <- T]).

File apps/riak/src/riak_connect.erl

     end.
 
 ensure_vnodes_started(Ring) ->
-    VNodes2Start = case length(riak_ring:all_members(Ring)) of
-       1 -> riak_ring:my_indices(Ring);
-       _ -> [riak_ring:random_other_index(Ring)| riak_ring:my_indices(Ring)]
-    end,
-    [begin
-        gen_server:cast({riak_vnode_master, node()}, {start_vnode, I}) 
-    end|| I <- VNodes2Start].
+    AllMembers = riak_ring:all_members(Ring),
+    VNodes2Start = 
+        case {length(AllMembers), hd(AllMembers) =:= node()} of
+            {1, true} -> riak_ring:my_indices(Ring);
+            _ -> 
+                {ok, Excl} = gen_server:call(riak_vnode_master, get_exclusions),
+                case riak_ring:random_other_index(Ring, Excl) of
+                    no_indices ->
+                        case length(Excl) =:= riak_ring:num_partitions(Ring) of
+                            true ->
+                                exit;
+                            false ->
+                                riak_ring:my_indices(Ring)
+                        end;
+                    RO ->
+                        [RO | riak_ring:my_indices(Ring)]
+                end
+        end,
+    case VNodes2Start of
+        exit ->
+            riak:stop("node removal completed, exiting.");
+        _ ->
+            [begin
+                 gen_server:cast({riak_vnode_master, node()}, {start_vnode, I}) 
+             end|| I <- VNodes2Start]
+    end.
 
 remove_from_cluster(ExitingNode) ->
     % Set the remote node to stop claiming.

File apps/riak/src/riak_dets_backend.erl

 -module(riak_dets_backend).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,fold/3, is_empty/1, drop/1]).
 
 % @type state() = term().
--record(state, {table}).
+-record(state, {table, path}).
 
 % @spec start(Partition :: integer(), Config :: proplist()) ->
 %                        {ok, state()} | {{error, Reason :: term()}, state()}
                                    {min_no_slots, 8192},
                                    {max_no_slots, 16777216}]) of
         {ok, DetsName} ->
-            {ok, #state{table=DetsName}};
+            ok = dets:sync(DetsName),
+            {ok, #state{table=DetsName, path=TablePath}};
         {error, Reason}  ->
             riak:stop("dets:open_file failed"),
             {error, Reason}
 % @spec stop(state()) -> ok | {error, Reason :: term()}
 stop(#state{table=T}) -> dets:close(T).
 
-% get(state(), Key :: binary()) ->
+% get(state(), riak_object:bkey()) ->
 %   {ok, Val :: binary()} | {error, Reason :: term()}
 % key must be 160b
 get(#state{table=T}, BKey) ->
         {error, Err} -> {error, Err}
     end.
 
-% put(state(), Key :: binary(), Val :: binary()) ->
+% put(state(), riak_object:bkey(), Val :: binary()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 put(#state{table=T},BKey,Val) -> dets:insert(T, {BKey,Val}).
 
-% delete(state(), Key :: binary()) ->
+% delete(state(), riak_object:bkey()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 delete(#state{table=T}, BKey) -> dets:delete(T, BKey).
 
-% list(state()) -> [Key :: binary()]
+% list(state()) -> [riak_object:bkey()]
 list(#state{table=T}) ->
     MList = dets:match(T,{'$1','_'}),
     list(MList,[]).
     MList = dets:match(T,MatchSpec),
     list(MList,[]).
 
+fold(#state{table=T}, Fun0, Acc) -> 
+    Fun = fun({{B,K}, V}, AccIn) -> Fun0({B,K}, V, AccIn) end,
+    dets:foldl(Fun, Acc, T).
+
+is_empty(#state{table=T}) ->
+    ok = dets:sync(T),
+    dets:info(T, size) =:= 0.
+
+drop(#state{table=T, path=P}) ->
+    %ok = dets:delete_all_objects(T),
+    ok = dets:close(T),
+    ok = file:delete(P).
+
 %%
 %% Test
 %%

File apps/riak/src/riak_ets_backend.erl

     true = ets:delete(State#state.t),
     ok.
 
-% get(state(), Key :: binary()) ->
+% get(state(), riak_object:bkey()) ->
 %   {ok, Val :: binary()} | {error, Reason :: term()}
 % key must be 160b
 get(SrvRef, BKey) -> gen_server:call(SrvRef,{get,BKey}).
     true = ets:insert(State#state.t, {BKey,Val}),
     ok.
 
-% delete(state(), Key :: binary()) ->
+% delete(state(), riak_object:bkey()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 delete(SrvRef, BKey) -> gen_server:call(SrvRef,{delete,BKey}).
     true = ets:delete(State#state.t, BKey),
     ok.
 
-% list(state()) -> [Key :: binary()]
+% list(state()) -> [riak_object:bkey()]
 list(SrvRef) -> gen_server:call(SrvRef,list).
 srv_list(State) ->
     MList = ets:match(State#state.t,{'$1','_'}),

File apps/riak/src/riak_handoff_listener.erl

+-module(riak_handoff_listener).
+
+-behavior(gen_nb_server).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+-export([sock_opts/0, new_connection/2]).
+-record(state, {portnum}).
+
+start_link() ->
+    PortNum = 
+        case application:get_env(riak, riak_handoff_port) of
+            undefined -> 8099;
+            {ok, N} -> N
+        end,
+    IpAddr = 
+        case application:get_env(riak, riak_handoff_ip) of
+            undefined -> "0.0.0.0";
+            {ok, IP} -> IP
+        end,
+    gen_nb_server:start_link(?MODULE, IpAddr, PortNum, [PortNum]).
+
+init([PortNum]) -> 
+    register(?MODULE, self()),
+    {ok, #state{portnum=PortNum}}.
+
+sock_opts() -> [binary, {packet, 4}, {reuseaddr, true}, {backlog, 64}].
+
+handle_call(handoff_port, _From, State=#state{portnum=P}) -> 
+    {reply, {ok, P}, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+new_connection(Socket, State) ->
+    {ok, Pid} = riak_handoff_receiver:start_link(Socket),
+    gen_tcp:controlling_process(Socket, Pid),
+    {ok, State}.
+

File apps/riak/src/riak_handoff_receiver.erl

+-module(riak_handoff_receiver).
+
+-include("riakserver_pb.hrl").
+-behaviour(gen_server2).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {sock, partition, vnode}).
+
+start_link(Socket) ->
+    gen_server2:start_link(?MODULE, [Socket], []).
+
+init([Socket]) -> 
+    inet:setopts(Socket, [{active, once}, {packet, 4}, {header, 1}]),
+    {ok, #state{sock=Socket}}.
+
+handle_info({tcp_closed, Socket}, State=#state{sock=Socket}) ->
+    {stop, normal, State};
+handle_info({tcp, _Sock, Data}, State=#state{sock=Socket}) ->
+    [MsgType|MsgData] = Data,
+    NewState = process_message(MsgType,MsgData,State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState}.
+
+process_message(0, MsgData, State) ->
+    <<Partition:160/integer>> = MsgData,
+    {ok, VNode} = gen_server2:call(riak_vnode_master, {get_vnode, Partition}, 60000),  
+    State#state{partition=Partition, vnode=VNode};
+process_message(1, MsgData, State=#state{vnode=VNode}) ->
+    % header of 1 is a riakobject_pb
+    RO_PB = riakserver_pb:decode_riakobject_pb(zlib:unzip(MsgData)),
+    BKey = {RO_PB#riakobject_pb.bucket,RO_PB#riakobject_pb.key},
+    ok = gen_fsm:sync_send_all_state_event(VNode, {diffobj, {BKey, RO_PB#riakobject_pb.val}}, 60000),
+    State;
+process_message(2, _MsgData, State=#state{sock=Socket}) ->
+    ok = gen_tcp:send(Socket, <<2:8,"sync">>),
+    State.
+
+handle_call(_Request, _From, State) -> {reply, ok, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+

File apps/riak/src/riak_handoff_sender.erl

+-module(riak_handoff_sender).
+-export([start_link/3]).
+-include("riakserver_pb.hrl").
+
+start_link(TargetNode, Partition, BKeyList) ->
+    case global:set_lock({handoff_token, {node(), Partition}}, [node()], 0) of
+        true ->
+            Self = self(),
+            {ok, spawn_link(fun() -> start_fold(TargetNode, Partition, BKeyList, Self) end)};
+        false ->
+            {error, locked}
+    end.
+            
+
+start_fold(TargetNode, Partition, BKeyList, ParentPid) ->
+
+    [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
+    {ok, Port} = get_handoff_port(TargetNode),
+    {ok, Socket} = gen_tcp:connect(Host, Port, 
+                                   [binary, {packet, 4}, {header,1}, {active, once}], 15000),
+    M = <<0:8,Partition:160/integer>>,
+    ok = gen_tcp:send(Socket, M),
+    case BKeyList of
+        all ->
+            gen_server2:call(riak_vnode_master, 
+                     {fold, {Partition, fun folder/3, {Socket, ParentPid, []}}},
+                     infinity);
+        _ ->
+            inner_fold({Socket,ParentPid,[]},BKeyList)
+    end,
+    gen_fsm:send_event(ParentPid, handoff_complete).
+
+inner_fold(_FoldArg,[]) -> ok;
+inner_fold(FoldArg,[{B,K}|Tail]) ->
+    {_Socket,ParentPid,_Count} = FoldArg,
+    case gen_fsm:sync_send_event(ParentPid, {get_binary, {B,K}}, infinity) of
+        {ok, V} ->
+            inner_fold(folder({B,K},V,FoldArg),Tail);
+        _ ->
+            inner_fold(FoldArg,Tail)
+    end.
+            
+folder({B,K}, V, {Socket, ParentPid, []}) ->
+    gen_tcp:controlling_process(Socket, self()),
+    visit_item({B,K}, V, {Socket, ParentPid, 0});
+folder({B,K}, V, AccIn) ->
+    visit_item({B,K}, V, AccIn).
+
+visit_item({B,K}, V, {Socket, ParentPid, 100}) ->
+    M = <<2:8,"sync">>,
+    ok = gen_tcp:send(Socket, M),
+    inet:setopts(Socket, [{active, false}]),
+    {ok,[2|<<"sync">>]} = gen_tcp:recv(Socket, 0),
+    inet:setopts(Socket, [{active, once}]),
+    visit_item({B,K}, V, {Socket, ParentPid, 0});
+visit_item({B,K}, V, {Socket, ParentPid, Acc}) ->
+    D = zlib:zip(riakserver_pb:encode_riakobject_pb(
+                   #riakobject_pb{bucket=B, key=K, val=V})),
+    M = <<1:8,D/binary>>,
+    ok = gen_tcp:send(Socket, M),
+    {Socket, ParentPid, Acc+1}.
+    
+
+get_handoff_port(Node) when is_atom(Node) ->
+    gen_server2:call({riak_handoff_listener, Node}, handoff_port).
+
+
+
+
+
+
+

File apps/riak/src/riak_ring.erl

 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
          transfer_node/3, rename_node/3, reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
+         random_other_index/2,
          get_meta/2, update_meta/3, equal_rings/2]).	 
 
 % @type riak_ring(). The opaque data type used for partition ownership.
         _ -> lists:nth(crypto:rand_uniform(1, length(L)+1), L)
     end.
 
+random_other_index(State, Exclude) when is_list(Exclude) ->
+    L = [I || {I, Owner} <- ?MODULE:all_owners(State),
+              Owner =/= node(),
+              not lists:member(I, Exclude)],
+    case L of
+        [] -> no_indices;
+        _ -> lists:nth(crypto:rand_uniform(1, length(L)+1), L)
+    end.
+
 % @doc Return the node that owns the given index.
 % @spec index_owner(State :: chstate(), Idx :: integer()) -> Node :: term()
 index_owner(State, Idx) ->

File apps/riak/src/riak_sup.erl

     VMaster = {riak_vnode_master,
                {riak_vnode_master, start_link, []},
                permanent, 5000, worker, [riak_vnode_master]},
+    HandoffListen = {riak_handoff_listener,
+               {riak_handoff_listener, start_link, []},
+               permanent, 5000, worker, [riak_handoff_listener]},
     RingMgr = {riak_ring_manager,
              {riak_ring_manager, start_link, []},
              permanent, 5000, worker, [riak_ring_manager]},
         Eventer, 
         VSup,                               
         ?IF(HasStorageBackend, VMaster, []),
+        HandoffListen,
         RingMgr, 
         Connect, 
         LocalLogger,

File apps/riak/src/riak_vnode.erl

 -export([start_link/1]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
--export([active/2,merk_waiting/2,waiting_diffobjs/2]).
--export([active/3,merk_waiting/3,waiting_diffobjs/3]).
+-export([active/2,active/3]).
 
 -define(TIMEOUT, 60000).
+-define(LOCK_RETRY_TIMEOUT, 10000).
 
--record(state, {idx,mapcache,mod,modstate,waiting_diffobjs}).
+-record(state, {idx,mapcache,mod,modstate,handoff_q}).
 
-start_link(Idx) ->
-    gen_fsm:start_link(?MODULE, [Idx], []).
+start_link(Idx) -> gen_fsm:start_link(?MODULE, [Idx], []).
 
 init([VNodeIndex]) ->
     Mod = riak:get_app_env(storage_backend),
     Configuration = riak:get_app_env(),
     {ok, ModState} = Mod:start(VNodeIndex, Configuration),
-    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState},
-    {next_state, StateName, StateData, Timeout} = hometest(StateData0),
-    {ok, StateName, StateData, Timeout}.
+    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState,
+                        handoff_q=not_in_handoff},
+    case hometest(StateData0) of
+        {next_state, StateName, StateData, Timeout} ->
+            {ok, StateName, StateData, Timeout};
+        {stop,normal,StateData} ->
+            {ok, ModState1} = Mod:start(VNodeIndex, Configuration),
+            {ok, active, StateData#state{mod=Mod, modstate=ModState1}, ?TIMEOUT}
+    end.
 
 %% @private
-hometest(StateData0=#state{idx=Idx}) ->
+hometest(StateData0=#state{idx=Idx,handoff_q=HQ}) ->
     StateData = StateData0#state{mapcache=orddict:new()},
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     Me = node(),
     case riak_ring:index_owner(MyRing, Idx) of
         Me ->
-            {next_state,active,StateData,?TIMEOUT};
+            {next_state,active,StateData#state{handoff_q=not_in_handoff},
+             ?TIMEOUT};
         TargetNode ->
             case net_adm:ping(TargetNode) of
-                pang -> {next_state,active,StateData,?TIMEOUT};
-                pong -> build_and_send_merkle(TargetNode, StateData)
+                pang -> 
+                    {next_state,active,StateData,?TIMEOUT};
+                pong -> 
+                    case HQ of
+                        not_in_handoff ->
+                            do_handoff(TargetNode, StateData);
+                        _ ->
+                            do_list_handoff(TargetNode, HQ, StateData)
+                    end
             end
     end.
 
 %% @private
-build_and_send_merkle(TargetNode,
-                      StateData=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
-    ObjList = Mod:list(ModState),
-    Merk = make_merk(StateData, ObjList),
-    gen_server:cast({riak_vnode_master, TargetNode},
-                    {vnode_merkle, {self(),Idx,Merk,ObjList}}),
-    {next_state,merk_waiting,
-     StateData#state{waiting_diffobjs=ObjList},?TIMEOUT}.
+do_handoff(TargetNode, StateData=#state{idx=Idx, mod=Mod, modstate=ModState}) ->
+    case Mod:is_empty(ModState) of
+        true ->
+            delete_and_exit(StateData);
+        false ->
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
+    end.
 
 %% @private
-make_merk(StateData,ObjList) ->
-    Merk = merkerl:build_tree([]),
-    make_merk(StateData,ObjList,Merk).
-make_merk(_StateData,[],Merk) -> Merk;
-make_merk(StateData=#state{mod=Mod,modstate=ModState},
-          [BKey|Objlist],Merk) ->
-    V = Mod:get(ModState,BKey), % normally, V = {ok,BinObj}
-    make_merk(StateData,Objlist,merkerl:insert({BKey,erlang:phash2(V)},Merk)).
+do_list_handoff(TargetNode, BKeyList, StateData=#state{idx=Idx}) ->
+    case BKeyList of
+        [] ->
+            delete_and_exit(StateData);
+        _ ->
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
+    end.
 
 %% @private
-send_diff_objs(TargetNode,DiffList,
-               StateData=#state{mod=Mod,modstate=ModState}) ->
-    % send each obj (BKey) in difflist to targetnode
-    % return a state with waiting_diffobjs populated
-    Sent = [K || K <- [send_diff_obj(TargetNode,BKey,Mod,ModState) 
-                       || BKey <- DiffList], K /= nop],
-    StateData#state{waiting_diffobjs=Sent}.
-send_diff_obj(TargetNode,BKey,Mod,ModState) ->
-    case Mod:get(ModState,BKey) of
-        {ok,BinObj} ->
-            gen_fsm:send_event(TargetNode, {diffobj,{BKey,BinObj,self()}}),
-            BKey;
-        _ ->
-            nop
-    end.
-
-%%%%%%%%%% in merk_waiting state, we have sent a merkle tree to the
-%%%%%%%%%% home vnode, and are waiting for a list of different objects
-merk_waiting({get_binary,_BKey}, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
-merk_waiting(list, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
-merk_waiting(timeout, StateData) ->
-    % didn't get a response to our merkle tree, switch back to active mode
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-merk_waiting(merk_nodiff, StateData0=#state{waiting_diffobjs=WD,
-                                            mod=Mod,modstate=ModState}) ->
-    % the far side is home and has all of the objects, cleanup time
-    StateData = StateData0#state{waiting_diffobjs=[]},
-    [Mod:delete(ModState, BKey) || BKey <- WD],
-    case Mod:list(ModState) of
-        [] -> 
-            Mod:stop(ModState),
-            {stop,normal,StateData};
-        _ ->
-            hometest(StateData)
-    end;
-merk_waiting({merk_diff,TargetVNode,DiffList}, StateData0) ->
-    StateData = send_diff_objs(TargetVNode,DiffList,StateData0),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-merk_waiting({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
-    hometest(StateData);
-merk_waiting({map, ClientPid, QTerm, BKey, KeyData},
-             StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
-    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
-             StateData=#state{idx=Idx}) ->
-    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({get, FSM_pid, BKey, ReqID}, StateData) ->
-    do_get(FSM_pid, BKey, ReqID, StateData),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
-    hometest(StateData);
-merk_waiting({list_bucket, FSM_pid, Bucket, ReqID},
-             StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
-    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({delete, From, BKey, ReqID}, StateData=#state{mapcache=Cache}) ->
-    do_delete(From, BKey, ReqID, StateData),
-    {next_state,
-     merk_waiting,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
-merk_waiting(_OtherMessage,StateData) ->
-    {next_state,merk_waiting,StateData,?TIMEOUT}.
-
-%%%%%%%%%% in waiting_diffobjs state, we have sent a list of diff objs to the
-%%%%%%%%%% home vnode, and are waiting to hear that they've been handled
-waiting_diffobjs({get_binary,_BKey}, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
-waiting_diffobjs(list, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
-waiting_diffobjs(timeout, StateData) ->
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({resolved_diffobj,K},
-         StateData0=#state{waiting_diffobjs=WD0,mod=Mod,modstate=ModState})->
-    WD = lists:delete(K,WD0),
-    Mod:delete(ModState, K),
-    StateData = StateData0#state{waiting_diffobjs=WD},
-    case WD of
-        [] -> % resolved all the intended diff objects
-            hometest(StateData);
-        _ -> % some left, keep waiting
-            {next_state,waiting_diffobjs,StateData,?TIMEOUT}
-    end;
-waiting_diffobjs(merk_nodiff, StateData) ->
-    % got merkle reply at a very strange time
-    % jump into active mode to handle some requests before trying again
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({merk_diff,_TargetNode,_DiffList}, StateData) ->
-    % got merkle reply at a very strange time
-    % jump into active mode to handle some requests before trying again
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
-    hometest(StateData);
-waiting_diffobjs({map, ClientPid, QTerm, BKey, KeyData},
-                 StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
-    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
-                 StateData=#state{idx=Idx}) ->
-    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({get, FSM_pid, BKey, ReqID}, StateData) ->
-    do_get(FSM_pid, BKey, ReqID, StateData),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
-    hometest(StateData);
-waiting_diffobjs({list_bucket, FSM_pid, Bucket, ReqID},
-                 StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
-    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({delete, From, BKey, ReqID},
-                 StateData=#state{mapcache=Cache}) ->
-    do_delete(From, BKey, ReqID, StateData),
-    {next_state,waiting_diffobjs,
-     StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
-waiting_diffobjs(_OtherMessage,StateData) ->
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT}.
+delete_and_exit(StateData=#state{idx=Idx, mod=Mod, modstate=ModState}) ->
+    ok = Mod:drop(ModState),
+    gen_server:cast(riak_vnode_master, {add_exclusion, Idx}),
+    {stop, normal, StateData}.
 
 %%%%%%%%%% in active state, we process normal client requests
 active({get_binary,BKey}, _From, StateData=#state{mod=Mod,modstate=ModState}) ->
     {reply,{ok, Mod:list(ModState)},active,StateData,?TIMEOUT}.
 active(timeout, StateData) ->
     hometest(StateData);
-active({diffobj,{BKey,BinObj,FromVN}}, StateData) ->
+active({diffobj,{BKey,BinObj}}, StateData) ->
     case do_diffobj_put(BKey, binary_to_term(BinObj), StateData) of
         ok ->
-            gen_fsm:send_event(FromVN,{resolved_diffobj,BKey});
+            nop;
         {error, Err} ->
             error_logger:error_msg("Error storing handoff obj: ~p~n", [Err])
     end,
        StateData=#state{mapcache=Cache,mod=Mod,modstate=ModState}) ->
     do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,self()),
     {next_state,active,StateData,?TIMEOUT};
+active(handoff_complete, StateData=#state{idx=Idx}) ->
+    global:del_lock({handoff_token, {node(), Idx}}),
+    hometest(StateData);
 active({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
-       StateData=#state{idx=Idx,mapcache=Cache}) ->
+       StateData=#state{idx=Idx,mapcache=Cache,handoff_q=HQ0}) ->
+    HQ = case HQ0 of
+        not_in_handoff -> not_in_handoff;
+        _ -> [BKey|HQ0]
+    end,
     gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
     do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, StateData),
     {next_state,
-     active,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+     active,StateData#state{mapcache=orddict:erase(BKey,Cache),
+                            handoff_q=HQ},?TIMEOUT};
 active({get, FSM_pid, BKey, ReqID}, StateData) ->
     do_get(FSM_pid, BKey, ReqID, StateData),
     {next_state,active,StateData,?TIMEOUT};
-active({vnode_merkle, {RemoteVN,Merkle,ObjList}}, StateData) ->
-    Me = self(),
-    spawn(fun() -> do_merkle(Me,RemoteVN,Merkle,ObjList,StateData) end),
-    {next_state,active,StateData,?TIMEOUT};
 active({list_bucket, FSM_pid, Bucket, ReqID},
        StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
     do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
     end,
     KeyCache = orddict:store({M,F,Arg,KeyData},MF_Res,KeyCache0),
     {next_state,active,
-     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT};
-active(merk_nodiff, StateData) ->
-    hometest(StateData);
-active({merk_diff,_TargetNode,_DiffList}, StateData) ->
-    hometest(StateData);
-active({resolved_diffobj,_K}, StateData) ->
-    hometest(StateData).
+     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT}.
 
 %% @private
 do_get(FSM_pid, BKey, ReqID,
     end.
 
 %% @private
-do_merkle(Me,RemoteVN,RemoteMerkle,ObjList,StateData) ->
-    % given a RemoteMerkle over the ObjList from RemoteVN
-    % determine which elements in ObjList we differ on
-    MyMerkle = make_merk(StateData,ObjList),
-    case merkerl:diff(MyMerkle,RemoteMerkle) of
-        [] -> gen_fsm:send_event(RemoteVN,merk_nodiff);
-        DiffList -> gen_fsm:send_event(RemoteVN,{merk_diff,Me,DiffList})
-    end.
-
-%% @private
 syntactic_put_merge(Mod, ModState, BKey, Obj1, ReqId) ->
     case Mod:get(ModState, BKey) of
         {error, notfound} -> {newobj, Obj1};
     end.
 
 %% @private
-get_merkle(_State=#state{mod=Mod,modstate=ModState}) ->
-    KeyList = Mod:list(ModState),
-    Merk0 = merkerl:build_tree([]),
-    get_merk(Mod,ModState,KeyList,Merk0).
-%% @private
-get_merk(_Mod,_ModState,[],Merk) -> Merk;
-get_merk(Mod,ModState,[BKey|KeyList],Merk) ->
-    V = Mod:get(ModState,BKey), % normally, V = {ok,BinObj}
-    get_merk(Mod,ModState,KeyList,merkerl:insert({BKey,erlang:phash2(V)},Merk)).
-
-%% @private
 get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) ->
     [{BKey, get_vclock(BKey,Mod,ModState)} || BKey <- KeyList].
 %% @private
     end.
 
 %% @private
+do_fold(Fun, Acc0, _State=#state{mod=Mod, modstate=ModState}) ->
+    Mod:fold(ModState, Fun, Acc0).
+
+%% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
 %% @private
-handle_event({get_merkle, From}, StateName, State) ->
-    gen_server2:reply(From, get_merkle(State)),
-    {next_state, StateName, State, ?TIMEOUT};
 handle_event({get_vclocks, From, KeyList}, StateName, State) ->
     gen_server2:reply(From, get_vclocks(KeyList, State)),
+    {next_state, StateName, State, ?TIMEOUT};
+handle_event({fold, {Fun, Acc0, From}}, StateName, State) ->
+    gen_server2:reply(From, do_fold(Fun, Acc0, State)),
     {next_state, StateName, State, ?TIMEOUT}.
 
 %% @private
-handle_sync_event(_Event, _From, _StateName, StateData) ->
+handle_sync_event({diffobj,{BKey,BinObj}}, _From, StateName, StateData) ->
+    case do_diffobj_put(BKey, binary_to_term(BinObj), StateData) of
+        ok ->
+            {reply, ok, StateName, StateData, ?TIMEOUT};
+        {error, Err} ->
+            error_logger:error_msg("Error storing handoff obj: ~p~n", [Err]),
+            {reply, {error, Err}, StateName, StateData, ?TIMEOUT}                   
+    end;
+handle_sync_event(_Even, _From, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
 %% @private

File apps/riak/src/riak_vnode_master.erl

 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 -record(idxrec, {idx, pid, monref}).
--record(state, {idxtab}).
+-record(state, {idxtab, excl=ordsets:new()}).
 
 start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 %% @private
 init([]) -> {ok, #state{idxtab=ets:new(riak_vnode_idx,[{keypos,2}])}}.
 
+
 %% @private
-handle_cast({start_vnode, Partition}, State) ->
+handle_cast({start_vnode, Partition}, State=#state{excl=Excl}) ->
     _Pid = get_vnode(Partition, State),
-    {noreply, State};
+    {noreply, State#state{excl=ordsets:del_element(Partition, Excl)}};
 handle_cast({vnode_map, {Partition,_Node},
              {ClientPid,QTerm,BKey,KeyData}}, State) ->
     Pid = get_vnode(Partition, State),
             {FSM_pid, Bucket, ReqID}}, State) ->
     Pid = get_vnode(Partition, State),
     gen_fsm:send_event(Pid, {list_bucket, FSM_pid, Bucket, ReqID}),
-    {noreply, State}.
+    {noreply, State};
+handle_cast({add_exclusion, Partition}, State=#state{excl=Excl}) ->
+    {noreply, State#state{excl=ordsets:add_element(Partition, Excl)}}.
 
 %% @private
 handle_call(all_possible_vnodes, _From, State) ->
     Pid = get_vnode(Partition, State),
     spawn(fun() -> gen_fsm:send_all_state_event(
                      Pid,{get_vclocks,From,KeyList}) end),
-    {noreply, State}.
-
+    {noreply, State};
+handle_call({fold, {Partition, Fun, Acc0}}, From, State) ->
+    Pid = get_vnode(Partition, State),
+    spawn(
+      fun() -> gen_fsm:send_all_state_event(Pid, {fold, {Fun,Acc0,From}}) end),
+    {noreply, State};
+handle_call({get_vnode, Partition}, _From, State) ->
+    {reply, {ok, get_vnode(Partition, State)}, State};
+handle_call(get_exclusions, _From, State=#state{excl=Excl}) ->
+    {reply, {ok, ordsets:to_list(Excl)}, State}.
 %% @private
 handle_info({'DOWN', MonRef, process, _P, _I}, State) ->
     delmon(MonRef, State),

File apps/riak/src/riakserver.proto

-message Helo {
-        required int32 version = 1;
-}
+message RiakObject_PB {
+        required bytes bucket = 1;
+        required bytes key = 2;
+        required bytes val = 3;
+}
+