Commits

Bryan Fink  committed 5729f13 Merge

merge

  • Participants
  • Parent commits 29962da, 1f356cc

Comments (0)

Files changed (6)

File apps/riak/src/riak_backup.erl

 
     % Make sure all nodes in the cluster agree on the ring...
     ensure_synchronized(Ring, Members),
-    
+
     % Backup the data...
-    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
+    {ok, ?TABLE} = disk_log:open([{name, ?TABLE},
+                                  {file, Filename},
+                                  {mode, read_write},
+                                  {type, halt}]),
+
     [backup_node(Node) || Node <- Members],
-    ok = dets:sync(?TABLE),
-    ok = dets:close(?TABLE),
+    ok = disk_log:sync(?TABLE),
+    ok = disk_log:close(?TABLE),
     
     % Make sure the nodes are still synchronized...
     ensure_synchronized(Ring, Members),
     [backup_vnode(VNode) ||  VNode <- VNodes].
     
 backup_vnode(_VNode = {_Index, VNodePid}) ->
-    {ok, List} = gen_fsm:sync_send_event(VNodePid, list),
+    {ok, List} = gen_fsm:sync_send_event(VNodePid, list, infinity),
     [backup_key(VNodePid, Bucket, Key) || {Bucket, Key} <- List].
 
 backup_key(VNodePid, Bucket, Key) ->
-    {ok, B} = gen_fsm:sync_send_event(VNodePid, {get_binary, {Bucket, Key}}),
-    ok = dets:insert(?TABLE, [{{Bucket,Key}, B}]).
+    {ok, B} = gen_fsm:sync_send_event(VNodePid, {get_binary, {Bucket, Key}}, infinity),
+    ok = disk_log:log(?TABLE, B).
 
 
 %%% RESTORE %%%
     {ok, Client} = riak:client_connect(EntryNode),
     
     % Open the table, write it out, close the table...
-    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
-    Results = dets:traverse(?TABLE, fun(Entry) -> read_and_restore_function(Client, Entry) end),
-    ok = dets:close(?TABLE),
-
-    io:format("Restored ~p records.~n", [length(Results)]),
+    {ok, ?TABLE} = disk_log:open([{name, ?TABLE},
+                                  {file, Filename},
+                                  {mode, read_only},
+                                   {type, halt}]),
+    Count = traverse_backup(
+                disk_log:chunk(?TABLE, start), 
+                fun(Entry) -> read_and_restore_function(Client, Entry) end, 0),
+    ok = disk_log:close(?TABLE),
+    io:format("Restored ~p records.~n", [Count]),
     ok.
 
-read_and_restore_function(Client, {{Bucket, Key}, Value}) ->
-    Obj = binary_to_term(Value),
+traverse_backup(eof, _VisitorFun, Count) ->
+    Count;
+traverse_backup({Cont, Terms}, VisitorFun, Count) when is_list(Terms) ->
+    [VisitorFun(T) || T <- Terms],
+    traverse_backup(disk_log:chunk(?TABLE, Cont), 
+                    VisitorFun, Count+length(Terms)).
+    
 
+read_and_restore_function(Client, BinTerm) ->
+    Obj = binary_to_term(BinTerm),
+    Bucket = riak_object:bucket(Obj),
+    Key = riak_object:key(Obj),
     % Data Cleaning...
     Obj1 = make_binary_bucket(Bucket, Key, Obj),
 
     Obj2 = riak_object:update_metadata(Obj1, MetaData1),
     
     % Store the object...
-    Response = Client:put(Obj2,1,1,900000),
+    Response = Client:put(Obj2,1,1,1200000),
     {continue, Response}.
    
 %%% DATA CLEANING %%% 

File apps/riak/src/riak_console.erl

 %% @copyright 2007-2009 Basho Technologies, Inc.  All Rights Reserved.
 -module(riak_console).
 
--export([join/1]).
+-export([join/1, status/1, reip/1]).
 
 join([NodeStr]) ->
     case riak:join(NodeStr) of
     io:format("Join requires a node to join with.\n"),
     error.
 
+status([]) ->
+    case whereis(riak_stat) of
+        undefined ->
+            io:format("riak_stat is not enabled.~n");
+        _ ->
+            StatString =
+                format_stats(riak_stat:get_stats(), 
+                             ["-------------------------------------------\n",
+                              io_lib:format("1-minute stats for ~p~n",[node()])]),
+            io:format("~s~n", [StatString])
+    end.
 
+reip([OldNode, NewNode]) ->
+    application:load(riak),
+    RingStateDir = riak:get_app_env(ring_state_dir),
+    {ok, RingFile} = riak_ring_manager:find_latest_ringfile(),
+    BackupFN = filename:join([RingStateDir, filename:basename(RingFile)++".BAK"]),
+    {ok, _} = file:copy(RingFile, BackupFN),
+    io:format("Backed up existing ring file to ~p~n", [BackupFN]),
+    Ring = riak_ring_manager:read_ringfile(RingFile),
+    NewRing = riak_ring:rename_node(Ring, OldNode, NewNode),
+    riak_ring_manager:do_write_ringfile(NewRing),
+    io:format("New ring file written to ~p~n", 
+              [element(2, riak_ring_manager:find_latest_ringfile())]).
+
+
+format_stats([], Acc) ->
+    lists:reverse(Acc);
+format_stats([{vnode_gets, V}|T], Acc) ->
+    format_stats(T, [io_lib:format("vnode gets : ~p~n", [V])|Acc]);
+format_stats([{Stat, V}|T], Acc) ->
+    format_stats(T, [io_lib:format("~p : ~p~n", [Stat, V])|Acc]).
+    

File apps/riak/src/riak_get_fsm.erl

                   StateData=#state{req_id=ReqId,replied_fail=Replied0}) ->
     finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]});
 waiting_read_repair(timeout, StateData) ->
-    finalize(StateData).
+    really_finalize(StateData).
 
 finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N}) ->
     case (length(R) + length(F) + length(NF)) >= N of

File apps/riak/src/riak_ring.erl

 
 -export([fresh/0,fresh/1,fresh/2,preflist/2,
 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
-         transfer_node/3,reconcile/2, my_indices/1,
+         transfer_node/3, rename_node/3, reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
          get_meta/2, update_meta/3, equal_rings/2]).	 
 
                     meta=MyState#chstate.meta}
     end.
 
+% @doc  Rename OldNode to NewNode in a Riak ring.
+% @spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
+%           chstate()
+rename_node(State=#chstate{chring=Ring, nodename=ThisNode}, OldNode, NewNode) 
+  when is_atom(OldNode), is_atom(NewNode)  ->
+    State#chstate{
+      chring=lists:foldl(
+               fun({Idx, Owner}, AccIn) ->
+                       case Owner of
+                           OldNode -> 
+                               chash:update(Idx, NewNode, AccIn);
+                           _ -> AccIn
+                       end
+               end, Ring, riak_ring:all_owners(State)),
+      nodename=case ThisNode of OldNode -> NewNode; _ -> ThisNode end}.
+
 ancestors(RingStates) ->
     Ancest = [[O2 || O2 <- RingStates,
      vclock:descends(O1#chstate.vclock,O2#chstate.vclock),
     CHRing = chash:merge_rings(StateA#chstate.chring,StateB#chstate.chring),
     Meta = merge_meta(StateA#chstate.meta, StateB#chstate.meta),
     #chstate{nodename=MyNodeName,
-	    vclock=VClock,
-	    chring=CHRing,
-            meta=Meta}.
+             vclock=VClock,
+             chring=CHRing,
+             meta=Meta}.
 
 merge_meta(M1,M2) ->
     dict:merge(fun(_,D1,D2) -> pick_val(D1,D2) end, M1, M2).

File apps/riak/src/riak_ring_manager.erl

 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 -export([get_my_ring/0,set_my_ring/1,write_ringfile/0,prune_ringfiles/0,
-        read_ringfile/1,find_latest_ringfile/0]).
+        read_ringfile/1,find_latest_ringfile/0,do_write_ringfile/1]).
 
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 start_link(test) -> % when started this way, run a mock server (no disk, etc)

File rel/overlay/bin/riak-admin

         $NODETOOL rpc riak_console join $@
         ;;
 
+    status)
+        # Make sure the local node IS running
+        RES=`$NODETOOL ping`
+        if [ "$RES" != "pong" ]; then
+            echo "Node is not running!"
+            exit 1
+        fi
+        shift
+        
+        $NODETOOL rpc riak_console status $@
+        ;;
+
+
+    reip)
+        ACTION=$1
+        shift
+        if [ $# -lt 2 ]; then
+            echo "Usage $SCRIPT $ACTION <old_nodename> <new_nodename>"
+            exit 1
+        fi
+        # Make sure the local node IS not running
+        RES=`$NODETOOL ping`
+        if [ "$RES" == "pong" ]; then
+            echo "Node must be down to re-ip."
+            exit 1
+        fi
+        OLDNODE=$1
+        NEWNODE=$2
+        $ERTS_PATH/erl -noshell \
+            -config $RUNNER_ETC_DIR/app.config \
+            -eval "riak_console:$ACTION(['$OLDNODE', '$NEWNODE'])" \
+            -s init stop
+        ;;
+
     backup | restore)
         ACTION=$1
         shift
 
         ;;
     *)
-        echo "Usage: $SCRIPT { join | backup | restore | logger | test }"
+        echo "Usage: $SCRIPT { join | backup | restore | logger | test | status | reip }"
         exit 1
         ;;
 esac