Anonymous committed 52f32dd

Finish switching riak_backup to work locally, clean up code, and add comments.


Files changed (7)

 client_connect(Node) -> 
     % Make sure we can reach this node...
     case net_adm:ping(Node) of
-        pang -> throw({could_not_reach_node, Node}) end;
+        pang -> throw({could_not_reach_node, Node});
         pong -> ok
     end,
         

src/riak_backup.erl

 %%      restore will reconcile values with the existing data.
 
 -module(riak_backup).
+-export ([backup/2, restore/2]).
+-define (TABLE, riak_backup_table).
 
--export ([backup/2, restore/2]).
-
--define (TABLE, riak_backup_table).
+%%% BACKUP %%%
 
 %% @doc 
 %% Connect to the cluster of which EntryNode is a member, 
 %% read data from the cluster, and save the data in the specified file.
-backup(EntryNode, Filename) -> throw(not_yet_supported).
+backup(EntryNode, Filename) -> 
     % Make sure we can reach the node...
     ensure_connected(EntryNode),
 
     % Get a list of nodes...
-    Ring = rpc:call(EntryNode, riak_ring_manager, get_my_ring, []),
+    {ok, Ring} = rpc:call(EntryNode, riak_ring_manager, get_my_ring, []),
     Members = riak_ring:all_members(Ring),
 
+    % Print status...
+    io:format("Backing up to '~s'.~n", [Filename]),
+    io:format("...from ~p~n", [Members]),
+
     % Make sure all nodes in the cluster agree on the ring...
     ensure_synchronized(Ring, Members),
     
     % Backup the data...
-    {ok, backup_table} = dets:open_file(?TABLE, [{file, Filename}]),
-    [backup_node(Node) || Node <- Nodes],
+    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
+    [backup_node(Node) || Node <- Members],
     ok = dets:sync(?TABLE),
     ok = dets:close(?TABLE),
     
     [backup_vnode(VNode) ||  VNode <- VNodes].
     
 backup_vnode(_VNode = {_Index, VNodePid}) ->
-    Keys = gen_server2:call(VN,list),
-    [backup_key(VNodePid, Key) || Key <- Keys].
+    List = gen_server2:call(VNodePid,list),
+    [backup_key(VNodePid, Bucket, Key) || {Bucket, Key} <- List].
 
-backup_key(VNodePid, Key) ->
-    {ok, V} = gen_server2:call(VNodePid, {get_binary, K}),
-    Obj = binary_to_term(V),
-    Bucket = riak_object:bucket(Obj),
-    Key = riak_object:key(Obj),
-    ok = dets:insert(?TABLE, [{{Bucket,Key}, V}]).
+backup_key(VNodePid, Bucket, Key) ->
+    {ok, B} = gen_server2:call(VNodePid, {get_binary, {Bucket, Key}}),
+    ok = dets:insert(?TABLE, [{{Bucket,Key}, B}]).
 
 
 
+%%% RESTORE %%%
 
 %% @doc
 %% Read data from the specified file created by backup/2,
 %% and write it to the cluster of which EntryNode is a member.
 restore(EntryNode, Filename) ->
-    % Make sure we can reach the node...
-    ensure_connected(EntryNode),
+    io:format("Restoring from '~s' to the cluster to which '~s' belongs.~n", [Filename, EntryNode]),
     
-    {ok, r_table} = dets:open_file(r_table, [{file, Filename}]),
-    {ok, Client} = riak:client_connect(Node),
-    Trav = dets:traverse(r_table,
-      fun({{Bucket,Key},V}) ->
-              RObj0 = binary_to_term(V),
-              RObj = riak_object:update_metadata(RObj0,
-                       dict:store("no_update",no_update,
-                         riak_object:get_update_metadata(RObj0))),
-              PutRes = Client:put(RObj,1,1,900000),
-              {continue, {Bucket,Key,PutRes}}
-      end),
-    ok = dets:close(r_table),
+    % Connect to the node...
+    {ok, Client} = riak:client_connect(EntryNode),
+    
+    % Open the table, write it out, close the table...
+    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
+    Results = dets:traverse(?TABLE, fun(Entry) -> read_and_restore_function(Client, Entry) end),
+    ok = dets:close(?TABLE),
+
+    io:format("Restored ~p records.~n", [length(Results)]),
     ok.
+
+read_and_restore_function(Client, {{Bucket, Key}, Value}) ->
+    Obj = binary_to_term(Value),
+
+    % Data Cleaning...
+    Obj1 = make_binary_bucket(Bucket, Key, Obj),
+
+    % Use the existing metadata, and tell Riak not to 
+    % update the X-Riak-VTag or X-Riak-Last-Modified values.
+    MetaData = hd(riak_object:get_metadatas(Obj1)), 
+    MetaData1 = dict:store("no_update", no_update, MetaData),
+    Obj2 = riak_object:update_metadata(Obj1, MetaData1),
     
+    % Store the object...
+    Response = Client:put(Obj2,1,1,900000),
+    {continue, Response}.
+   
+%%% DATA CLEANING %%% 
+    
+%% If the bucket name is an atom, convert it to a binary...
+make_binary_bucket(Bucket, Key, OriginalObj) when is_atom(Bucket) ->
+    Bucket1 = list_to_binary(atom_to_list(Bucket)),
+    OriginalContents = riak_object:get_contents(OriginalObj),
+    OriginalVClock = riak_object:vclock(OriginalObj),
+
+    % We can't change the bucket name without creating a new object...
+    NewObj = riak_object:new(Bucket1, Key, placeholder),
+    NewObj1 = riak_object:set_contents(NewObj, OriginalContents),
+    _NewObj2 = riak_object:set_vclock(NewObj1, OriginalVClock);
+    
+%% If the bucket name is a binary, just pass it on through...
+make_binary_bucket(Bucket, _Key, Obj) when is_binary(Bucket) -> Obj.
 
 %% @private
-make_tmp_dir() ->
-    TmpId = io_lib:format("riptemp.~p",
-                          [erlang:phash2({random:uniform(),self()})]),
-    TempDir = filename:join("/tmp", TmpId),
-    case filelib:is_dir(TempDir) of
-        true -> make_tmp_dir();
-        false ->
-            ok = file:make_dir(TempDir),
-            TempDir
-    end.
-
-do_restore_mdbinary(Filename) ->
-    {ok, r_table} = dets:open_file(r_table, [{file, Filename}]),
-    {ok, Client} = riak:local_client(),
-    Trav = dets:traverse(r_table,
-      fun({{Bucket,Key},V}) ->
-              RObj0 = binary_to_term(V),
-              MD0 = dict:store("no_update",no_update, 
-                               riak_object:get_update_metadata(RObj0)),
-              {ObjMD,_} = hd(riak_object:get_contents(RObj0)),
-              MD1 = case dict:find("X-Riak-VTag", ObjMD) of
-                  {ok,VTag} -> dict:store(<<"X-Riak-VTag">>,VTag,MD0);
-                  error -> MD0
-              end,
-              MD2 = case dict:find("X-Riak-Last-Modified", ObjMD) of
-                  {ok,LM} -> dict:store(<<"X-Riak-Last-Modified">>,LM,MD1);
-                  error -> MD1
-              end,
-              RObj = riak_object:update_metadata(RObj0,MD2),
-              PutRes = Client:put(RObj,1,1,900000),
-              {continue, {Bucket,Key,PutRes}}
-      end),
-    ok = dets:close(r_table),
-    length(Trav).
-
-
-
-
-
-    
-    
-    
-
-
-
-
-
-
-
-
-
-
+%% Try to reach the specified node, throw exception on failure.
 ensure_connected(Node) ->
     case net_adm:ping(Node) of
-        pang -> throw({could_not_reach_node, Node}),
+        pang -> throw({could_not_reach_node, Node});
         pong -> ok
     end.
 
+%% @private
+%% Make sure that rings of all members are synchronized, 
+%% throw exception on failure.
 ensure_synchronized(Ring, Members) ->
     F = fun(Node) ->
-        Ring2 = rpc:call(Node, riak_ring_manager, get_my_ring, []),
-        (Ring#chstate.vclock == Ring2#chstate.vclock) andalso
-        (Ring#chstate.chring == Ring2#chstate.chring) andalso
-        (Ring#chstate.meta == Ring2#chstate.meta)
+        {ok, Ring2} = rpc:call(Node, riak_ring_manager, get_my_ring, []),
+        riak_ring:equal_rings(Ring, Ring2)
     end,
     case lists:all(F, Members) of
         true -> ok;
         false -> throw({nodes_not_synchronized, Members})
     end.
 
-pmap(Fun, List) ->
-    Workers = [spawn_worker(self(), Pred, Data) || X <- List],
-    [wait_result(Worker) || Worker <- Workers].
-
-spawn_worker(Parent, Fun, Data) ->
-    erlang:spawn_monitor(fun() -> Parent ! {self(), Fun(Data)} end).
-
-wait_result({Pid,Ref}) ->
-    receive
-        {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
-        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
-    end.
+% pmap(Fun, List) ->
+%     Workers = [spawn_worker(self(), Pred, Data) || X <- List],
+%     [wait_result(Worker) || Worker <- Workers].
+% 
+% spawn_worker(Parent, Fun, Data) ->
+%     erlang:spawn_monitor(fun() -> Parent ! {self(), Fun(Data)} end).
+% 
+% wait_result({Pid,Ref}) ->
+%     receive
+%         {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
+%         {'DOWN', Ref, _, _, Reason} -> exit(Reason)
+%     end.
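
For orientation, a minimal usage sketch of the two entry points exported above; the node name and file path are illustrative only, not taken from this commit. Both calls are made from a distributed Erlang node started with the cluster's cookie:

    %% Illustrative node name and path -- adjust for your cluster.
    riak_backup:backup('riak@10.0.0.1', "/tmp/riak.bak").
    riak_backup:restore('riak@10.0.0.1', "/tmp/riak.bak").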

src/riak_ring.erl

 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
          transfer_node/3,reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
-         get_meta/2, update_meta/3]).	 
+         get_meta/2, update_meta/3, equal_rings/2]).	 
 
 % @type riak_ring(). The opaque data type used for partition ownership.
 -record(chstate, {
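
The export list above now includes equal_rings/2, which ensure_synchronized/2 in riak_backup.erl calls, but its body falls outside this hunk. A minimal sketch, mirroring the three-field comparison that ensure_synchronized previously performed inline (the actual definition in this commit may differ):

    %% Sketch only -- not part of this commit's visible diff.
    equal_rings(RingA = #chstate{}, RingB = #chstate{}) ->
        (RingA#chstate.vclock == RingB#chstate.vclock) andalso
        (RingA#chstate.chring == RingB#chstate.chring) andalso
        (RingA#chstate.meta == RingB#chstate.meta).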

start-backup-dump.sh

-#!/usr/bin/env bash
-# ./start-backup-dump.sh <clustername> <node> <cookie> <filename>
-#
-# This will:
-#  1. Join riak cluster <clustername> at <node> using <cookie>
-#  2. Dump the entire cluster's contents to <filename>
-if [ $# -lt 4 ]; then
-    echo Usage: 1>&2
-    echo "    `basename $0` <clustername> <node> <cookie> <filename>" 1>&2
-    exit 1
-fi
-. riak-env.sh
-CLUSTERNAME=$1
-NODE=$2
-COOKIE=$3
-FILENAME=$4
-erl -noshell -pa deps/*/ebin -pa ebin -name backup_dumper -setcookie $COOKIE -run riak_backup dump_config $CLUSTERNAME -run riak start -run riak_backup do_dump $NODE $FILENAME -run init stop

start-backup-restore.sh

-#!/usr/bin/env bash
-# ./start-backup-restore.sh <clustername> <node> <cookie> <filename>
-# This will:
-#  1. Join riak cluster <clustername> at <node> using <cookie>
-#  2. Overwrite cluster data with data contained in <filename>
-if [ $# -lt 4 ]; then
-    echo Usage: 1>&2
-    echo "    `basename $0` <clustername> <node> <cookie> <filename>" 1>&2
-    exit 1
-fi
-. riak-env.sh
-CLUSTERNAME=$1
-NODE=$2
-COOKIE=$3
-FILENAME=$4
-erl -noshell -pa deps/*/ebin -pa ebin -name backup_restore -setcookie $COOKIE -run riak_backup restore_config $CLUSTERNAME -run riak start -run riak_backup do_restore $NODE $FILENAME -run init stop
-

start-backup.sh

+#!/usr/bin/env bash
+# ./start-backup.sh <node> <cookie> <filename>
+#
+# This will:
+#  1. Join riak cluster of which <node> is a member, using <cookie>.
+#  2. Dump the entire cluster's contents to <filename>.
+
+if [ $# -lt 3 ]; then
+    echo Usage: 1>&2
+    echo "    `basename $0` <node> <cookie> <filename>" 1>&2
+    exit 1
+fi
+. riak-env.sh
+NODE=$1
+COOKIE=$2
+FILENAME=$3
+erl -noshell -pa deps/*/ebin -pa ebin -name riak_backup -setcookie $COOKIE -eval "riak_backup:backup('$NODE', \"$FILENAME\")." -run init stop

start-restore.sh

+#!/usr/bin/env bash
+# ./start-restore.sh <node> <cookie> <filename>
+# This will:
+#  1. Join riak cluster of which <node> is a member, using <cookie>.
+#  2. Overwrite cluster data with data contained in <filename>
+
+if [ $# -lt 3 ]; then
+    echo Usage: 1>&2
+    echo "    `basename $0` <node> <cookie> <filename>" 1>&2
+    exit 1
+fi
+. riak-env.sh
+NODE=$1
+COOKIE=$2
+FILENAME=$3
+erl -noshell -pa deps/*/ebin -pa ebin -name riak_restore -setcookie $COOKIE -eval "riak_backup:restore('$NODE', \"$FILENAME\")." -run init stop
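
A note on the -eval quoting in both scripts: the single quotes make the node name an Erlang atom, and the escaped double quotes make the filename an Erlang string. With illustrative arguments (not from this commit), the restore invocation evaluates an expression like:

    %% e.g. ./start-restore.sh riak@10.0.0.1 riak_cookie /tmp/riak.bak runs:
    riak_backup:restore('riak@10.0.0.1', "/tmp/riak.bak").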