1. Justin Sheehy
  2. riak

Commits

Kevin Smith  committed c062f0c

Lengthening M/R timeout and ensuring sane timeouts get passed down to the various phases; wired up a real 'make clean' pass on erlang_js to the top-level Makefile

  • Participants
  • Parent commits c30af4a
  • Branches default

Comments (0)

Files changed (8)

File Makefile

View file
 	./rebar clean
 
 distclean: clean devclean relclean
+	@cd apps/erlang_js;make realclean
 
 test: 
 	./rebar eunit

File apps/js_data/src/mrstress.erl

View file
     ok;
 create_entries(Client, [{Bucket, Key}|T]) ->
     Obj = riak_object:new(Bucket, Key, <<"1">>),
-    FinalObj = case T of
-                   [] ->
-                       Obj;
-                   _ ->
-                       Next = hd(T),
-                       Md = dict:store(<<"Links">>, [{Next, <<"next">>}], dict:new()),
-                       Md1 = dict:store(<<"content-type">>, "text/plain", Md),
-                       riak_object:update_metadata(Obj, Md1)
-               end,
-    ok = Client:put(FinalObj, 1),
+    Md = dict:store(<<"content-type">>, "text/plain", dict:new()),
+    Client:put(riak_object:update_metadata(Obj, Md), 1),
     create_entries(Client, T).
 
 config(Lang, Clients, Count, KeyCount) ->
     [{lang, Lang}, {clients, Clients}, {count, Count}, {keys, KeyCount}].
 
 stress(Config) ->
+    {T1, T2, T3} = erlang:now(),
+    random:seed(T1, T2, T3),
     Lang = proplists:get_value(lang, Config),
     Count = proplists:get_value(count, Config, 100),
     Clients = proplists:get_value(clients, Config, 1),
     KeyCount = proplists:get_value(keys, Config, 10),
+    InputPercent = proplists:get_value(input_size, Config, 0.15),
+    InputSize = erlang:trunc(KeyCount * InputPercent),
+    io:format("Using ~p out of ~p entries per mapred call~n", [InputSize, KeyCount]),
     populate(KeyCount),
-    LogFile = proplists:get_value(log_file, Config, "/tmp/stress.log"),
+    RawSuffix = integer_to_list(calendar:datetime_to_gregorian_seconds(calendar:now_to_local_time(erlang:now()))),
+    Suffix = string:substr(RawSuffix, length(RawSuffix) - 5),
+    LogFile = proplists:get_value(log_file, Config, "/tmp/stress_" ++ Suffix ++ ".log"),
     stress_collector:start(LogFile),
-    start_test(Lang, Count, Clients, KeyCount),
+    Inputs = generate_inputs(<<"stress">>, KeyCount),
+    start_test(Lang, Count, Clients, InputSize, Inputs),
     wait_for_end(Clients).
 
 wait_for_end(0) ->
             wait_for_end(Clients - 1)
     end.
 
-start_test(_Lang, _Count, 0, _) ->
+start_test(_Lang, _Count, 0, _, _) ->
     ok;
-start_test(Lang, Count, Clients, KeyCount) ->
+start_test(Lang, Count, Clients, InputSize, Inputs) ->
     Owner = self(),
     spawn(fun() -> {ok, Client} = riak:client_connect('riak@127.0.0.1'),
-                   stress(Lang, Count, Client, Owner, KeyCount) end),
-    start_test(Lang, Count, Clients - 1, KeyCount).
+                   stress(Lang, Count, Client, Owner, Inputs, InputSize) end),
+    start_test(Lang, Count, Clients - 1, InputSize, Inputs).
 
-stress(_Lang, 0, _Client, Owner, _) ->
+stress(_Lang, 0, _Client, Owner, _, _) ->
     Owner ! done,
     ok;
-stress(javascript, Count, Client, Owner, KeyCount) ->
+stress(javascript, Count, Client, Owner, Inputs, InputSize) ->
     M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
     R = <<"function(v, _) { var sum = 0; v.forEach(function(x) { sum = sum + x; }); return [sum]; }">>,
-    Inputs = generate_inputs(<<"stress">>, KeyCount),
-    Correct = length(Inputs),
+    Selected = select_inputs(Inputs, InputSize, []),
     Start = erlang:now(),
-    case Client:mapred(Inputs, [{map, {jsanon, M}, none, false},
-                                {reduce, {jsanon, R}, none, true}]) of
-        {ok, [Correct]} ->
+    case Client:mapred(Selected, [{map, {jsanon, M}, none, false},
+                                  {reduce, {jsanon, R}, none, true}]) of
+        {ok, [InputSize]} ->
             End = erlang:now(),
-            stress_collector:log(erlang:trunc(timer:now_diff(End, Start) / 1000), 0),
-            stress(javascript, Count - 1, Client, Owner, KeyCount);
+            stress_collector:log(timer:now_diff(End, Start), 0),
+            stress(javascript, Count - 1, Client, Owner, Inputs, InputSize);
         _Error ->
             End = erlang:now(),
-            stress_collector:log(0, erlang:trunc(timer:now_diff(End, Start) / 1000)),
-            stress(javascript, 0, Client, Owner, KeyCount)
+            io:format("Error: ~p~n", [_Error]),
+            stress_collector:log(0, timer:now_diff(End, Start)),
+            stress(javascript, 0, Client, Owner, Inputs, InputSize)
     end;
 
-stress(erlang, Count, Client, Owner, KeyCount) ->
+stress(erlang, Count, Client, Owner, Inputs, InputSize) ->
     M = fun(Obj, _, _) ->
                 Value = riak_object:get_value(Obj),
                 [list_to_integer(binary_to_list(Value))] end,
     R = fun(Values, _) -> [lists:sum(Values)] end,
-    Inputs = generate_inputs(<<"stress">>, KeyCount),
-    Correct = length(Inputs),
+    Selected = select_inputs(Inputs, InputSize, []),
+    Correct = length(Selected),
     Start = erlang:now(),
-    case Client:mapred(Inputs, [{map, {qfun, M}, none, false},
+    case Client:mapred(Selected, [{map, {qfun, M}, none, false},
                                 {reduce, {qfun, R}, none, true}]) of
         {ok, [Correct]} ->
             End = erlang:now(),
-            stress_collector:log(erlang:trunc(timer:now_diff(End, Start) / 1000), 0),
-            stress(erlang, Count - 1, Client, Owner, KeyCount);
+            stress_collector:log(timer:now_diff(End, Start), 0),
+            stress(erlang, Count - 1, Client, Owner, Inputs, InputSize);
         _Error ->
+            End = erlang:now(),
             io:format("Error: ~p~n", [_Error]),
-            End = erlang:now(),
-            stress_collector:log(0, erlang:trunc(timer:now_diff(End, Start) / 1000)),
-            stress(erlang, Count, Client, Owner, KeyCount)
+            stress_collector:log(0, timer:now_diff(End, Start)),
+            stress(erlang, Count, Client, Owner, Inputs, InputSize)
     end.
 
 generate_inputs(Bucket, Size) ->
     [{Bucket, list_to_binary("test" ++ integer_to_list(X))} || X <- lists:seq(1, Size)].
+
+select_inputs(_Inputs, InputSize, Accum) when length(Accum) == InputSize ->
+    Accum;
+select_inputs(Inputs, InputSize, Accum) ->
+    Pos = random:uniform(InputSize),
+    Input = lists:nth(Pos, Inputs),
+    case lists:member(Input, Accum) of
+        false ->
+            select_inputs(Inputs, InputSize, [Input|Accum]);
+        true ->
+            select_inputs(Inputs, InputSize, Accum)
+    end.

File apps/riak/src/riak_client.erl

View file
 -export ([add_event_handler/2, add_event_handler/3, add_event_handler/4]).
 -export ([remove_event_handler/3]).
 -export([get_stats/1]).
-%% @type default_timeout() = 15000
--define(DEFAULT_TIMEOUT, 15000).
+%% @type default_timeout() = 60000
+-define(DEFAULT_TIMEOUT, 60000).
 -define(DEFAULT_ERRTOL, 0.00003).
 
 %% @spec mapred(Inputs :: list(),

File apps/riak/src/riak_map_executor.erl

View file
 -module(riak_map_executor).
 -behaviour(gen_fsm).
 
--export([start_link/4]).
+-export([start_link/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]).
 
--record(state, {bkey,qterm,phase_pid,vnodes,keydata,ring}).
+-record(state, {bkey,qterm,phase_pid,vnodes,keydata,ring,timeout}).
 
 % {link, Bucket, Tag, Acc}
 % {map, FunTerm, Arg, Acc}
 % all map funs (and link funs) must return a list of values,
 % but that is not enforced at this layer
 
-start_link(Ring,{{_, _}, _}=Input,QTerm,PhasePid) ->
-    gen_fsm:start_link(?MODULE, [Ring,Input,QTerm,PhasePid], []);
-start_link(_Ring, _BadInput, _QTerm, _PhasePid) ->
+start_link(Ring, {{_, _}, _}=Input, QTerm, Timeout, PhasePid) ->
+    gen_fsm:start_link(?MODULE, [Ring,Input,QTerm,Timeout,PhasePid], []);
+start_link(_Ring, _BadInput, _QTerm, _Timeout, _PhasePid) ->
     {error, bad_input}.
 %% @private
-init([Ring,{{Bucket,Key},KeyData},QTerm0,PhasePid]) ->
+init([Ring,{{Bucket,Key},KeyData},QTerm0,Timeout,PhasePid]) ->
     riak_eventer:notify(riak_map_executor, mapexec_start, start),
     DocIdx = riak_util:chash_key({Bucket,Key}),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
             VNodes = try_vnode(QTerm, {Bucket,Key}, KeyData, Targets),
             {ok,wait,
              #state{bkey={Bucket,Key},qterm=QTerm,phase_pid=PhasePid,
-                    vnodes=VNodes,keydata=KeyData,ring=Ring},
-             1000}
+                    vnodes=VNodes,keydata=KeyData,ring=Ring,timeout=Timeout},
+             Timeout}
     end.
 
 try_vnode(QTerm, BKey, KeyData, [{P,VN}|VNs]) ->
     riak_phase_proto:mapexec_error(PhasePid, "all nodes failed"),
     {stop,normal,StateData};
 wait(timeout, StateData=
-     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
+     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData,timeout=Timeout}) ->
     {next_state, wait, StateData#state{
                          vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
-     1000};
+     Timeout};
 wait({mapexec_error, VN, ErrMsg},
      StateData=#state{phase_pid=PhasePid,vnodes=[]}) ->
     riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),
     riak_phase_proto:mapexec_error(PhasePid, ErrMsg),
     {stop, normal, StateData};
 wait({mapexec_error, VN, ErrMsg},StateData=
-     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
+     #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData,timeout=Timeout}) ->
     riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),
     {next_state, wait, StateData#state{
                          vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
-     1000};
-wait({mapexec_reply, executing, _}, StateData) ->
-    {next_state, wait, StateData, 1000};
+     Timeout};
+wait({mapexec_reply, executing, _}, #state{timeout=Timeout}=StateData) ->
+    {next_state, wait, StateData, Timeout};
 wait({mapexec_reply, RetVal, _VN}, StateData=#state{phase_pid=PhasePid}) ->
     riak_phase_proto:mapexec_result(PhasePid, RetVal),
     {stop,normal,StateData}.

File apps/riak/src/riak_map_phase_fsm.erl

View file
 -module(riak_map_phase_fsm).
 -behaviour(gen_fsm).
 
--export([start_link/4]).
+-export([start_link/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]).
 
--record(state, {done,qterm,next_fsm,coord,acc,map_fsms,ring}).
+-record(state, {done,qterm,next_fsm,coord,acc,map_fsms,ring,timeout}).
 
-start_link(Ring,QTerm,NextFSM,Coordinator) ->
-    gen_fsm:start_link(?MODULE, [Ring,QTerm,NextFSM,Coordinator], []).
+start_link(Ring, QTerm, NextFSM, Coordinator, PhaseTimeout) ->
+    gen_fsm:start_link(?MODULE, [Ring, QTerm, NextFSM, Coordinator, PhaseTimeout], []).
 %% @private
-init([Ring,QTerm,NextFSM,Coordinator]) ->
+init([Ring, QTerm, NextFSM, Coordinator, PhaseTimeout]) ->
     {_,{_,_,_,Acc}} = QTerm,
     riak_eventer:notify(riak_map_phase_fsm, map_start, start),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,
-                    coord=Coordinator,acc=Acc,map_fsms=[],ring=Ring}}.
+                    coord=Coordinator,acc=Acc,map_fsms=[],ring=Ring,timeout=PhaseTimeout}}.
 
 wait({mapexec_reply,Reply,MapFSM}, StateData=
-     #state{done=Done,next_fsm=NextFSM,coord=Coord,acc=Acc,map_fsms=FSMs0}) ->
+     #state{done=Done,next_fsm=NextFSM,coord=Coord,acc=Acc,map_fsms=FSMs0,timeout=Timeout}) ->
     FSMs = lists:delete(MapFSM,FSMs0),
     case NextFSM of
         final -> nop;
         true ->
             finish(StateData);
         false ->
-            {next_state, wait, StateData#state{map_fsms=FSMs}}
+            {next_state, wait, StateData#state{map_fsms=FSMs}, Timeout}
     end;
 
 wait({mapexec_error, _ErrFSM, ErrMsg}, StateData=
         _ -> riak_phase_proto:die(NextFSM)
     end,
     {stop,normal,StateData};
-wait(done, StateData=#state{map_fsms=FSMs}) ->
+wait(done, StateData=#state{map_fsms=FSMs, timeout=Timeout}) ->
     riak_eventer:notify(riak_map_phase_fsm, done_inputs, done_inputs),
     case FSMs of
         [] -> finish(StateData);
-        _ -> {next_state, wait, StateData#state{done=true}}
+        _ -> {next_state, wait, StateData#state{done=true}, Timeout}
     end;
-wait({input,Inputs0}, StateData=#state{qterm=QTerm,map_fsms=FSMs0,ring=Ring}) ->
+wait({input,Inputs0}, StateData=#state{qterm=QTerm,map_fsms=FSMs0,ring=Ring,timeout=Timeout}) ->
     Inputs = [convert_input(I) || I <- Inputs0],
-    NewFSMs = start_executors(Ring, Inputs, QTerm),
+    NewFSMs = start_executors(Ring, Inputs, QTerm, Timeout),
     FSMs = NewFSMs ++ FSMs0,
     if
         length(FSMs) == 0 ->
-            {next_state, wait, StateData#state{map_fsms=FSMs}, 100};
+            {next_state, wait, StateData#state{map_fsms=FSMs}, Timeout};
         true ->
-            {next_state, wait, StateData#state{map_fsms=FSMs}}
+            {next_state, wait, StateData#state{map_fsms=FSMs}, Timeout}
     end;
 wait(timeout, StateData) ->
     finish(StateData);
 convert_input(I) -> I.
 
 %% @private
-start_executors(Ring, Inputs, QTerm) ->
-    start_executors(Ring, Inputs, QTerm, []).
-start_executors(_Ring, [], _QTerm, Accum) ->
+start_executors(Ring, Inputs, QTerm, Timeout) ->
+    start_executors(Ring, Inputs, QTerm, Timeout, []).
+start_executors(_Ring, [], _QTerm, _Timeout, Accum) ->
     lists:reverse(Accum);
-start_executors(Ring, [H|T], QTerm, Accum) ->
-    case riak_map_executor:start_link(Ring, H, QTerm, self()) of
+start_executors(Ring, [H|T], QTerm, Timeout, Accum) ->
+    case riak_map_executor:start_link(Ring, H, QTerm, Timeout, self()) of
         {ok, FSM} ->
-            start_executors(Ring, T, QTerm, [FSM|Accum]);
+            start_executors(Ring, T, QTerm, Timeout, [FSM|Accum]);
         {error, bad_input} ->
             error_logger:warning_msg("Skipping map phase for input ~p. Map phase input must be {Bucket, Key}.~n", [H]),
-            start_executors(Ring, T, QTerm, Accum)
+            start_executors(Ring, T, QTerm, Timeout, Accum)
     end.

File apps/riak/src/riak_mapreduce_fsm.erl

View file
     gen_fsm:start(?MODULE, [ReqId,Query,Timeout,Client], []).
 %% @private
 init([ReqId,Query,Timeout,Client]) ->
+    EffectiveTimeout = erlang:trunc(Timeout  * 1.5),
+    PerPhaseTimeout = erlang:trunc(EffectiveTimeout / length(Query)),
     {ok, Ring} = riak_ring_manager:get_my_ring(),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
     case check_query_syntax(Query) of
         {ok, Query1} ->
-            FSMs = make_phase_fsms(Query1, Ring), % Pid for each phase, in-order
+            FSMs = make_phase_fsms(Query1, Ring, PerPhaseTimeout), % Pid for each phase, in-order
             StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
-                               starttime=riak_util:moment(),timeout=Timeout,
+                               starttime=riak_util:moment(),timeout=EffectiveTimeout,
                                ring=Ring,input_done=false},
             {ok,wait,StateData,Timeout};
         {bad_qterm, QTerm} ->
     end;
 check_query_syntax([BadQTerm|_], _) -> {bad_qterm,BadQTerm}.
 
-make_phase_fsms(Query, Ring) ->
-    make_phase_fsms(lists:reverse(Query),final,[], Ring).
-make_phase_fsms([], _NextFSM, FSMs, _Ring) -> FSMs;
-make_phase_fsms([QTerm|Rest], NextFSM, FSMs, Ring) ->
+make_phase_fsms(Query, Ring, PerPhaseTimeout) ->
+    make_phase_fsms(lists:reverse(Query),final,[], Ring, PerPhaseTimeout).
+make_phase_fsms([], _NextFSM, FSMs, _Ring, _PerPhaseTimeout) -> FSMs;
+make_phase_fsms([QTerm|Rest], NextFSM, FSMs, Ring, PerPhaseTimeout) ->
     {ok, Pid} = case QTerm of
                     {_, {reduce, _, _, _}} ->
-                        riak_phase_sup:new_reduce_phase(Ring, QTerm, NextFSM, self());
+                        riak_phase_sup:new_reduce_phase(Ring, QTerm, NextFSM, self(), PerPhaseTimeout);
                     {_, {map, _, _, _}} ->
-                        riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self());
+                        riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self(), PerPhaseTimeout);
                     {_, {link, _, _, _}} ->
-                        riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self())
+                        riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self(), PerPhaseTimeout)
                 end,
-    make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
+    make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring, PerPhaseTimeout).
 
 wait({input,Inputs},
      StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->

File apps/riak/src/riak_phase_sup.erl

View file
 -behaviour(supervisor).
 
 %% API
--export([start_link/0, new_reduce_phase/4, new_map_phase/4]).
--export([new_map_executor/4]).
+-export([start_link/0, new_reduce_phase/5, new_map_phase/5]).
+-export([new_map_executor/5]).
 
 %% Supervisor callbacks
 -export([init/1]).
 
-new_reduce_phase(_Ring, QTerm, NextFSM, Requestor) ->
-    start_child(riak_reduce_phase_fsm, [QTerm, NextFSM, Requestor]).
+new_reduce_phase(_Ring, QTerm, NextFSM, Requestor, PerPhaseTimeout) ->
+    start_child(riak_reduce_phase_fsm, [QTerm, NextFSM, Requestor, PerPhaseTimeout]).
 
-new_map_phase(Ring, QTerm, NextFSM, Requestor) ->
-    start_child(riak_map_phase_fsm, [Ring, QTerm, NextFSM, Requestor]).
+new_map_phase(Ring, QTerm, NextFSM, Requestor, PerPhaseTimeout) ->
+    start_child(riak_map_phase_fsm, [Ring, QTerm, NextFSM, Requestor, PerPhaseTimeout]).
 
-new_map_executor(Ring, Input, QTerm, Requestor) ->
-    start_child(riak_map_executor, [Ring, Input, QTerm, Requestor]).
+new_map_executor(Ring, Input, QTerm, Requestor, Timeout) ->
+    start_child(riak_map_executor, [Ring, Input, QTerm, Requestor, Timeout]).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).

File apps/riak/src/riak_reduce_phase_fsm.erl

View file
 -module(riak_reduce_phase_fsm).
 -behaviour(gen_fsm).
 
--export([start_link/4]).
+-export([start_link/5]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]).
 
--record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input}).
+-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input,timeout}).
 
-start_link(_Ring,QTerm,NextFSM,Coordinator) ->
-    gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
+start_link(_Ring,QTerm,NextFSM,Coordinator,Timeout) ->
+    gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator,Timeout], []).
 %% @private
-init([QTerm,NextFSM,Coordinator]) ->
+init([QTerm,NextFSM,Coordinator,Timeout]) ->
     {_,{_,_,_,Acc}} = QTerm,
     riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
-                    coord=Coordinator,acc=Acc,reduced=[]}}.
+                    coord=Coordinator,acc=Acc,reduced=[],timeout=Timeout}}.
 
 wait(timeout, StateData=#state{next_fsm=NextFSM, done=Done, acc=Acc, coord=Coord}) ->
     case perform_reduce(StateData) of
     end;
 wait(done, StateData) ->
     {next_state, wait, StateData#state{done=true}, 1};
-wait({input,Inputs}, StateData=#state{reduced=Reduced}) ->
+wait({input,Inputs}, StateData=#state{reduced=Reduced, timeout=Timeout}) ->
     {next_state, wait,
-     StateData#state{reduced=Inputs ++ Reduced, fresh_input=true}, 100};
+     StateData#state{reduced=Inputs ++ Reduced, fresh_input=true}, Timeout};
 wait(die, StateData=#state{next_fsm=NextFSM}) ->
     riak_eventer:notify(riak_reduce_phase_fsm, die, die),
     case NextFSM of