Anonymous avatar Anonymous committed 40650a5

More error propagation fixes

Comments (0)

Files changed (4)

apps/js_data/src/mrstress.erl

 -module(mrstress).
 
--define(INPUTS, [{<<"stress">>, <<"test1">>},
-                 {<<"stress">>, <<"test2">>},
-                 {<<"stress">>, <<"test3">>},
-                 {<<"stress">>, <<"test4">>},
-                 {<<"stress">>, <<"test5">>},
-                 {<<"stress">>, <<"test6">>},
-                 {<<"stress">>, <<"test7">>},
-                 {<<"stress">>, <<"test8">>},
-                 {<<"stress">>, <<"test9">>},
-                 {<<"stress">>, <<"test10">>}]).
-
 -compile([export_all]).
 
-s() ->
-    populate(),
-    mrstress:stress(mrstress:config(javascript, 1, 1)).
+s(InputSize) ->
+    populate(InputSize),
+    mrstress:stress(config(javascript, 1, 1, InputSize)).
 
-v() ->
-    populate(),
-    mrstress:stress(mrstress:config(javascript, 10, 250)).
+v(InputSize) ->
+    populate(InputSize),
+    mrstress:stress(config(javascript, 200, 250, InputSize)).
 
-populate() ->
+populate(InputSize) ->
     {ok, Client} = riak:client_connect('riak@127.0.0.1'),
     lists:foreach(fun({Bucket, Key}) ->
                           Obj = riak_object:new(Bucket, Key, <<"1">>),
-                          ok = Client:put(Obj, 1) end, ?INPUTS).
+                          ok = Client:put(Obj, 1) end, generate_inputs(<<"stress">>, InputSize)).
 
-config(Lang, Clients, Count) ->
-    [{lang, Lang}, {clients, Clients}, {count, Count}].
+config(Lang, Clients, Count, KeyCount) ->
+    [{lang, Lang}, {clients, Clients}, {count, Count}, {keys, KeyCount}].
 
 stress(Config) ->
     Lang = proplists:get_value(lang, Config),
     Count = proplists:get_value(count, Config, 100),
     Clients = proplists:get_value(clients, Config, 1),
+    KeyCount = proplists:get_value(keys, Config, 10),
     Start = erlang:now(),
-    start_test(Lang, Count, Clients),
+    start_test(Lang, Count, Clients, KeyCount),
     case wait_for_end(Clients) of
         ok ->
             End = erlang:now(),
             {error, run_timeout}
     end.
 
-start_test(_Lang, _Count, 0) ->
+start_test(_Lang, _Count, 0, _) ->
     ok;
-start_test(Lang, Count, Clients) ->
+start_test(Lang, Count, Clients, KeyCount) ->
     Owner = self(),
     spawn(fun() -> {ok, Client} = riak:client_connect('riak@127.0.0.1'),
-                   stress(Lang, Count, Client, Owner) end),
-    start_test(Lang, Count, Clients - 1).
+                   stress(Lang, Count, Client, Owner, KeyCount) end),
+    start_test(Lang, Count, Clients - 1, KeyCount).
 
-stress(_Lang, 0, _Client, Owner) ->
+stress(_Lang, 0, _Client, Owner, _) ->
     Owner ! done,
     ok;
-stress(javascript, Count, Client, Owner) ->
-    %%M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
+stress(javascript, Count, Client, Owner, KeyCount) ->
+    M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
     %R = <<"function(v, _) { var sum = 0; v.forEach(function(x) { sum = sum + x; }); return [sum]; }">>,
     %R1 = <<"function(v, _) { return v; }">>,
     %case Client:mapred(?INPUTS, [{map, {jsanon, M}, none, false},
     %                             {reduce, {jsanon, {<<"stress">>, <<"test10">>}}, none, false},
     %                             {reduce, {jsanon, R1}, none, true}]) of
-    case Client:mapred(?INPUTS, [{map, {jsanon, {<<"stress">>, <<"map">>}}, none, false},
+    case Client:mapred(generate_inputs(<<"stress">>, KeyCount), [{map, {jsanon, M}, none, false},
                                  {reduce, {jsfun, <<"Riak.reduceSum">>}, none, true}]) of
         {ok, [10]} ->
-            stress(javascript, Count - 1, Client, Owner);
+            stress(javascript, Count - 1, Client, Owner, KeyCount);
         {ok, WTF} ->
             io:format("Bailing!!!! WTF: ~p~n", WTF),
-            stress(javascript, 0, Client, Owner);
+            stress(javascript, 0, Client, Owner, KeyCount);
         Error ->
             io:format("(~p): ~p~n", [self(), Error]),
-            stress(javascript, Count, Client, Owner)
+            stress(javascript, 0, Client, Owner, KeyCount)
     end;
 
-stress(erlang, Count, Client, Owner) ->
+stress(erlang, Count, Client, Owner, KeyCount) ->
     M = fun(Obj, _, _) ->
                 Value = riak_object:get_value(Obj),
                 [list_to_integer(binary_to_list(Value))] end,
     R = fun(Values, _) -> lists:sum(Values) / length(Values) end,
-    case Client:mapred(?INPUTS, [{map, {qfun, M}, none, false},
-                                 {reduce, {qfun, R}, none, true}]) of
+    case Client:mapred(generate_inputs(<<"stress">>, KeyCount), [{map, {qfun, M}, none, false},
+                                                                 {reduce, {qfun, R}, none, true}]) of
         {ok, _Result} ->
-            stress(erlang, Count - 1, Client, Owner);
+            stress(erlang, Count - 1, Client, Owner, KeyCount);
         Error ->
             io:format("(~p): ~p~n", [self(), Error]),
-            stress(erlang, Count, Client, Owner)
+            stress(erlang, Count, Client, Owner, KeyCount)
     end.
+
+generate_inputs(Bucket, Size) ->
+    [{Bucket, list_to_binary("test" ++ integer_to_list(X))} || X <- lists:seq(1, Size)].

apps/riak/src/riak_client.erl

 collect_mr_results(ReqId, Timeout, Acc) ->
     receive
         {ReqId, done} -> {ok, Acc};
+        {ReqId, {error, _}=Error} ->
+            Error;
         {ReqId,{mr_results,Res}} ->
             collect_mr_results(ReqId,Timeout,Acc++Res)
     after Timeout ->

apps/riak/src/riak_js_vm.erl

                                      Error ->
                                          {Error, State}
                                  end;
-                             Error ->
-                                 {Error, State}
+                             {Error, State1} ->
+                                 {Error, State1}
                          end,
     {reply, Result, NewState};
 handle_call({dispatch, _JobId, {{jsfun, JS}, Reduced, Arg}}, _From, #state{ctx=Ctx}=State) ->
                                      Error ->
                                          {Error, State}
                                  end;
-                             Error ->
-                                 {Error, State}
+                             {_, _}=Error->
+                                 Error
                          end,
     case Result of
         {ok, ReturnValue} ->
                 undefined ->
                     {ok, R};
                 _ ->
-                    error_logger:warning_msg("Javascript evaluation error: ~p, source: ~p, line: ~p~n", [proplists:get_value(<<"message">>, R),
-                                                                                                         proplists:get_value(<<"source">>, R),
-                                                                                                         proplists:get_value(<<"lineno">>, R)]),
                     {error, R}
             end;
         R ->
                         true ->
                             {ok, State#state{last_reducer=Hash}}
                     end;
-                Error ->
+                {error, _}=Error ->
                     error_logger:warning_msg("Error defining Javascript expression: ~p~n", [Error]),
                     {Error, State}
             end

apps/riak/src/riak_reduce_phase_fsm.erl

 wait(timeout, StateData=#state{next_fsm=NextFSM, done=Done, acc=Acc, coord=Coord}) ->
     case perform_reduce(StateData) of
         {HasUpdates, NewStateData} when HasUpdates =:= true;
-                                    HasUpdates =:= false ->
+                                        HasUpdates =:= false ->
             case Done of
                 false ->
                     {next_state, wait, NewStateData};
                             nop;
                         true ->
                             riak_phase_proto:phase_results(Coord, NewStateData#state.reduced),
+                            riak_phase_proto:phase_done(Coord),
                             {stop, normal, NewStateData}
                     end
             end;
 js_reduce(QTerm, Reduced, Arg) ->
     case riak_js_manager:blocking_dispatch({QTerm, Reduced, Arg}) of
         {ok, Result} ->
-            Result;
+            {ok, Result};
         Error ->
             Error
     end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.