Commits

Anonymous committed 58a7b94

Rewrite riak_backup:backup to hit nodes directly.

Comments (0)

Files changed (2)

 %%      cookie or a poorly-connected distributed erlang network.
 client_connect(Node) -> 
     % Make sure we can reach this node...
-    try pong = net_adm:ping(Node)
-    catch _ : _ -> throw({could_not_reach_node, Node}) end,
+    case net_adm:ping(Node) of
+        pang -> throw({could_not_reach_node, Node}) end;
+        pong -> ok
+    end,
         
     % Return the newly created node...
     {ok, riak_client:new(Node, riak_util:mkclientid(Node))}.

src/riak_backup.erl

 
 -module(riak_backup).
 
--export([dump_config/1,do_dump/1,restore_config/1,do_restore/1]).
--export([do_restore_mdbinary/1]).
+-export ([backup/2, restore/2]).
 
-%% @type dump_config_params() = list()
-%% @spec dump_config(dump_config_params()) -> term()
-%% @doc Configure/prep a node to perform backup for Cluster, using CookieStr.
-%%      The argument is a list of the form
-%%      [Cluster :atom(), CookieStr :: list()].
-dump_config([Cluster]) ->
-    RipConf = [{no_config, true}, {cluster_name, Cluster},
-       {ring_state_dir, "<nostore>"},
-       {ring_creation_size, 12}, {gossip_interval, 1000},
-       {wants_claim_fun, {riak_claim, never_wants_claim}},
-       {storage_backend, undefined}],
-    backup_config(RipConf).
+-define (TABLE, riak_backup_table).
 
-%% @type restore_config_params() = list()
-%% @spec restore_config(restore_config_params()) -> term()
-%% @doc Configure/prep a node to perform restore for Cluster, using CookieStr.
-%%      The argument is a list of the form
-%%      [Cluster :: atom(), CookieStr :: list()].
-restore_config([Cluster]) ->
-    TempDir = make_tmp_dir(),
-    RipConf = [{no_config, true}, {cluster_name, Cluster},
-       {ring_state_dir, "<nostore>"},
-       {ring_creation_size, 12}, {gossip_interval, 60000},
-       {wants_claim_fun, {riak_claim, never_wants_claim}},
-       {riak_web_ip, "undefined"}, {backup, true},
-       {riak_fs_backend_root, filename:join(TempDir, "storage")},
-       {storage_backend, undefined}],
-    backup_config(RipConf).
+%% @doc 
+%% Connect to the cluster of which EntryNode is a member, 
+%% read data from the cluster, and save the data in the specified file.
+backup(EntryNode, Filename) -> throw(not_yet_supported).
+    % Make sure we can reach the node...
+    ensure_connected(EntryNode),
 
-%% @private
-backup_config(RipConf) ->
-    application:stop(sasl),
-    application:unload(sasl),
-    ok = application:load({application,sasl,[{errlog_type,error}]}),
-    ok = application:start(sasl),
-    [application:set_env(riak,K,V) || {K,V} <- RipConf].
+    % Get a list of nodes...
+    Ring = rpc:call(EntryNode, riak_ring_manager, get_my_ring, []),
+    Members = riak_ring:all_members(Ring),
 
-%% @type dump_params() = list()
-%% @spec do_dump(dump_params()) -> ok
-%% @doc Connect to the cluster via IP:PortStr, and make a dumpfile at Filename.
-%%      The argument is a list of the form
-%%      [IP :: list(), PortStr :: list(), Filename :: list()].
-do_dump([Node, Filename]) when is_list(Node) -> do_dump([list_to_atom(Node), Filename]);
-do_dump([Node, Filename]) ->
-    ReqID = erlang:phash2({random:uniform(), self()}),
-    io:format("starting dump ID ~p~n", [ReqID]),
-    riak_startup:join_cluster(Node),
-    timer:sleep(5 * 1000),
-    All_I_VN = lists:flatten(
-          [gen_server:call({riak_vnode_master, X},all_possible_vnodes) ||
-                  X <- nodes()]),
-    IV_Lists = [{I, VN, gen_server2:call(VN,list)} || {I,VN} <- All_I_VN],
-    {ok, dumptable} = dets:open_file(dumptable, [{file, Filename}]),
-    dump_records(IV_Lists),
-    ok = dets:sync(dumptable),
-    ok = dets:close(dumptable),
-    io:format("dump ID ~p stored to ~p~n", [ReqID,Filename]),
+    % Make sure all nodes in the cluster agree on the ring...
+    ensure_synchronized(Ring, Members),
+    
+    % Backup the data...
+    {ok, backup_table} = dets:open_file(?TABLE, [{file, Filename}]),
+    [backup_node(Node) || Node <- Nodes],
+    ok = dets:sync(?TABLE),
+    ok = dets:close(?TABLE),
+    
+    % Make sure the nodes are still synchronized...
+    ensure_synchronized(Ring, Members),
     ok.
+    
+backup_node(Node) ->
+    VNodes = gen_server:call({riak_vnode_master, Node}, all_possible_vnodes),
+    [backup_vnode(VNode) ||  VNode <- VNodes].
+    
+backup_vnode(_VNode = {_Index, VNodePid}) ->
+    Keys = gen_server2:call(VN,list),
+    [backup_key(VNodePid, Key) || Key <- Keys].
 
-%% @private
-dump_records([]) -> ok;
-dump_records([{_I,VN,List}|IVL_Tail]) ->
-    dump_records1(VN,List),
-    dump_records(IVL_Tail).
-
-%% @private
-dump_records1(_VN,[]) -> ok;
-dump_records1(VN,[K|K_Tail]) ->
-    {ok, V} = gen_server2:call(VN, {get_binary, K}),
+backup_key(VNodePid, Key) ->
+    {ok, V} = gen_server2:call(VNodePid, {get_binary, K}),
     Obj = binary_to_term(V),
     Bucket = riak_object:bucket(Obj),
     Key = riak_object:key(Obj),
-    ok = dets:insert(dumptable, [{{Bucket,Key}, V}]),
-    dump_records1(VN,K_Tail).
+    ok = dets:insert(?TABLE, [{{Bucket,Key}, V}]).
 
-%% @type restore_params() = list()
-%% @spec do_restore(restore_params()) -> ok
-%% @doc Connect via IP:PortStr / Cookie, and restore using dump at Filename.
-%%      Note that this reconciles instead of blindly overwriting.
-%%      The argument is a list of the form
-%%      [IP :: list(), PortStr :: list(), Cookie :: list(), Filename :: list()].
-do_restore([Node, Filename]) when is_list(Node) -> do_restore([list_to_atom(Node), Filename]);
-do_restore([Node, Filename]) ->
-    ReqID = erlang:phash2({random:uniform(), self()}),
-    io:format("starting restore ID ~p~n", [ReqID]),
+
+
+
+%% @doc
+%% Read data from the specified file created by backup/2,
+%% and write it to the cluster of which EntryNode is a member.
+restore(EntryNode, Filename) ->
+    % Make sure we can reach the node...
+    ensure_connected(EntryNode),
+    
     {ok, r_table} = dets:open_file(r_table, [{file, Filename}]),
     {ok, Client} = riak:client_connect(Node),
     Trav = dets:traverse(r_table,
               {continue, {Bucket,Key,PutRes}}
       end),
     ok = dets:close(r_table),
-    io:format("restore ID ~p completed with ~p objects.~n",
-              [ReqID,length(Trav)]),
     ok.
+    
 
 %% @private
 make_tmp_dir() ->
       end),
     ok = dets:close(r_table),
     length(Trav).
+
+
+
+
+
+    
+    
+    
+
+
+
+
+
+
+
+
+
+
+ensure_connected(Node) ->
+    case net_adm:ping(Node) of
+        pang -> throw({could_not_reach_node, Node}),
+        pong -> ok
+    end.
+
+ensure_synchronized(Ring, Members) ->
+    F = fun(Node) ->
+        Ring2 = rpc:call(Node, riak_ring_manager, get_my_ring, []),
+        (Ring#chstate.vclock == Ring2#chstate.vclock) andalso
+        (Ring#chstate.chring == Ring2#chstate.chring) andalso
+        (Ring#chstate.meta == Ring2#chstate.meta)
+    end,
+    case lists:all(F, Members) of
+        true -> ok;
+        false -> throw({nodes_not_synchronized, Members})
+    end.
+
+pmap(Fun, List) ->
+    Workers = [spawn_worker(self(), Pred, Data) || X <- List],
+    [wait_result(Worker) || Worker <- Workers].
+
+spawn_worker(Parent, Fun, Data) ->
+    erlang:spawn_monitor(fun() -> Parent ! {self(), Fun(Data)} end).
+
+wait_result({Pid,Ref}) ->
+    receive
+        {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
+        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
+    end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.