Commits

Andy Gross  committed c30af4a Merge

merge from 0.7.1

  • Participants
  • Parent commits 9427629, d9c0fe9

Comments (0)

Files changed (34)

 8790e3560defe0a12d31ffde5ef96c6946ac67a6 riak-0.7
 8790e3560defe0a12d31ffde5ef96c6946ac67a6 riak-0.7
 7b2af3b4c96854dab8132acc299d3f24148fc141 riak-0.7
+82100e18113bf79f5c790e0f4858b0f33854fced riak-0.7.1
+82100e18113bf79f5c790e0f4858b0f33854fced riak-0.7.1
+b6515e496da1cc5cb2203c345b6bb0ef407785b4 riak-0.7.1

File apps/mochiweb/src/mochiweb.app

 {application, mochiweb,
  [{description, "MochiMedia Web Server"},
-  {vsn, "0.9"},
+  {vsn, "0.1"},
   {modules, [
         mochihex,
         mochijson,

File apps/riak/ebin/riak.app

   {modules, [
              bloom,
              chash,
+             gen_nb_server,
              gen_server2,
              jaywalker_resource,
              jiak,
              riak_fs_backend,
              riak_gb_trees_backend,
              riak_get_fsm,
+             riak_handoff_listener,
+             riak_handoff_receiver,
+             riak_handoff_sender,
              riak_js_manager,
              riak_js_sup,
              riak_js_vm,
              riak_mapreduce_sup,
              riak_multi_backend,
              riak_object,
-             riak_osmos_backend,
              riak_phase_proto,
              riak_phase_sup,
              riak_put_fsm,

File apps/riak/rebar.config

 {erl_opts, [debug_info, fail_on_warning]}.
 {lib_dirs, [".."]}.
 
-{erl_first_files, ["src/gen_server2.erl"]}.
+{erl_first_files, ["src/gen_nb_server.erl", "src/gen_server2.erl"]}.

File apps/riak/src/gen_nb_server.erl

+%% Copyright (c) 2009 Hypothetical Labs, Inc.
+ 
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%% of this software and associated documentation files (the "Software"), to deal
+%% in the Software without restriction, including without limitation the rights
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the Software is
+%% furnished to do so, subject to the following conditions:
+%%
+%% The above copyright notice and this permission notice shall be included in
+%% all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+%% THE SOFTWARE.
+ 
+-module(gen_nb_server).
+ 
+-author('kevin@hypotheticalabs.com').
+ 
+-behaviour(gen_server).
+ 
+%% API
+-export([start_link/4]).
+ 
+%% Behavior callbacks
+-export([behaviour_info/1]).
+ 
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+ 
+-define(SERVER, ?MODULE).
+ 
+-record(state, {cb,
+                sock,
+                server_state}).
+ 
+%% @hidden
+behaviour_info(callbacks) ->
+  [{init, 1},
+   {handle_call, 3},
+   {handle_cast, 2},
+   {handle_info, 2},
+   {terminate, 2},
+   {sock_opts, 0},
+   {new_connection, 2}];
+ 
+behaviour_info(_) ->
+  undefined.
+ 
+%% @spec start_link(CallbackModule, IpAddr, Port, InitParams) -> Result
+%% CallbackModule = atom()
+%% IpAddr = string()
+%% Port = integer()
+%% InitParams = [any()]
+%% Result = {ok, pid()} | {error, any()}
+%% @doc Start server listening on IpAddr:Port
+start_link(CallbackModule, IpAddr, Port, InitParams) ->
+  gen_server:start_link(?MODULE, [CallbackModule, IpAddr, Port, InitParams], []).
+ 
+%% @hidden
+init([CallbackModule, IpAddr, Port, InitParams]) ->
+  case CallbackModule:init(InitParams) of
+    {ok, ServerState} ->
+      case listen_on(CallbackModule, IpAddr, Port) of
+        {ok, Sock} ->
+          {ok, #state{cb=CallbackModule, sock=Sock, server_state=ServerState}};
+        Error ->
+          CallbackModule:terminate(Error, ServerState),
+          Error
+      end;
+    Err ->
+      Err
+  end.
+ 
+%% @hidden
+handle_call(Request, From, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_call(Request, From, ServerState) of
+    {reply, Reply, NewServerState} ->
+      {reply, Reply, State#state{server_state=NewServerState}};
+    {reply, Reply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {reply, Reply, State#state{server_state=NewServerState}, Arg};
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}};
+    {stop, Reason, Reply, NewServerState} ->
+      {stop, Reason, Reply, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+handle_cast(Msg, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_cast(Msg, ServerState) of
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+handle_info({inet_async, ListSock, _Ref, {ok, CliSocket}}, #state{cb=Callback, server_state=ServerState}=State) ->
+  inet_db:register_socket(CliSocket, inet_tcp),
+  case Callback:new_connection(CliSocket, ServerState) of
+    {ok, NewServerState} ->
+      prim_inet:async_accept(ListSock, -1),
+      {noreply, State#state{server_state=NewServerState}};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end;
+ 
+handle_info(Info, #state{cb=Callback, server_state=ServerState}=State) ->
+  case Callback:handle_info(Info, ServerState) of
+    {noreply, NewServerState} ->
+      {noreply, State#state{server_state=NewServerState}};
+    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+      {noreply, State#state{server_state=NewServerState}, Arg};
+    {stop, Reason, NewServerState} ->
+      {stop, Reason, State#state{server_state=NewServerState}}
+  end.
+ 
+%% @hidden
+terminate(Reason, #state{cb=Callback, sock=Sock, server_state=ServerState}) ->
+  gen_tcp:close(Sock),
+  Callback:terminate(Reason, ServerState),
+  ok.
+ 
+%% @hidden
+code_change(_OldVsn, State, _Extra) ->
+  {ok, State}.
+ 
+%% Internal functions
+ 
+%% @hidden
+%% @spec listen_on(CallbackModule, IpAddr, Port) -> Result
+%% CallbackModule = atom()
+%% IpAddr = string()
+%% Port = integer()
+%% Result = {ok, port()} | {error, any()}
+listen_on(CallbackModule, IpAddr, Port) ->
+  SockOpts = [{ip, convert(IpAddr)}|CallbackModule:sock_opts()],
+  case gen_tcp:listen(Port, SockOpts) of
+    {ok, LSock} ->
+      {ok, _Ref} = prim_inet:async_accept(LSock, -1),
+      {ok, LSock};
+    Err ->
+      Err
+  end.
+ 
+%% @hidden
+%% @spec convert(Addr) -> Result
+%% Addr = string()
+%% Result = {integer(), integer(), integer(), integer()}
+%% @doc Converts text IP addresses "0.0.0.0" to tuples {0, 0, 0, 0}
+convert(Addr) ->
+  T = string:tokens(Addr, "."),
+  list_to_tuple([list_to_integer(X) || X <- T]).

File apps/riak/src/raw_http.hrl

 -define(Q_PROPS, "props").
 -define(Q_KEYS,  "keys").
 -define(Q_FALSE, "false").
+-define(Q_TRUE, "true").
 -define(Q_STREAM, "stream").
 -define(Q_VTAG,  "vtag").
+-define(Q_RETURNBODY, "returnbody").

File apps/riak/src/raw_http_resource.erl

     MDDoc = riak_object:update_metadata(VclockDoc, UserMetaMD),
     Doc = riak_object:update_value(MDDoc, wrq:req_body(RD)),
     ok = C:put(Doc, Ctx#ctx.w, Ctx#ctx.dw),
-    {true, RD, Ctx#ctx{doc={ok, Doc}}}.
+    {RD2, Ctx2} = case wrq:get_qs_value(?Q_RETURNBODY, RD) of
+                      ?Q_TRUE ->
+                          R = Ctx#ctx.r,
+                          DocCtx = Ctx#ctx{doc=C:get(B, K, R)},
+                          {Body, DocRD, DocCtx2} = produce_doc_body(RD, DocCtx),
+                          {wrq:append_to_response_body(Body, DocRD), DocCtx2};
+                      _ ->
+                          {RD, Ctx#ctx{doc={ok, Doc}}}
+                  end,
+    {true, RD2, Ctx2}.
 
 %% @spec extract_content_type(reqdata()) ->
 %%          {ContentType::string(), Charset::string()|undefined}

File apps/riak/src/riak_backup.erl

 %%      restore will reconcile values with the existing data.
 
 -module(riak_backup).
--export ([backup/2, restore/2]).
+-export ([backup/3, restore/2]).
 -define (TABLE, riak_backup_table).
 
 %%% BACKUP %%%
 %% @doc 
 %% Connect to the cluster of which EntryNode is a member, 
 %% read data from the cluster, and save the data in the specified file.
-backup(EntryNode, Filename) -> 
+backup(EntryNode, BaseFilename, Mode) -> 
     % Make sure we can reach the node...
     ensure_connected(EntryNode),
 
     {ok, Ring} = rpc:call(EntryNode, riak_ring_manager, get_my_ring, []),
     Members = riak_ring:all_members(Ring),
 
-    % Print status...
-    io:format("Backing up to '~s'.~n", [Filename]),
-    io:format("...from ~p~n", [Members]),
+    FileName = 
+        case Mode of
+            "all" ->
+                io:format("Backing up (all nodes) to '~s'.~n", [BaseFilename]),
+                io:format("...from ~p~n", [Members]),
+                BaseFilename;
+            "node" ->
+                io:format("Backing up (node ~p) to '~s'.~n", [EntryNode, 
+                                                              BaseFilename++"-"++atom_to_list(EntryNode)]),
+                BaseFilename ++ "-" ++ EntryNode
+        end,
 
     % Make sure all nodes in the cluster agree on the ring...
     ensure_synchronized(Ring, Members),
 
     % Backup the data...
     {ok, ?TABLE} = disk_log:open([{name, ?TABLE},
-                                  {file, Filename},
+                                  {file, FileName},
                                   {mode, read_write},
                                   {type, halt}]),
 
-    [backup_node(Node) || Node <- Members],
+
+    case Mode of
+        "all" ->
+            [backup_node(Node) || Node <- Members];
+        "node" ->
+            backup_node(EntryNode)
+    end,
     ok = disk_log:sync(?TABLE),
     ok = disk_log:close(?TABLE),
     

File apps/riak/src/riak_cache_backend.erl

 
 -module (riak_cache_backend).
 -export([start/2, stop/1, get/2, put/3, list/1, list_bucket/2, delete/2]).
+-export([drop/1, is_empty/1, fold/3]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
 -define(PRINT(Var), error_logger:info_msg("DEBUG: ~p:~p - ~p: ~p~n", [?MODULE, ?LINE, ??Var, Var])).
 delete(State, BKey) -> 
     gen_server:call(State, {delete, BKey}).
 
+drop(State) ->
+    gen_server:call(State, drop).
+
+is_empty(State) ->
+    gen_server:call(State, is_empty).
+
+fold(State, Fun0, Acc) ->
+    gen_server:call(State, {fold, Fun0, Acc}).
+
 stop(State) -> 
     gen_server:call(State, stop).
 
 handle_call(list, _From, State) ->
     % Fold through the gb_trees, gathering keys...
     F = fun(Bucket, Tree, Acc) ->
-        io:format("Bucket: ~p~n", [Bucket]),
-        io:format("Tree: ~p~n", [Tree]),
-        io:format("Acc: ~p~n", [Acc]),
+        %io:format("Bucket: ~p~n", [Bucket]),
+        %io:format("Tree: ~p~n", [Tree]),
+        %io:format("Acc: ~p~n", [Acc]),
 
         Keys = [{Bucket, Key} || Key <- gb_trees:keys(Tree)],
         Acc ++ Keys        
         none -> 
             {reply, [], State}
     end;
-
+handle_call(drop, _From, State=#state{time_tree=TT}) ->
+    [timer:cancel(E#entry.tref) || {_, E} <- gb_trees:to_list(TT)],
+    {reply, ok, State#state{obj_tree=gb_trees:empty(), 
+                            time_tree=gb_trees:empty(),
+                            used_memory=0}};
+handle_call(is_empty, _From, State) ->
+    {reply, AllKeys, State} = handle_call(list, _From, State),
+    case AllKeys of
+        [] ->
+            {reply, true, State};
+        _ ->
+            {reply, false, State}
+    end;
+handle_call({fold, Fun0, Acc}, _From, State) ->
+    {reply, Keys, _} = handle_call(list, _From, State),
+    Reply = do_fold(State, Keys, Fun0, Acc),
+    {reply, Reply, State};
 handle_call(stop, _From, State) -> 
     {reply, ok, State}.
 
 %% Private Functions
 %%
 
+do_fold(State=#state{obj_tree=_OT}, Keys, Fun0, Acc) ->
+    Objs = [element(2, obj_find(B, K, State)) || {B,K} <- Keys],
+    Fun = fun(E, AccIn) ->
+                  Fun0({E#entry.bucket, E#entry.key}, E#entry.value, AccIn)
+          end,
+    lists:foldl(Fun, Acc, Objs).
+    
 % Lookup the #entry record specified by Bucket/Key.    
 % Returns {value, Entry} or none.
 obj_find(Bucket, Key, State) -> 

File apps/riak/src/riak_connect.erl

     end.
 
 ensure_vnodes_started(Ring) ->
-    VNodes2Start = case length(riak_ring:all_members(Ring)) of
-       1 -> riak_ring:my_indices(Ring);
-       _ -> [riak_ring:random_other_index(Ring)| riak_ring:my_indices(Ring)]
-    end,
-    [begin
-        gen_server:cast({riak_vnode_master, node()}, {start_vnode, I}) 
-    end|| I <- VNodes2Start].
+    AllMembers = riak_ring:all_members(Ring),
+    VNodes2Start = 
+        case {length(AllMembers), hd(AllMembers) =:= node()} of
+            {1, true} -> riak_ring:my_indices(Ring);
+            _ -> 
+                {ok, Excl} = gen_server:call(riak_vnode_master, get_exclusions),
+                case riak_ring:random_other_index(Ring, Excl) of
+                    no_indices ->
+                        case length(Excl) =:= riak_ring:num_partitions(Ring) of
+                            true ->
+                                exit;
+                            false ->
+                                riak_ring:my_indices(Ring)
+                        end;
+                    RO ->
+                        [RO | riak_ring:my_indices(Ring)]
+                end
+        end,
+    case VNodes2Start of
+        exit ->
+            riak:stop("node removal completed, exiting.");
+        _ ->
+            [begin
+                 gen_server:cast({riak_vnode_master, node()}, {start_vnode, I}) 
+             end|| I <- VNodes2Start]
+    end.
 
 remove_from_cluster(ExitingNode) ->
     % Set the remote node to stop claiming.

File apps/riak/src/riak_dets_backend.erl

 -module(riak_dets_backend).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,fold/3, is_empty/1, drop/1]).
 
 % @type state() = term().
--record(state, {table}).
+-record(state, {table, path}).
 
 % @spec start(Partition :: integer(), Config :: proplist()) ->
 %                        {ok, state()} | {{error, Reason :: term()}, state()}
                                    {min_no_slots, 8192},
                                    {max_no_slots, 16777216}]) of
         {ok, DetsName} ->
-            {ok, #state{table=DetsName}};
+            ok = dets:sync(DetsName),
+            {ok, #state{table=DetsName, path=TablePath}};
         {error, Reason}  ->
             riak:stop("dets:open_file failed"),
             {error, Reason}
 % @spec stop(state()) -> ok | {error, Reason :: term()}
 stop(#state{table=T}) -> dets:close(T).
 
-% get(state(), Key :: binary()) ->
+% get(state(), riak_object:bkey()) ->
 %   {ok, Val :: binary()} | {error, Reason :: term()}
 % key must be 160b
 get(#state{table=T}, BKey) ->
         {error, Err} -> {error, Err}
     end.
 
-% put(state(), Key :: binary(), Val :: binary()) ->
+% put(state(), riak_object:bkey(), Val :: binary()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 put(#state{table=T},BKey,Val) -> dets:insert(T, {BKey,Val}).
 
-% delete(state(), Key :: binary()) ->
+% delete(state(), riak_object:bkey()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 delete(#state{table=T}, BKey) -> dets:delete(T, BKey).
 
-% list(state()) -> [Key :: binary()]
+% list(state()) -> [riak_object:bkey()]
 list(#state{table=T}) ->
     MList = dets:match(T,{'$1','_'}),
     list(MList,[]).
     MList = dets:match(T,MatchSpec),
     list(MList,[]).
 
+fold(#state{table=T}, Fun0, Acc) -> 
+    Fun = fun({{B,K}, V}, AccIn) -> Fun0({B,K}, V, AccIn) end,
+    dets:foldl(Fun, Acc, T).
+
+is_empty(#state{table=T}) ->
+    ok = dets:sync(T),
+    dets:info(T, size) =:= 0.
+
+drop(#state{table=T, path=P}) ->
+    ok = dets:close(T),
+    ok = file:delete(P).
+
 %%
 %% Test
 %%

File apps/riak/src/riak_ets_backend.erl

 -behaviour(gen_server).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,
+         is_empty/1, drop/1, fold/3]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 handle_call({delete,BKey},_From,State) -> {reply, srv_delete(State,BKey),State};
 handle_call(list,_From,State) -> {reply, srv_list(State), State};
 handle_call({list_bucket,Bucket},_From,State) ->
-    {reply, srv_list_bucket(State, Bucket), State}.
+    {reply, srv_list_bucket(State, Bucket), State};
+handle_call(is_empty, _From, State) ->
+    {reply, ets:info(State#state.t) =:= 0, State};
+handle_call(drop, _From, State) -> 
+    ets:delete(State#state.t),
+    {reply, ok, State};
+handle_call({fold, Fun0, Acc}, _From, State) ->
+    Fun = fun({{B,K}, V}, AccIn) -> Fun0({B,K}, V, AccIn) end,
+    ets:foldl(Fun, Acc, State#state.t),
+    {reply, ok, State}.
+
 
 % @spec stop(state()) -> ok | {error, Reason :: term()}
 stop(SrvRef) -> gen_server:call(SrvRef,stop).
     true = ets:delete(State#state.t),
     ok.
 
-% get(state(), Key :: binary()) ->
+% get(state(), riak_object:bkey()) ->
 %   {ok, Val :: binary()} | {error, Reason :: term()}
 % key must be 160b
 get(SrvRef, BKey) -> gen_server:call(SrvRef,{get,BKey}).
     true = ets:insert(State#state.t, {BKey,Val}),
     ok.
 
-% delete(state(), Key :: binary()) ->
+% delete(state(), riak_object:bkey()) ->
 %   ok | {error, Reason :: term()}
 % key must be 160b
 delete(SrvRef, BKey) -> gen_server:call(SrvRef,{delete,BKey}).
     true = ets:delete(State#state.t, BKey),
     ok.
 
-% list(state()) -> [Key :: binary()]
+% list(state()) -> [riak_object:bkey()]
 list(SrvRef) -> gen_server:call(SrvRef,list).
 srv_list(State) ->
     MList = ets:match(State#state.t,{'$1','_'}),
     MList = ets:match(State#state.t,MatchSpec),
     list(MList,[]).
 
+is_empty(SrvRef) -> gen_server:call(SrvRef, is_empty).
+
+drop(SrvRef) -> gen_server:call(SrvRef, drop).
+    
+fold(SrvRef, Fun, Acc0) -> gen_server:call(SrvRef, {fold, Fun, Acc0}, infinity).
+
 %% @private
 handle_info(_Msg, State) -> {noreply, State}.
 

File apps/riak/src/riak_fs_backend.erl

 
 -module(riak_fs_backend).
 -export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([fold/3, drop/1, is_empty/1]).
+
 -include_lib("eunit/include/eunit.hrl").
 % @type state() = term().
 -record(state, {dir}).
     [location_to_bkey(X) || X <- filelib:wildcard("*/*/*/*/*",
                                                   State#state.dir)].
 
+
 %% @spec list_bucket(state(), riak_object:bucket()) ->
 %%           [riak_object:key()]
 %% @doc Get a list of the keys in a bucket
                                                       B64,"*/*/*/*"])) ]]
     end.
 
+is_empty(State) -> ?MODULE:list(State) =:= [].
+
+fold(State, Fun0, Acc) ->
+    Fun = fun(BKey, AccIn) -> 
+                  case ?MODULE:get(State, BKey) of
+                      {ok, Bin} ->
+                          Fun0(BKey, Bin, AccIn);
+                      _ ->
+                          AccIn
+                  end
+          end,
+    lists:foldl(Fun, Acc, ?MODULE:list(State)).
+
+drop(State) ->
+    [file:delete(location(State, BK)) || BK <- ?MODULE:list(State)],
+    Cmd = io_lib:format("rm -Rf ~s", [State#state.dir]),
+    os:cmd(Cmd),
+    ok.
+
 %% @spec location(state(), {riak_object:bucket(), riak_object:key()})
 %%          -> string()
 %% @doc produce the file-path at which the object for the given Bucket

File apps/riak/src/riak_gb_trees_backend.erl

 -module(riak_gb_trees_backend).
 
 -include_lib("eunit/include/eunit.hrl").
--export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,is_empty/1,fold/3,drop/1]).
 
 % @type state() = term().
 -record(state, {pid}).
     Ref = make_ref(),
     Pid ! {list_bucket, Bucket, self(), Ref},
     receive {list_bucket_response, Result, Ref} -> Result end.
+
+is_empty(#state { pid=Pid }) ->
+    Ref = make_ref(),
+    Pid ! {is_empty, self(), Ref},
+    receive {is_empty_response, Result, Ref} -> Result end.
+
+fold(#state{ pid=Pid }, Fun0, Acc) ->
+    Ref = make_ref(),
+    Pid ! {fold, Fun0, Acc, self(), Ref},
+    receive {fold_response, Result, Ref} -> Result end.
+
+drop(#state{ pid=Pid }) ->
+    Ref = make_ref(),
+    Pid ! {drop, self(), Ref},
+    receive {drop_response, Result, Ref} -> Result end.
     
 tree_loop(Tree) ->
     receive
                    srv_list_bucket(Tree, Bucket), Ref},
             tree_loop(Tree);
 
+        {is_empty, Pid, Ref} ->
+            Pid ! {is_empty_response, gb_trees:is_empty(Tree), Ref},
+            tree_loop(Tree);
+
+        {drop, Pid, Ref} ->
+            Pid ! {drop_response, ok, Ref},
+            tree_loop(gb_trees:empty());
+
+        {fold, Fun0, Acc, Pid, Ref} ->
+            Pid ! {fold_response,
+                   srv_fold(Tree, Fun0, Acc), Ref},
+            tree_loop(Tree);
+
         {stop, Pid, Ref} ->
             Pid ! {stop_response, ok, Ref}
     end.
 srv_list_bucket(Tree, Bucket) ->
     [ Key || {B, Key} <- gb_trees:keys(Tree), B == Bucket ].
 
+srv_fold(Tree, Fun0, Acc) ->
+    Iter0 = gb_trees:iterator(Tree),
+    srv_fold1(gb_trees:next(Iter0), Fun0, Acc).
+
+srv_fold1(none, _Fun0, Acc) ->
+    Acc;
+srv_fold1({K,V,Iter}, Fun0, Acc) ->
+    srv_fold1(gb_trees:next(Iter), Fun0, Fun0(K,V,Acc)).
+
 
 %%
 %% Test

File apps/riak/src/riak_handoff_listener.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+-module(riak_handoff_listener).
+-behavior(gen_nb_server).
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+-export([sock_opts/0, new_connection/2]).
+-record(state, {portnum}).
+
+start_link() ->
+    PortNum = 
+        case application:get_env(riak, riak_handoff_port) of
+            undefined -> 8099;
+            {ok, N} -> N
+        end,
+    IpAddr = 
+        case application:get_env(riak, riak_handoff_ip) of
+            undefined -> "0.0.0.0";
+            {ok, IP} -> IP
+        end,
+    gen_nb_server:start_link(?MODULE, IpAddr, PortNum, [PortNum]).
+
+init([PortNum]) -> 
+    register(?MODULE, self()),
+    {ok, #state{portnum=PortNum}}.
+
+sock_opts() -> [binary, {packet, 4}, {reuseaddr, true}, {backlog, 64}].
+
+handle_call(handoff_port, _From, State=#state{portnum=P}) -> 
+    {reply, {ok, P}, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+new_connection(Socket, State) ->
+    {ok, Pid} = riak_handoff_receiver:start_link(Socket),
+    gen_tcp:controlling_process(Socket, Pid),
+    {ok, State}.
+

File apps/riak/src/riak_handoff_receiver.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+-module(riak_handoff_receiver).
+-include("riakserver_pb.hrl").
+-behaviour(gen_server2).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {sock, partition, vnode}).
+
+start_link(Socket) ->
+    gen_server2:start_link(?MODULE, [Socket], []).
+
+init([Socket]) -> 
+    inet:setopts(Socket, [{active, once}, {packet, 4}, {header, 1}]),
+    {ok, #state{sock=Socket}}.
+
+handle_info({tcp_closed, Socket}, State=#state{sock=Socket}) ->
+    {stop, normal, State};
+handle_info({tcp, _Sock, Data}, State=#state{sock=Socket}) ->
+    [MsgType|MsgData] = Data,
+    NewState = process_message(MsgType,MsgData,State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState}.
+
+process_message(0, MsgData, State) ->
+    <<Partition:160/integer>> = MsgData,
+    {ok, VNode} = gen_server2:call(riak_vnode_master, {get_vnode, Partition}, 60000),  
+    State#state{partition=Partition, vnode=VNode};
+process_message(1, MsgData, State=#state{vnode=VNode}) ->
+    % header of 1 is a riakobject_pb
+    RO_PB = riakserver_pb:decode_riakobject_pb(zlib:unzip(MsgData)),
+    BKey = {RO_PB#riakobject_pb.bucket,RO_PB#riakobject_pb.key},
+    Msg = {diffobj, {BKey, RO_PB#riakobject_pb.val}},
+    ok = gen_fsm:sync_send_all_state_event(VNode, Msg, 60000),
+    State;
+process_message(2, _MsgData, State=#state{sock=Socket}) ->
+    ok = gen_tcp:send(Socket, <<2:8,"sync">>),
+    State.
+
+handle_call(_Request, _From, State) -> {reply, ok, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+

File apps/riak/src/riak_handoff_sender.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+-module(riak_handoff_sender).
+-export([start_link/3]).
+-include("riakserver_pb.hrl").
+
+start_link(TargetNode, Partition, BKeyList) ->
+    case global:set_lock({handoff_token, {node(), Partition}}, [node()], 0) of
+        true ->
+            Self = self(),
+            {ok, spawn_link(fun()->start_fold(TargetNode, Partition, BKeyList, Self) end)};
+        false ->
+            {error, locked}
+    end.
+            
+
+start_fold(TargetNode, Partition, BKeyList, ParentPid) ->
+    [_Name,Host] = string:tokens(atom_to_list(TargetNode), "@"),
+    {ok, Port} = get_handoff_port(TargetNode),
+    {ok, Socket} = gen_tcp:connect(Host, Port, 
+                                   [binary, 
+                                    {packet, 4}, 
+                                    {header,1}, 
+                                    {active, once}], 15000),
+    M = <<0:8,Partition:160/integer>>,
+    ok = gen_tcp:send(Socket, M),
+    case BKeyList of
+        all ->
+            gen_server2:call(riak_vnode_master, 
+                     {fold, {Partition, fun folder/3, {Socket, ParentPid, []}}},
+                     infinity);
+        _ ->
+            inner_fold({Socket,ParentPid,[]},BKeyList)
+    end,
+    gen_fsm:send_event(ParentPid, handoff_complete).
+
+inner_fold(_FoldArg,[]) -> ok;
+inner_fold(FoldArg,[{B,K}|Tail]) ->
+    {_Socket,ParentPid,_Count} = FoldArg,
+    case gen_fsm:sync_send_event(ParentPid, {get_binary, {B,K}}, infinity) of
+        {ok, V} ->
+            inner_fold(folder({B,K},V,FoldArg),Tail);
+        _ ->
+            inner_fold(FoldArg,Tail)
+    end.
+            
+folder({B,K}, V, {Socket, ParentPid, []}) ->
+    gen_tcp:controlling_process(Socket, self()),
+    visit_item({B,K}, V, {Socket, ParentPid, 0});
+folder({B,K}, V, AccIn) ->
+    visit_item({B,K}, V, AccIn).
+
+visit_item({B,K}, V, {Socket, ParentPid, 100}) ->
+    M = <<2:8,"sync">>,
+    ok = gen_tcp:send(Socket, M),
+    inet:setopts(Socket, [{active, false}]),
+    {ok,[2|<<"sync">>]} = gen_tcp:recv(Socket, 0),
+    inet:setopts(Socket, [{active, once}]),
+    visit_item({B,K}, V, {Socket, ParentPid, 0});
+visit_item({B,K}, V, {Socket, ParentPid, Acc}) ->
+    D = zlib:zip(riakserver_pb:encode_riakobject_pb(
+                   #riakobject_pb{bucket=B, key=K, val=V})),
+    M = <<1:8,D/binary>>,
+    ok = gen_tcp:send(Socket, M),
+    {Socket, ParentPid, Acc+1}.
+    
+
+get_handoff_port(Node) when is_atom(Node) ->
+    gen_server2:call({riak_handoff_listener, Node}, handoff_port).
+
+
+
+
+
+
+

File apps/riak/src/riak_multi_backend.erl

 %% @doc riak_gb_trees_backend is a Riak storage backend using Erlang gb_trees.
 
 -module (riak_multi_backend).
--export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
+-export([start/2, stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,is_empty/1,drop/1,fold/3]).
 
 -include_lib("eunit/include/eunit.hrl").
 
 list_bucket(State, Bucket) ->
     {_Name, Module, SubState} = get_backend(Bucket, State),
     Module:list_bucket(SubState, Bucket).
-    
+
+is_empty(State) ->
+    F = fun({_, Module, SubState}, Acc) ->
+                [Module:is_empty(SubState)|Acc]
+        end,
+    lists:all(fun(I) -> I end, lists:foldl(F, [], State#state.backends)).
+
+drop(State) ->
+    F = fun({_, Module, SubState}, Acc) ->
+                [Module:drop(SubState)|Acc]
+        end,
+    lists:foldl(F, [], State#state.backends),
+    ok.
+
+fold(State, Fun0, Acc) ->    
+    F = fun({_, Module, SubState}, AccIn) ->
+                [Module:fold(SubState, Fun0, AccIn)|AccIn]
+        end,
+    lists:flatten(lists:foldl(F, Acc, State#state.backends)).
+
 
 % Given a Bucket name and the State, return the
 % backend definition. (ie: {Name, Module, SubState})

File apps/riak/src/riak_osmos_backend.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.    
-
-%% @doc riak_osmos_backend is a storage backend that makes use of
-%%      the Dukes of Erl "osmos" file storage system.
-%%      http://dukesoferl.blogspot.com/2009/07/osmos.html
-%%      http://code.google.com/p/osmos/
-%%
-%% Configuration parameters:
-%%
-%%  riak_osmos_backend_root:
-%%       Directory in which to keep osmos files
-%%       Required - riak will exit if this option is not set                    
-%%
-%%  riak_osmost_backend_block_size:
-%%       "Block size" parameter, as documented in osmos's docs
-%%       osmos_table_format:new/3
-%%       Optional - defaulted to 2048
-%%
-%% Notes:
-%%  - Support for the 'delete' operation is strange in osmos.  As such,
-%%    delete is implemented here as writing an empty binary for a key's
-%%    value.  This is safe because no riak_object will ever serialize
-%%    to an empty binary.
-%%  - The osmos documentation is not explicit about it, but opening two
-%%    tables in the same directory will cause osmos to become confused.
-%%    To solve this, each partition opens its table in a unique subdirectory
-%%    under riak_osmos_backend_root.
--module(riak_osmos_backend).
-
--export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
--include_lib("eunit/include/eunit.hrl").
--record(state, {table}).
-
-%% @spec start(Partition :: integer(), Config :: proplist()) ->
-%%                        {ok, state()} | {{error, Reason :: term()}, state()}
-start(Partition, Config) ->
-    ConfigRoot = proplists:get_value(riak_osmos_backend_root, Config),
-    if ConfigRoot =:= undefined ->
-            riak:stop("riak_osmos_backend_root unset, failing.");
-       true -> ok
-    end,
-
-    %% put each table in its own directory, named after the partition
-    %% osmos requires this - each table must have its own directory
-    TableRoot = filename:join([ConfigRoot, integer_to_list(Partition)]),
-    case filelib:ensure_dir(TableRoot) of
-        ok -> ok;
-        _Error ->
-            riak:stop("riak_osmos_backend could not ensure"
-                      " the existence of its root directory")
-    end,
-
-    case application:start(osmos) of
-        ok -> ok;
-        {error,{already_started,osmos}} -> ok
-    end,
-
-    BlockSize = riak:get_app_env(riak_osmos_backend_block_size, 2048),
-    Format = osmos_table_format:new(binary, binary_replace, BlockSize),
-
-    Ready = case osmos:open(Partition, 
-                            [{directory, TableRoot},
-                             {format, Format}]) of
-                {ok, Partition} ->
-                    ok;
-                {error, Reason} ->
-                    riak:stop("osmos:open failed"),
-                    {error, Reason}
-            end,
-    {Ready, #state{table=Partition}}.
-
-%% @spec stop(state()) -> ok | {error, Reason :: term()}
-stop(#state{table=Table}) ->
-    osmos:close(Table).
-
-%% get(state(), BKey :: riak_object:bkey()) ->
-%%   {ok, Val :: binary()} | {error, Reason :: term()}
-%% key must be 160b
-get(#state{table=Table}, BKey) ->
-    case osmos:read(Table, term_to_binary(BKey)) of
-        {ok, <<>>}  -> {error, notfound}; %% sentinal for delete
-        {ok, Value} -> {ok, Value};
-        not_found   -> {error, notfound}
-    end.
-
-%% put(state(), BKey :: riak_object:bkey(), Val :: binary()) ->
-%%   ok | {error, Reason :: term()}
-%% key must be 160b
-put(#state{table=Table},BKey,Val) ->       
-    osmos:write(Table, term_to_binary(BKey), Val).
-
-%% delete(state(), Key :: riak_object:key()) ->
-%%   ok | {error, Reason :: term()}
-%% key must be 160b
-delete(#state{table=Table}, BKey) ->
-    osmos:write(Table, term_to_binary(BKey), <<>>). %% sentinal for delete
-
--define(SELECT_CHUNK, 1000).
-
-%% list(state()) -> [Key :: riak_object:key()]
-list(#state{table=Table}) ->
-    accum(Table, fun(K,_) -> {true, binary_to_term(K)} end).
-
-%% list_bucket(state(), Bucket :: riak_object:bucket()) -> [Key :: binary()]
-list_bucket(#state{table=Table}, Bucket) ->
-    accum(Table,
-          fun(Key,_) ->
-                  {B, K} = binary_to_term(Key),
-                  case Bucket of
-                      '_' -> {true, B};
-                      {filter, B, Fun} -> 
-                                  case Fun([K]) of
-                                      true -> {true, K};
-                                      _    -> false
-                                  end;
-                      {filter, _, _} -> false;
-                      B -> {true, K};
-                      _ -> false
-                  end
-          end).
-
-%% @spec accum(osmos_table(), function()) -> [term()]
-%% @doc map across the rows in the osmos table, and return
-%%      terms R for which Fun returns {true, R}
-%% Explanation of osmos:select_range params:
-%%   The three functions are, in order, LessLo, LessHi, and Select.
-%%   We are trying to select *all* keys, not a limited range, thus,
-%%   LessLo always returns 'false' to say "all keys are not less than
-%%   our desired lower bound" and LessHi always returns 'true' to say
-%%   "all keys are less than our desired upper bound".
-%%   Select's only function is to throw away the keys that have been
-%%   deleted (i.e. that have the delete sentinal stored as their value).
-accum(Table, Fun) ->
-    accum2(Table, Fun,
-           osmos:select_range(Table,
-                              fun(_) -> false end,
-                              fun(_) -> true end,
-                              fun(_,V) -> V /= <<>> end,
-                              ?SELECT_CHUNK),
-           []).
-                          
-%% simple accumulator to exhaust select_range's continuation should
-%% there be more than SELECT_CHUNK keys to return
-accum2(_, _, {ok, [], _}, Acc) -> lists:append(Acc);
-accum2(_, _, {error, _}, Acc)  -> lists:append(Acc);
-accum2(Table, Fun, {ok, NewList, Continue}, Acc) ->
-    accum2(Table, Fun,
-           osmos:select_continue(Table, Continue, ?SELECT_CHUNK),
-           [[ A || {true, A} <- [ Fun(K,V) || {K,V} <- NewList]]|Acc]).
-
-%%
-%% Test
-%%
-
-simple_test() ->
-    case application:start(osmos) of
-       ok ->
-            ?assertCmd("rm -rf test/osmos-backend"),
-            Config = [{riak_osmos_backend_root, "test/osmos-backend"}],
-            riak_test_util:standard_backend_test(riak_osmos_backend, Config);
-       Error ->
-            ?debugFmt("Skipping osmos tests: ~p", [Error])
-    end.
-

File apps/riak/src/riak_ring.erl

 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
          transfer_node/3, rename_node/3, reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
+         random_other_index/2,
          get_meta/2, update_meta/3, equal_rings/2]).	 
 
 % @type riak_ring(). The opaque data type used for partition ownership.
         _ -> lists:nth(crypto:rand_uniform(1, length(L)+1), L)
     end.
 
+random_other_index(State, Exclude) when is_list(Exclude) ->
+    L = [I || {I, Owner} <- ?MODULE:all_owners(State),
+              Owner =/= node(),
+              not lists:member(I, Exclude)],
+    case L of
+        [] -> no_indices;
+        _ -> lists:nth(crypto:rand_uniform(1, length(L)+1), L)
+    end.
+
 % @doc Return the node that owns the given index.
 % @spec index_owner(State :: chstate(), Idx :: integer()) -> Node :: term()
 index_owner(State, Idx) ->
     end.
 
 % @doc  Rename OldNode to NewNode in a Riak ring.
-% @spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
+% @spec rename_node(State :: chstate(), OldNode :: atom(), NewNode :: atom()) ->
 %           chstate()
 rename_node(State=#chstate{chring=Ring, nodename=ThisNode}, OldNode, NewNode) 
   when is_atom(OldNode), is_atom(NewNode)  ->

File apps/riak/src/riak_ring_manager.erl

                     Timestamps = [TS || {"riak_ring", C1, TS} <- 
                      [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames],
                                         C1 =:= Cluster],
-                    TSPat = [io_lib:fread("~4d~2d~2d~2d~2d~2d",TS) ||
-                                TS <- Timestamps],
-                    TSL = lists:reverse(lists:sort([TS ||
-                                                       {ok,TS,[]} <- TSPat])),
-                    Keep = prune_list(TSL),
-                    KeepTSs = [lists:flatten(
-                         io_lib:format(
-                           "~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",K))
-                               || K <- Keep],
-                    DelFNs = [Dir ++ "/" ++ FN || FN <- Filenames, 
-                                                  lists:all(fun(TS) -> 
-                                                      string:str(FN,TS)=:=0
-                                                            end, KeepTSs)],
-                    riak_eventer:notify(riak_ring_manager, prune_ringfiles,
-                                        {length(DelFNs),length(Timestamps)}),
-                    [file:delete(DelFN) || DelFN <- DelFNs],
-                    ok
+                    if Timestamps /= [] ->
+                            %% there are existing ring files
+                            TSPat = [io_lib:fread("~4d~2d~2d~2d~2d~2d",TS) ||
+                                        TS <- Timestamps],
+                            TSL = lists:reverse(lists:sort([TS ||
+                                                               {ok,TS,[]} <- TSPat])),
+                            Keep = prune_list(TSL),
+                            KeepTSs = [lists:flatten(
+                                         io_lib:format(
+                                           "~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",K))
+                                       || K <- Keep],
+                            DelFNs = [Dir ++ "/" ++ FN || FN <- Filenames, 
+                                                          lists:all(fun(TS) -> 
+                                                                            string:str(FN,TS)=:=0
+                                                                    end, KeepTSs)],
+                            riak_eventer:notify(riak_ring_manager, prune_ringfiles,
+                                                {length(DelFNs),length(Timestamps)}),
+                            [file:delete(DelFN) || DelFN <- DelFNs],
+                            ok;
+                       true ->
+                            %% directory wasn't empty, but there are no ring
+                            %% files in it
+                            ok
+                    end
             end
     end.
 

File apps/riak/src/riak_sup.erl

     VMaster = {riak_vnode_master,
                {riak_vnode_master, start_link, []},
                permanent, 5000, worker, [riak_vnode_master]},
+    HandoffListen = {riak_handoff_listener,
+               {riak_handoff_listener, start_link, []},
+               permanent, 5000, worker, [riak_handoff_listener]},
     RingMgr = {riak_ring_manager,
              {riak_ring_manager, start_link, []},
              permanent, 5000, worker, [riak_ring_manager]},
         Eventer,
         VSup,
         ?IF(HasStorageBackend, VMaster, []),
-        RingMgr,
-        Connect,
+        HandoffListen,
+        RingMgr, 
+        Connect, 
         LocalLogger,
         ?IF(IsWebConfigured, RiakWeb, []),
         ?IF(IsStatEnabled, RiakStat, []),

File apps/riak/src/riak_vnode.erl

 -export([start_link/1]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
--export([active/2,merk_waiting/2,waiting_diffobjs/2]).
--export([active/3,merk_waiting/3,waiting_diffobjs/3]).
+-export([active/2,active/3]).
 
 -define(TIMEOUT, 60000).
+-define(LOCK_RETRY_TIMEOUT, 10000).
 
--record(state, {idx,mapcache,mod,modstate,waiting_diffobjs}).
+-record(state, {idx,mapcache,mod,modstate,handoff_q}).
 
-start_link(Idx) ->
-    gen_fsm:start_link(?MODULE, [Idx], []).
+start_link(Idx) -> gen_fsm:start_link(?MODULE, [Idx], []).
 
 init([VNodeIndex]) ->
     Mod = riak:get_app_env(storage_backend),
     Configuration = riak:get_app_env(),
     {ok, ModState} = Mod:start(VNodeIndex, Configuration),
-    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState},
-    {next_state, StateName, StateData, Timeout} = hometest(StateData0),
-    {ok, StateName, StateData, Timeout}.
+    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState,
+                        handoff_q=not_in_handoff},
+    case hometest(StateData0) of
+        {next_state, StateName, StateData, Timeout} ->
+            {ok, StateName, StateData, Timeout};
+        {stop,normal,StateData} ->
+            {ok, ModState1} = Mod:start(VNodeIndex, Configuration),
+            {ok, active, StateData#state{mod=Mod, modstate=ModState1}, ?TIMEOUT}
+    end.
 
 %% @private
-hometest(StateData0=#state{idx=Idx}) ->
+hometest(StateData0=#state{idx=Idx,handoff_q=HQ}) ->
     StateData = StateData0#state{mapcache=orddict:new()},
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     Me = node(),
     case riak_ring:index_owner(MyRing, Idx) of
         Me ->
-            {next_state,active,StateData,?TIMEOUT};
+            {next_state,active,StateData#state{handoff_q=not_in_handoff},
+             ?TIMEOUT};
         TargetNode ->
             case net_adm:ping(TargetNode) of
-                pang -> {next_state,active,StateData,?TIMEOUT};
-                pong -> build_and_send_merkle(TargetNode, StateData)
+                pang -> 
+                    {next_state,active,StateData,?TIMEOUT};
+                pong -> 
+                    case HQ of
+                        not_in_handoff ->
+                            do_handoff(TargetNode, StateData);
+                        _ ->
+                            do_list_handoff(TargetNode, HQ, StateData)
+                    end
             end
     end.
 
 %% @private
-build_and_send_merkle(TargetNode,
-                      StateData=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
-    ObjList = Mod:list(ModState),
-    Merk = make_merk(StateData, ObjList),
-    gen_server:cast({riak_vnode_master, TargetNode},
-                    {vnode_merkle, {self(),Idx,Merk,ObjList}}),
-    {next_state,merk_waiting,
-     StateData#state{waiting_diffobjs=ObjList},?TIMEOUT}.
+do_handoff(TargetNode, StateData=#state{idx=Idx, mod=Mod, modstate=ModState}) ->
+    case Mod:is_empty(ModState) of
+        true ->
+            delete_and_exit(StateData);
+        false ->
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
+    end.
 
 %% @private
-make_merk(StateData,ObjList) ->
-    Merk = merkerl:build_tree([]),
-    make_merk(StateData,ObjList,Merk).
-make_merk(_StateData,[],Merk) -> Merk;
-make_merk(StateData=#state{mod=Mod,modstate=ModState},
-          [BKey|Objlist],Merk) ->
-    V = Mod:get(ModState,BKey), % normally, V = {ok,BinObj}
-    make_merk(StateData,Objlist,merkerl:insert({BKey,erlang:phash2(V)},Merk)).
+do_list_handoff(TargetNode, BKeyList, StateData=#state{idx=Idx}) ->
+    case BKeyList of
+        [] ->
+            delete_and_exit(StateData);
+        _ ->
+            {HQ,TO} = case riak_handoff_sender:start_link(TargetNode, Idx, all) of
+                {ok, _Pid} -> {[], ?TIMEOUT};
+                {error, locked} -> {not_in_handoff, ?LOCK_RETRY_TIMEOUT}
+            end,
+            {next_state,active,StateData#state{handoff_q=HQ},TO}
+    end.
 
 %% @private
-send_diff_objs(TargetNode,DiffList,
-               StateData=#state{mod=Mod,modstate=ModState}) ->
-    % send each obj (BKey) in difflist to targetnode
-    % return a state with waiting_diffobjs populated
-    Sent = [K || K <- [send_diff_obj(TargetNode,BKey,Mod,ModState)
-                       || BKey <- DiffList], K /= nop],
-    StateData#state{waiting_diffobjs=Sent}.
-send_diff_obj(TargetNode,BKey,Mod,ModState) ->
-    case Mod:get(ModState,BKey) of
-        {ok,BinObj} ->
-            gen_fsm:send_event(TargetNode, {diffobj,{BKey,BinObj,self()}}),
-            BKey;
-        _ ->
-            nop
-    end.
-
-%%%%%%%%%% in merk_waiting state, we have sent a merkle tree to the
-%%%%%%%%%% home vnode, and are waiting for a list of different objects
-merk_waiting({get_binary,_BKey}, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
-merk_waiting(list, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
-merk_waiting(timeout, StateData) ->
-    % didn't get a response to our merkle tree, switch back to active mode
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-merk_waiting(merk_nodiff, StateData0=#state{waiting_diffobjs=WD,
-                                            mod=Mod,modstate=ModState}) ->
-    % the far side is home and has all of the objects, cleanup time
-    StateData = StateData0#state{waiting_diffobjs=[]},
-    [Mod:delete(ModState, BKey) || BKey <- WD],
-    case Mod:list(ModState) of
-        [] ->
-            Mod:stop(ModState),
-            {stop,normal,StateData};
-        _ ->
-            hometest(StateData)
-    end;
-merk_waiting({merk_diff,TargetVNode,DiffList}, StateData0) ->
-    StateData = send_diff_objs(TargetVNode,DiffList,StateData0),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-merk_waiting({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
-    hometest(StateData);
-merk_waiting({map, ClientPid, QTerm, BKey, KeyData}, StateData) ->
-    NewState = do_map(ClientPid,QTerm,BKey,KeyData,StateData,self()),
-    {next_state,merk_waiting,NewState,?TIMEOUT};
-merk_waiting({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
-             StateData=#state{idx=Idx}) ->
-    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({get, FSM_pid, BKey, ReqID}, StateData) ->
-    do_get(FSM_pid, BKey, ReqID, StateData),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
-    hometest(StateData);
-merk_waiting({list_bucket, FSM_pid, Bucket, ReqID},
-             StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
-    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
-    {next_state,merk_waiting,StateData,?TIMEOUT};
-merk_waiting({delete, From, BKey, ReqID}, StateData=#state{mapcache=Cache}) ->
-    do_delete(From, BKey, ReqID, StateData),
-    {next_state,
-     merk_waiting,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
-merk_waiting(_OtherMessage,StateData) ->
-    {next_state,merk_waiting,StateData,?TIMEOUT}.
-
-%%%%%%%%%% in waiting_diffobjs state, we have sent a list of diff objs to the
-%%%%%%%%%% home vnode, and are waiting to hear that they've been handled
-waiting_diffobjs({get_binary,_BKey}, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT};
-waiting_diffobjs(list, _From, StateData) ->
-    {reply,{error, wrong_state},active,StateData,?TIMEOUT}.
-waiting_diffobjs(timeout, StateData) ->
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({resolved_diffobj,K},
-         StateData0=#state{waiting_diffobjs=WD0,mod=Mod,modstate=ModState})->
-    WD = lists:delete(K,WD0),
-    Mod:delete(ModState, K),
-    StateData = StateData0#state{waiting_diffobjs=WD},
-    case WD of
-        [] -> % resolved all the intended diff objects
-            hometest(StateData);
-        _ -> % some left, keep waiting
-            {next_state,waiting_diffobjs,StateData,?TIMEOUT}
-    end;
-waiting_diffobjs(merk_nodiff, StateData) ->
-    % got merkle reply at a very strange time
-    % jump into active mode to handle some requests before trying again
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({merk_diff,_TargetNode,_DiffList}, StateData) ->
-    % got merkle reply at a very strange time
-    % jump into active mode to handle some requests before trying again
-    {next_state,active,StateData#state{waiting_diffobjs=[]},?TIMEOUT};
-waiting_diffobjs({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
-    hometest(StateData);
-waiting_diffobjs({map, ClientPid, QTerm, BKey, KeyData}, StateData) ->
-    NewState = do_map(ClientPid,QTerm,BKey,KeyData,StateData,self()),
-    {next_state,waiting_diffobjs,NewState,?TIMEOUT};
-waiting_diffobjs({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
-                 StateData=#state{idx=Idx}) ->
-    gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID}),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({get, FSM_pid, BKey, ReqID}, StateData) ->
-    do_get(FSM_pid, BKey, ReqID, StateData),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({vnode_merkle, {_RemoteVN,_Merkle,_ObjList}}, StateData) ->
-    hometest(StateData);
-waiting_diffobjs({list_bucket, FSM_pid, Bucket, ReqID},
-                 StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
-    do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT};
-waiting_diffobjs({delete, From, BKey, ReqID},
-                 StateData=#state{mapcache=Cache}) ->
-    do_delete(From, BKey, ReqID, StateData),
-    {next_state,waiting_diffobjs,
-     StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
-waiting_diffobjs(_OtherMessage,StateData) ->
-    {next_state,waiting_diffobjs,StateData,?TIMEOUT}.
+delete_and_exit(StateData=#state{idx=Idx, mod=Mod, modstate=ModState}) ->
+    ok = Mod:drop(ModState),
+    gen_server:cast(riak_vnode_master, {add_exclusion, Idx}),
+    {stop, normal, StateData}.
 
 %%%%%%%%%% in active state, we process normal client requests
 active({get_binary,BKey}, _From, StateData=#state{mod=Mod,modstate=ModState}) ->
     {reply,{ok, Mod:list(ModState)},active,StateData,?TIMEOUT}.
 active(timeout, StateData) ->
     hometest(StateData);
-active({diffobj,{BKey,BinObj,FromVN}}, StateData) ->
+active({diffobj,{BKey,BinObj}}, StateData) ->
     case do_diffobj_put(BKey, binary_to_term(BinObj), StateData) of
         ok ->
-            gen_fsm:send_event(FromVN,{resolved_diffobj,BKey});
+            nop;
         {error, Err} ->
             error_logger:error_msg("Error storing handoff obj: ~p~n", [Err])
     end,
 active({map, ClientPid, QTerm, BKey, KeyData}, StateData) ->
     NewState = do_map(ClientPid,QTerm,BKey,KeyData,StateData,self()),
     {next_state,active,NewState,?TIMEOUT};
+active(handoff_complete, StateData=#state{idx=Idx}) ->
+    global:del_lock({handoff_token, {node(), Idx}}),
+    hometest(StateData);
 active({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
-       StateData=#state{idx=Idx,mapcache=Cache}) ->
+       StateData=#state{idx=Idx,mapcache=Cache,handoff_q=HQ0}) ->
+    HQ = case HQ0 of
+        not_in_handoff -> not_in_handoff;
+        _ -> [BKey|HQ0]
+    end,
     gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
     do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, StateData),
     {next_state,
-     active,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+     active,StateData#state{mapcache=orddict:erase(BKey,Cache),
+                            handoff_q=HQ},?TIMEOUT};
 active({get, FSM_pid, BKey, ReqID}, StateData) ->
     do_get(FSM_pid, BKey, ReqID, StateData),
     {next_state,active,StateData,?TIMEOUT};
-active({vnode_merkle, {RemoteVN,Merkle,ObjList}}, StateData) ->
-    Me = self(),
-    spawn(fun() -> do_merkle(Me,RemoteVN,Merkle,ObjList,StateData) end),
-    {next_state,active,StateData,?TIMEOUT};
 active({list_bucket, FSM_pid, Bucket, ReqID},
        StateData=#state{mod=Mod,modstate=ModState,idx=Idx}) ->
     do_list_bucket(FSM_pid,ReqID,Bucket,Mod,ModState,Idx),
     end,
     KeyCache = orddict:store({M,F,Arg,KeyData},MF_Res,KeyCache0),
     {next_state,active,
-     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT};
-active(merk_nodiff, StateData) ->
-    hometest(StateData);
-active({merk_diff,_TargetNode,_DiffList}, StateData) ->
-    hometest(StateData);
-active({resolved_diffobj,_K}, StateData) ->
-    hometest(StateData).
+     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT}.
 
 %% @private
 do_get(FSM_pid, BKey, ReqID,
     gen_fsm:send_event(ClientPid, Reply),
     NewState.
 
-do_merkle(Me,RemoteVN,RemoteMerkle,ObjList,StateData) ->
-    % given a RemoteMerkle over the ObjList from RemoteVN
-    % determine which elements in ObjList we differ on
-    MyMerkle = make_merk(StateData,ObjList),
-    case merkerl:diff(MyMerkle,RemoteMerkle) of
-        [] -> gen_fsm:send_event(RemoteVN,merk_nodiff);
-        DiffList -> gen_fsm:send_event(RemoteVN,{merk_diff,Me,DiffList})
-    end.
-
 %% @private
 syntactic_put_merge(Mod, ModState, BKey, Obj1, ReqId) ->
     case Mod:get(ModState, BKey) of
     end.
 
 %% @private
+do_fold(Fun, Acc0, _State=#state{mod=Mod, modstate=ModState}) ->
+    Mod:fold(ModState, Fun, Acc0).
+
+%% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
 %% @private
     {next_state, StateName, State, ?TIMEOUT};
 handle_event({get_vclocks, From, KeyList}, StateName, State) ->
     gen_server2:reply(From, get_vclocks(KeyList, State)),
+    {next_state, StateName, State, ?TIMEOUT};
+handle_event({fold, {Fun, Acc0, From}}, StateName, State) ->
+    gen_server2:reply(From, do_fold(Fun, Acc0, State)),
     {next_state, StateName, State, ?TIMEOUT}.
 
 %% @private
-handle_sync_event(_Event, _From, _StateName, StateData) ->
+handle_sync_event({diffobj,{BKey,BinObj}}, _From, StateName, StateData) ->
+    case do_diffobj_put(BKey, binary_to_term(BinObj), StateData) of
+        ok ->
+            {reply, ok, StateName, StateData, ?TIMEOUT};
+        {error, Err} ->
+            error_logger:error_msg("Error storing handoff obj: ~p~n", [Err]),
+            {reply, {error, Err}, StateName, StateData, ?TIMEOUT}                   
+    end;
+handle_sync_event(_Even, _From, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
 %% @private

File apps/riak/src/riak_vnode_master.erl

 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 -record(idxrec, {idx, pid, monref}).
--record(state, {idxtab}).
+-record(state, {idxtab, excl=ordsets:new()}).
 
 start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 %% @private
 init([]) -> {ok, #state{idxtab=ets:new(riak_vnode_idx,[{keypos,2}])}}.
 
+
 %% @private
-handle_cast({start_vnode, Partition}, State) ->
+handle_cast({start_vnode, Partition}, State=#state{excl=Excl}) ->
     _Pid = get_vnode(Partition, State),
-    {noreply, State};
+    {noreply, State#state{excl=ordsets:del_element(Partition, Excl)}};
 handle_cast({vnode_map, {Partition,_Node},
              {ClientPid,QTerm,BKey,KeyData}}, State) ->
     Pid = get_vnode(Partition, State),
             {FSM_pid, Bucket, ReqID}}, State) ->
     Pid = get_vnode(Partition, State),
     gen_fsm:send_event(Pid, {list_bucket, FSM_pid, Bucket, ReqID}),
-    {noreply, State}.
+    {noreply, State};
+handle_cast({add_exclusion, Partition}, State=#state{excl=Excl}) ->
+    {noreply, State#state{excl=ordsets:add_element(Partition, Excl)}}.
 
 %% @private
 handle_call(all_possible_vnodes, _From, State) ->
     Pid = get_vnode(Partition, State),
     spawn(fun() -> gen_fsm:send_all_state_event(
                      Pid,{get_vclocks,From,KeyList}) end),
-    {noreply, State}.
-
+    {noreply, State};
+handle_call({fold, {Partition, Fun, Acc0}}, From, State) ->
+    Pid = get_vnode(Partition, State),
+    spawn(
+      fun() -> gen_fsm:send_all_state_event(Pid, {fold, {Fun,Acc0,From}}) end),
+    {noreply, State};
+handle_call({get_vnode, Partition}, _From, State) ->
+    {reply, {ok, get_vnode(Partition, State)}, State};
+handle_call(get_exclusions, _From, State=#state{excl=Excl}) ->
+    {reply, {ok, ordsets:to_list(Excl)}, State}.
 %% @private
 handle_info({'DOWN', MonRef, process, _P, _I}, State) ->
     delmon(MonRef, State),

File apps/riak/src/riakserver.proto

-message Helo {
-        required int32 version = 1;
-}
+message RiakObject_PB {
+        required bytes bucket = 1;
+        required bytes key = 2;
+        required bytes val = 3;
+}
+

File apps/webmachine/ebin/webmachine.app

 {application, webmachine,
  [{description, "webmachine"},
-  {vsn, "0.9"},
+  {vsn, "1.5.2"},
   {modules, [
              webmachine,
              webmachine_app,

File apps/webmachine/src/webmachine.app

 {application, webmachine,
  [{description, "webmachine"},
-  {vsn, "0.9"},
+  {vsn, "1.5.2"},
   {modules, [
     webmachine,
     webmachine_app,

File client_lib/jiak.rb

     # and 'write_mask'
     def set_bucket_schema(bucket, schema)
       schema['required_fields'] ||= []
-      schema['read_mask']       ||= schema['required_fields']
+      schema['read_mask']       ||= schema['allowed_fields']
       schema['write_mask']      ||= schema['read_mask']
 
       do_req(set_data(Net::HTTP::Put.new(path(bucket)),

File doc/admin.org

               * =riak_fs_backend_root= :: root directory where files
                    are stored
 
-         - =riak_osmos_backend= :: data is stored in osmos tables
-
-              Osmos must be installed separately.  It is available at
-              [[http://code.google.com/p/osmos/]].  More details are
-              available at
-              [[http://dukesoferl.blogspot.com/2009/07/osmos.html]].
-
-              * =riak_osmos_backend_root= :: root directory where
-                   osmos files are stored
-
          - =riak_multi_backend= :: enables storing data for different
               buckets in different backends
 
                to that file.
 
 ** Simple startup
-*** TODO simple example startup of one node
+
+   To start a Riak node, simply install riak (by either copying the
+   rel/riak directory from an existing build, or compiling with =make
+   rel= on the new machine), and then run =bin/riak start=.
+
+   The node will start in the background.  To attach to the running
+   node's Erlang console, run =bin/riak attach=.  Use Control-D to
+   exit the console, but leave the node running.
+
 ** Cluster startup
-*** TODO example of forming a cluster of two nodes
+
+   A single node is its own cluster.  To add new nodes to a cluster,
+   first start a new node, just as you would for solitary operation:
+   =bin/riak start=.
+
+   Once the new node is up, ask it to connect to the existing cluster
+   by running =bin/riak-admin join NODE@HOST=, where =NODE@HOST= is
+   the name of a node in the existing cluster (from the =-name=
+   argument in vm.args).  You should see a message of the form "Sent
+   join request to NODE@HOST" (see [[Startup Errors]] if you don't).
+
+   After the node has joined the cluster, you can verify that it has
+   claimed partitions by attaching to a console in the cluster and
+   requesting a copy of the claim ring:
+
+: 1> {ok, R} = riak_ring_manager:get_my_ring().
+: {ok,{chstate,...
+: 2> riak_ring:all_members(R).
+: ['riak@10.0.0.1','riak@10.0.0.2','riak@10.0.0.3']
+
+   You should see the names of all the nodes in your cluster in the
+   list returned from that command.
+
+   Re-starting a node, after it has been shut down is even easier.  As
+   long as you haven't removed the on-disk ringfile, you should only
+   need to run =bin/riak start=.  The startup will read the ringfile,
+   and automatically connect to the cluster it was part of when it
+   shut down.
+
 ** Verifying your Installation
-*** TODO show what happens with riak-admin test?
 
+   A simple way to verify a running Riak installation is with
+   =bin/riak-admin test=:
+
+: $ bin/riak-admin test
+:
+: =INFO REPORT==== 25-Jan-2010::14:09:08 ===
+: Successfully completed 1 read/write cycle to 'riak@127.0.0.1'
+
+   The script attempts to write a value, and then read it back.  If
+   all goes well, you should see output similar to the example above.
+   See [[Client Errors]] for help with error messages from this script.
+
+** Shutting down a node
+   
+   Stopping a Riak node can be done at any time, simply by running
+   =bin/riak stop=:
+
+: $ dev/dev3/bin/riak stop
+: ok
+
+   This halts the Riak node, but it does not change any claims on the
+   ring.  That is, the rest of the cluster still believes that the
+   node that just shut down is still responsible for storing some
+   slice of the cluster's data.
+
+   To change ring ownership, such that a node is no longer responsible
+   for storing any data, first get a riak client, then use
+   =remove_from_cluster/1= to move that node's claims to other nodes:
+
+: $ bin/riak attach
+: Attaching to ... (^D to exit)
+:
+: (riak@10.0.0.1)1> {ok, C} = riak:local_client().
+: {ok,{riak_client,'riak@10.0.0.1',<<0,166,141,137>>}}
+: (riak@10.0.0.1)2> C:remove_from_cluster('riak@10.0.0.3').
+: [ok,ok,ok,ok|...]
+
+   If the node that is leaving is alive and connected to the cluster,
+   it will start transfering data to the nodes that have claimed the
+   partitions it just owned.  If the node is down, no data transfer
+   happens.
+
+   Read and write requests immediately start hitting the nodes that
+   have claimed the partitions that the exiting node just gave up.
+   This means that some reads where R=N will fail for a time, until
+   data exists everywhere.  Data will exist everywhere it is supposed
+   to after handoff finishes, after a successful read repair, or after
+   a write to all partitions responsible for a value.
 
 * Client Interaction
 ** TODO Client Libraries
     + change the application version in the =.app= and
       =rel/reltool.config= files
 
-** TODO Startup Errors
+** Startup Errors
+
+*** Node is not running! / Node 'XXX' not responding to pings.
+
+    If the =bin/riak= or =bin/riak= refuse to connect to a node, one
+    of several things may be going awry:
+
+    - The node may not be running.  Use =ps= to check for running
+      instances of =beam=, the Erlang virtual machine.  The arguments
+      to that VM should include paths to your Riak installation.
+
+    - If =etc/vm.args= is using the short-form of Erlang node names
+      (without =...@IP-OR-HOSTNAME=), then DNS on the machine may be
+      configured incorrectly.  The easiest fix is to explicitly set
+      the hostname using the long form =-name NODE@IP-OR-HOSTNAME= in
+      =etc/vm.args=.
+
+    - Erlang distribution cookies may not match.  If you started the
+      Riak node with =bin/riak console=, or you are able to open a
+      console with =bin/riak attach=, check the node's cookie with
+      =erlang:get_cookie/0=:
+
+: (riak@127.0.0.1)1> erlang:get_cookie().
+: riak
+      
+      That cookie should match the cookie in =etc/vm.args=.  The
+      =bin/riak= and =bin/riak-admin= scripts should be using the same
+      cookie.
+
+      The startup or admin scripts may be tricked into using different
+      cookies by specifying an =ERL_FLAGS= environment variable.  If
+      you have specified such a variable in your shell, unset it, and
+      move those settings to =etc/vm.args=.
+
+*** TODO riak-admin join errors
+
+
 ** TODO Client Errors
+*** TODO riak-admin test errors
 
 * TODO FAQ
 

File doc/architecture.txt

  2. riak_ets_backend, which stores data in ETS tables (which makes it
     volatile storage, but great for debugging)
  3. riak_dets_backend, which stores data on-disk in DETS tables
- 4. riak_osmos_backend, which stores data in Osmos tables[5]
-
 
 [1] http://s3.amazonaws.com/AllThingsDistributed/sosp/amazon-dynamo-sosp2007.pdf
 [2] http://portal.acm.org/citation.cfm?doid=564585.564601
 [3] http://portal.acm.org/citation.cfm?id=359563
 [4] http://portal.acm.org/citation.cfm?id=704751
-[5] http://code.google.com/p/osmos/

File doc/basic-setup.txt

 
   Name of the module that implements the storage for a vnode.  The
   four backends that ship with Riak are riak_fs_backend,
-  riak_ets_backend, riak_dets_backend, and riak_osmos_backend. Some
+  riak_ets_backend, and riak_dets_backend. Some
   backends have their own set of configuration parameters.
 
   riak_fs_backend:
     riak_dets_backend_root: string
       The directory under which this backend will store its files.
 
-  riak_osmos_backend:
-    A backend that uses Osmos to store its data.
-    http://code.google.com/p/osmos/
-
-    riak_osmos_backend_root: string
-      The directory under which this backend will store its files.
-
-    riak_osmos_backend_block_size: integer
-      The "block size" configuration parameter for Osmos.
-      Defaults to 2048.
-
 
 Single-node Configuration
 ---

File doc/raw-http-howto.txt

 
    $ curl -X PUT -H "content-type: application/json" \
      http://127.0.0.1:8098/raw/example \
-     --data "{\"props\":{\"allow_mult\":true}}
+     --data "{\"props\":{\"allow_mult\":true}}"
    $ curl -X PUT -H "content-type: text/plain" \
      http://127.0.0.1:8098/raw/example/sib --data "one thing"
    $ curl -X PUT -H "content-type: text/plain" \
 header to multipart/mixed.  Riak will hand back each of the versions
 as a separate part of a multipart/mixed document:
 
-   $ curl -i -H "accept: multipart/mixed" http://127.0.0.1:8098/raw/sib/one
+   $ curl -i -H "accept: multipart/mixed" http://127.0.0.1:8098/raw/example/sib
    HTTP/1.1 300 Multiple Choices
    X-Riak-Vclock: a85hYGBgz2DKBVJszUlMCj+kMpgSGfNYGVy/xR/hg0iwMPg9FYMK2yELvzwvhEU1yyfGjVDh4hUIYXbJhmaocB5CGGgps/qNZEz1QAm2fc6lyNZmAQA=
    Vary: Accept, Accept-Encoding
    
    --IASZJsIrb8ykNEc1fuue9LitToc
    Content-Type: text/plain
-   Link: </raw/sib>; rel="up"
+   Link: </raw/example>; rel="up"
    Etag: 6YUKo8vXvtBAXD0Hjy8crv
    Last-Modified: Fri, 30 Oct 2009 18:12:03 GMT
    
-   goodbye
+   one thing
    --IASZJsIrb8ykNEc1fuue9LitToc
    Content-Type: text/plain
-   Link: </raw/sib>; rel="up"
+   Link: </raw/example>; rel="up"
    Etag: 4XcBgFOm0Ab517wWjNXeWc
    Last-Modified: Fri, 30 Oct 2009 18:11:58 GMT
    
-   hello
+   another
    --IASZJsIrb8ykNEc1fuue9LitToc--
 
 To resolve the conflict, just issue another PUT, with the body you

File rel/overlay/bin/riak-admin

             -s init stop
         ;;
 
-    backup | restore)
+    restore)
         ACTION=$1
         shift
 
 
         $ERTS_PATH/erl -noshell $NAME_PARAM riak_backup$NAME_HOST -setcookie $COOKIE \
                        -eval "riak_backup:$ACTION('$NODE', \"$FILENAME\")" -s init stop
+        ;;        
+
+    backup)
+        ACTION=$1
+        shift
+
+        if [ $# -lt 4 ]; then
+            echo "Usage: $SCRIPT $ACTION <node> <cookie> <filename> [node|all]"
+            exit 1
+        fi
+
+        NODE=$1
+        COOKIE=$2
+        FILENAME=$3
+        TYPE=$4
+
+        $ERTS_PATH/erl -noshell $NAME_PARAM riak_backup$NAME_HOST -setcookie $COOKIE \
+                       -eval "riak_backup:$ACTION('$NODE', \"$FILENAME\", \"$TYPE\")" -s init stop
         ;;
 
     logger)

File releasenotes/riak-0.7.1.txt

+------------------------
+Riak 0.7.1 Release Notes