Commits

Kevin Smith  committed deaeff6 Merge

Merging

  • Participants
  • Parent commits a6451f3, c3b3c9c

Comments (0)

Files changed (9)

File apps/js_data/ebin/js_data.app

 {application, js_data,
  [{description,  "Blub"},
   {vsn,          "0.1"},
-  {modules,      [jsd_generator]},
+  {modules,      [jsd_generator, mrstress]},
   {registered,   []},
   {applications, [kernel, stdlib]}]}.

File apps/riak/src/mapred_resource.erl

                                 {ok, ParsedQuery} ->
                                     {true, [], State#state{inputs=ParsedInputs,
                                                            mrquery=ParsedQuery}};
-                                error ->
+                                {error, Message} ->
                                     {false,
-                                     "An error occurred parsing "
-                                     "the \"query\" field.\n",
+                                     ["An error occurred parsing "
+                                      "the \"query\" field.\n",
+                                      Message],
                                      State}
                             end;
-                        error ->
+                        {error, Message} ->
                             {false,
-                             "An error occurred parsing the \"inputs\" field.\n",
+                             ["An error occurred parsing the \"inputs\" field.\n",
+                              Message],
                              State}
                     end;
                 false ->

File apps/riak/src/riak_backup.erl

 
     % Make sure all nodes in the cluster agree on the ring...
     ensure_synchronized(Ring, Members),
-    
+
     % Backup the data...
-    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
+    {ok, ?TABLE} = disk_log:open([{name, ?TABLE},
+                                  {file, Filename},
+                                  {mode, read_write},
+                                  {type, halt}]),
+
     [backup_node(Node) || Node <- Members],
-    ok = dets:sync(?TABLE),
-    ok = dets:close(?TABLE),
+    ok = disk_log:sync(?TABLE),
+    ok = disk_log:close(?TABLE),
     
     % Make sure the nodes are still synchronized...
     ensure_synchronized(Ring, Members),
     [backup_vnode(VNode) ||  VNode <- VNodes].
     
 backup_vnode(_VNode = {_Index, VNodePid}) ->
-    {ok, List} = gen_fsm:sync_send_event(VNodePid, list),
+    {ok, List} = gen_fsm:sync_send_event(VNodePid, list, infinity),
     [backup_key(VNodePid, Bucket, Key) || {Bucket, Key} <- List].
 
 backup_key(VNodePid, Bucket, Key) ->
-    {ok, B} = gen_fsm:sync_send_event(VNodePid, {get_binary, {Bucket, Key}}),
-    ok = dets:insert(?TABLE, [{{Bucket,Key}, B}]).
+    {ok, B} = gen_fsm:sync_send_event(VNodePid, {get_binary, {Bucket, Key}}, infinity),
+    ok = disk_log:log(?TABLE, B).
 
 
 %%% RESTORE %%%
     {ok, Client} = riak:client_connect(EntryNode),
     
     % Open the table, write it out, close the table...
-    {ok, ?TABLE} = dets:open_file(?TABLE, [{file, Filename}]),
-    Results = dets:traverse(?TABLE, fun(Entry) -> read_and_restore_function(Client, Entry) end),
-    ok = dets:close(?TABLE),
-
-    io:format("Restored ~p records.~n", [length(Results)]),
+    {ok, ?TABLE} = disk_log:open([{name, ?TABLE},
+                                  {file, Filename},
+                                  {mode, read_only},
+                                   {type, halt}]),
+    Count = traverse_backup(
+                disk_log:chunk(?TABLE, start), 
+                fun(Entry) -> read_and_restore_function(Client, Entry) end, 0),
+    ok = disk_log:close(?TABLE),
+    io:format("Restored ~p records.~n", [Count]),
     ok.
 
-read_and_restore_function(Client, {{Bucket, Key}, Value}) ->
-    Obj = binary_to_term(Value),
+traverse_backup(eof, _VisitorFun, Count) ->
+    Count;
+traverse_backup({Cont, Terms}, VisitorFun, Count) when is_list(Terms) ->
+    [VisitorFun(T) || T <- Terms],
+    traverse_backup(disk_log:chunk(?TABLE, Cont), 
+                    VisitorFun, Count+length(Terms)).
+    
 
+read_and_restore_function(Client, BinTerm) ->
+    Obj = binary_to_term(BinTerm),
+    Bucket = riak_object:bucket(Obj),
+    Key = riak_object:key(Obj),
     % Data Cleaning...
     Obj1 = make_binary_bucket(Bucket, Key, Obj),
 
     Obj2 = riak_object:update_metadata(Obj1, MetaData1),
     
     % Store the object...
-    Response = Client:put(Obj2,1,1,900000),
+    Response = Client:put(Obj2,1,1,1200000),
     {continue, Response}.
    
 %%% DATA CLEANING %%% 

File apps/riak/src/riak_console.erl

 %% @copyright 2007-2009 Basho Technologies, Inc.  All Rights Reserved.
 -module(riak_console).
 
--export([join/1]).
+-export([join/1, status/1, reip/1]).
 
 join([NodeStr]) ->
     case riak:join(NodeStr) of
     io:format("Join requires a node to join with.\n"),
     error.
 
+status([]) ->
+    case whereis(riak_stat) of
+        undefined ->
+            io:format("riak_stat is not enabled.~n");
+        _ ->
+            StatString =
+                format_stats(riak_stat:get_stats(), 
+                             ["-------------------------------------------\n",
+                              io_lib:format("1-minute stats for ~p~n",[node()])]),
+            io:format("~s~n", [StatString])
+    end.
 
+reip([OldNode, NewNode]) ->
+    application:load(riak),
+    RingStateDir = riak:get_app_env(ring_state_dir),
+    {ok, RingFile} = riak_ring_manager:find_latest_ringfile(),
+    BackupFN = filename:join([RingStateDir, filename:basename(RingFile)++".BAK"]),
+    {ok, _} = file:copy(RingFile, BackupFN),
+    io:format("Backed up existing ring file to ~p~n", [BackupFN]),
+    Ring = riak_ring_manager:read_ringfile(RingFile),
+    NewRing = riak_ring:rename_node(Ring, OldNode, NewNode),
+    riak_ring_manager:do_write_ringfile(NewRing),
+    io:format("New ring file written to ~p~n", 
+              [element(2, riak_ring_manager:find_latest_ringfile())]).
+
+
+format_stats([], Acc) ->
+    lists:reverse(Acc);
+format_stats([{vnode_gets, V}|T], Acc) ->
+    format_stats(T, [io_lib:format("vnode gets : ~p~n", [V])|Acc]);
+format_stats([{Stat, V}|T], Acc) ->
+    format_stats(T, [io_lib:format("~p : ~p~n", [Stat, V])|Acc]).
+    

File apps/riak/src/riak_get_fsm.erl

                   StateData=#state{req_id=ReqId,replied_fail=Replied0}) ->
     finalize(StateData#state{replied_fail=[{Err,Idx}|Replied0]});
 waiting_read_repair(timeout, StateData) ->
-    finalize(StateData).
+    really_finalize(StateData).
 
 finalize(StateData=#state{replied_r=R,replied_fail=F,replied_notfound=NF, n=N}) ->
     case (length(R) + length(F) + length(NF)) >= N of

File apps/riak/src/riak_mapred_json.erl

 
 parse_inputs(Bucket) when is_binary(Bucket) ->
     {ok, Bucket};
-parse_inputs(Targets) ->
-    parse_inputs(Targets, []).
+parse_inputs(Targets) when is_list(Targets) ->
+    parse_inputs(Targets, []);
+parse_inputs(Invalid) ->
+    {error, ["Unrecognized format of \"inputs\" field:",
+             "   ",mochijson2:encode(Invalid),
+             "\n\nValid formats are:\n"
+             "   - a bucket name, as a string\n"
+             "   - a list of bucket/key pairs\n"]}.
 
 parse_inputs([], Accum) ->
     if
         length(Accum) > 0 ->
             {ok, lists:reverse(Accum)};
         true ->
-            error
+            {error, "No inputs were given.\n"}
     end;
 parse_inputs([[Bucket, Key]|T], Accum) when is_binary(Bucket),
                                              is_binary(Key) ->
 parse_inputs([[Bucket, Key, KeyData]|T], Accum) when is_binary(Bucket),
                                                       is_binary(Key) ->
     parse_inputs(T, [{{Bucket, Key}, KeyData}|Accum]);
-parse_inputs(_, _Accum) ->
-    error.
+parse_inputs([Input|_], _Accum) ->
+    {error, ["Unrecognized format of input element:\n"
+             "   ",mochijson2:encode(Input),
+             "\n\nValid formats are:\n"
+             "   [Bucket, Key]\n"
+             "   [Bucket, Key, KeyData]\n"
+             "where Bucket and Key are strings\n"]}.
 
 parse_query(Query) ->
     parse_query(Query, []).
         length(Accum) > 0 ->
             {ok, lists:reverse(Accum)};
         true ->
-            error
+            {error, "No query phases were given\n"}
     end;
 parse_query([{struct, [{Type, {struct, StepDef}}]}|T], Accum)
   when Type =:= <<"map">>; Type =:= <<"reduce">>; Type =:= <<"link">> ->
                end,
     Keep = proplists:get_value(<<"keep">>, StepDef, T==[]),
     Step = case not(Keep =:= true orelse Keep =:= false) of
-               true -> error;
+               true ->
+                   {error, ["The \"keep\" field was not a boolean value in:\n"
+                            "   ",mochijson2:encode(
+                                    {struct,[{Type,{struct,StepDef}}]}),
+                            "\n"]};
                false ->
                    if StepType == link ->
                           case parse_link_step(StepDef) of
                       true -> % map or reduce
                            Lang = proplists:get_value(<<"language">>, StepDef),
                            case parse_step(Lang, StepDef) of
-                               error ->
-                                   error;
                                {ok, ParsedStep} ->
                                    Arg = proplists:get_value(<<"arg">>, StepDef, none),
-                                   {ok, {StepType, ParsedStep, Arg, Keep}}
+                                   {ok, {StepType, ParsedStep, Arg, Keep}};
+                               QError ->
+                                   QError
                            end
                    end
            end,
         {ok, S} -> parse_query(T, [S|Accum]);
         SError  -> SError
     end;
-parse_query(_, _Accum) ->
-    error.
+parse_query([Phase|_], _Accum) ->
+    {error, ["Unrecognized format of query phase:\n"
+             "   ",mochijson2:encode(Phase),
+             "\n\nValid formats are:\n"
+             "   {\"map\":{...spec...}}\n"
+             "   {\"reduce\":{...spec...}}\n"
+             "   {\"link\:{...spec}}\n"]};
+parse_query(Invalid, _Accum) ->
+    {error, ["The value of the \"query\" field was not a list:\n"
+             "   ",mochijson2:encode(Invalid),"\n"]}.
 
 parse_link_step(StepDef) ->
     Bucket = proplists:get_value(<<"bucket">>, StepDef, <<"_">>),
     Tag = proplists:get_value(<<"tag">>, StepDef, <<"_">>),
     case not(is_binary(Bucket) andalso is_binary(Tag)) of
         true ->
-            error;
+            {error, ["Invalid link step specification:\n"
+                     "   ",mochijson2:encode({struct,StepDef}),
+                     "\n\n \"bucket\" and \"tag\" fields"
+                     " must have string values.\n"]};
         false ->
             {ok, {if Bucket == <<"_">> -> '_';
                      true              -> Bucket
                 undefined ->
                     case Bucket of
                         undefined ->
-                            error;
+                            {error, ["No function specified in Javascript phase:\n"
+                                     "   ",mochijson2:encode({struct,StepDef}),
+                                     "\n\nFunctions may be specified by:\n"
+                                     "   - a \"source\" field, with source for"
+                                     " a Javascript function\n"
+                                     "   - a \"name\" field, naming a predefined"
+                                     " Javascript function\n"
+                                     "   - \"bucket\" and \"key\" fields,"
+                                     " specifying a Riak object containing"
+                                     " Javascript function source\n"]};
                         _ ->
                             case Key of
                                 undefined ->
-                                    error;
+                                    {error, ["Javascript phase was missing a"
+                                             " \"key\" field to match the \"bucket\""
+                                             " field, pointing to the function"
+                                             " to evaluate in:"
+                                             "   ",mochijson2:encode(
+                                                    {struct,StepDef}),
+                                             "\n"]};
                                 _ ->
                                     {ok, {jsanon, {Bucket, Key}}}
                             end
             {ok, {jsanon, Source}}
     end;
 parse_step(<<"erlang">>, StepDef) ->
-    Module = proplists:get_value(<<"module">>, StepDef),
-    Function = proplists:get_value(<<"function">>, StepDef),
-    bin_to_atom(Module, fun(A1) ->
-                                   bin_to_atom(Function, fun(A2) -> {modfun, A1, A2} end) end).
+    case bin_to_atom(proplists:get_value(<<"module">>, StepDef)) of
+        {ok, Module} ->
+            case bin_to_atom(proplists:get_value(<<"function">>, StepDef)) of
+                {ok, Function} ->
+                    {modfun, Module, Function};
+                error ->
+                    {error, ["Could not convert \"function\" field value"
+                             " to an atom in:"
+                             "   ",mochijson2:encode({struct, StepDef}),
+                             "\n"]}
+            end;
+        error ->
+            {error, ["Could not convert \"module\" field value"
+                     " to an atom in:"
+                     "   ",mochijson2:encode({struct, StepDef}),"\n"]}
+    end;
+parse_step(undefined, StepDef) ->
+    {error, ["No \"language\" was specified for the phase:\n",
+             "   ",mochijson2:encode({struct,StepDef}),"\n"]};
+parse_step(Language,StepDef) ->
+    {error, ["Unknown language ",mochijson2:encode(Language)," in phase:\n",
+             "   ",mochijson2:encode({struct,StepDef}),"\n"]}.
 
-bin_to_atom(Binary, Cont) ->
+bin_to_atom(Binary) ->
     L = binary_to_list(Binary),
-    Result = try
-                 list_to_existing_atom(L)
-             catch
-                 error:badarg ->
-                     try
-                         list_to_atom(L)
-                     catch
-                         error:badarg ->
-                             error
-                     end
-             end,
-    case Result of
-        error ->
-            error;
-        _ ->
-            Cont(Result)
+    try
+        {ok, list_to_atom(L)}
+    catch
+        error:badarg -> error
     end.

File apps/riak/src/riak_ring.erl

 
 -export([fresh/0,fresh/1,fresh/2,preflist/2,
 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
-         transfer_node/3,reconcile/2, my_indices/1,
+         transfer_node/3, rename_node/3, reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
          get_meta/2, update_meta/3, equal_rings/2]).	 
 
                     meta=MyState#chstate.meta}
     end.
 
+% @doc  Rename OldNode to NewNode in a Riak ring.
+% @spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
+%           chstate()
+rename_node(State=#chstate{chring=Ring, nodename=ThisNode}, OldNode, NewNode) 
+  when is_atom(OldNode), is_atom(NewNode)  ->
+    State#chstate{
+      chring=lists:foldl(
+               fun({Idx, Owner}, AccIn) ->
+                       case Owner of
+                           OldNode -> 
+                               chash:update(Idx, NewNode, AccIn);
+                           _ -> AccIn
+                       end
+               end, Ring, riak_ring:all_owners(State)),
+      nodename=case ThisNode of OldNode -> NewNode; _ -> ThisNode end}.
+
 ancestors(RingStates) ->
     Ancest = [[O2 || O2 <- RingStates,
      vclock:descends(O1#chstate.vclock,O2#chstate.vclock),
     CHRing = chash:merge_rings(StateA#chstate.chring,StateB#chstate.chring),
     Meta = merge_meta(StateA#chstate.meta, StateB#chstate.meta),
     #chstate{nodename=MyNodeName,
-	    vclock=VClock,
-	    chring=CHRing,
-            meta=Meta}.
+             vclock=VClock,
+             chring=CHRing,
+             meta=Meta}.
 
 merge_meta(M1,M2) ->
     dict:merge(fun(_,D1,D2) -> pick_val(D1,D2) end, M1, M2).

File apps/riak/src/riak_ring_manager.erl

 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
 	 terminate/2, code_change/3]).
 -export([get_my_ring/0,set_my_ring/1,write_ringfile/0,prune_ringfiles/0,
-        read_ringfile/1,find_latest_ringfile/0]).
+        read_ringfile/1,find_latest_ringfile/0,do_write_ringfile/1]).
 
 start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 start_link(test) -> % when started this way, run a mock server (no disk, etc)

File rel/overlay/bin/riak-admin

         $NODETOOL rpc riak_console join $@
         ;;
 
+    status)
+        # Make sure the local node IS running
+        RES=`$NODETOOL ping`
+        if [ "$RES" != "pong" ]; then
+            echo "Node is not running!"
+            exit 1
+        fi
+        shift
+        
+        $NODETOOL rpc riak_console status $@
+        ;;
+
+
+    reip)
+        ACTION=$1
+        shift
+        if [ $# -lt 2 ]; then
+            echo "Usage $SCRIPT $ACTION <old_nodename> <new_nodename>"
+            exit 1
+        fi
+        # Make sure the local node IS not running
+        RES=`$NODETOOL ping`
+        if [ "$RES" == "pong" ]; then
+            echo "Node must be down to re-ip."
+            exit 1
+        fi
+        OLDNODE=$1
+        NEWNODE=$2
+        $ERTS_PATH/erl -noshell \
+            -config $RUNNER_ETC_DIR/app.config \
+            -eval "riak_console:$ACTION(['$OLDNODE', '$NEWNODE'])" \
+            -s init stop
+        ;;
+
     backup | restore)
         ACTION=$1
         shift
 
         ;;
     *)
-        echo "Usage: $SCRIPT { join | backup | restore | logger | test }"
+        echo "Usage: $SCRIPT { join | backup | restore | logger | test | status | reip }"
         exit 1
         ;;
 esac