Commits

Anonymous committed a2988b9

Fixing issues w/link walking probably introduced when Javascript was enabled. (Wolf shirt, here I come!)

Comments (0)

Files changed (8)

apps/js_data/ebin/js_data.app

 {application, js_data,
  [{description,  "Blub"},
   {vsn,          "0.1"},
-  {modules,      [jsd_generator, mrstress]},
+  {modules,      [jsd_generator, mrstress, stress_collector]},
   {registered,   []},
   {applications, [kernel, stdlib]}]}.

apps/js_data/src/mrstress.erl

 
 -compile([export_all]).
 
-s(InputSize) ->
-    populate(InputSize),
-    mrstress:stress(config(javascript, 1, 1, InputSize)).
-
-v(InputSize) ->
-    populate(InputSize),
-    mrstress:stress(config(javascript, 200, 250, InputSize)).
-
 populate(InputSize) ->
     {ok, Client} = riak:client_connect('riak@127.0.0.1'),
-    lists:foreach(fun({Bucket, Key}) ->
-                          Obj = riak_object:new(Bucket, Key, <<"1">>),
-                          ok = Client:put(Obj, 1) end, generate_inputs(<<"stress">>, InputSize)).
+    create_entries(Client, generate_inputs(<<"stress">>, InputSize)).
+
+create_entries(_Client, []) ->
+    ok;
+create_entries(Client, [{Bucket, Key}|T]) ->
+    Obj = riak_object:new(Bucket, Key, <<"1">>),
+    FinalObj = case T of
+                   [] ->
+                       Obj;
+                   _ ->
+                       Next = hd(T),
+                       Md = dict:store(<<"Links">>, [{Next, <<"next">>}], dict:new()),
+                       Md1 = dict:store(<<"content-type">>, "text/plain", Md),
+                       riak_object:update_metadata(Obj, Md1)
+               end,
+    ok = Client:put(FinalObj, 1),
+    create_entries(Client, T).
 
 config(Lang, Clients, Count, KeyCount) ->
     [{lang, Lang}, {clients, Clients}, {count, Count}, {keys, KeyCount}].
     Count = proplists:get_value(count, Config, 100),
     Clients = proplists:get_value(clients, Config, 1),
     KeyCount = proplists:get_value(keys, Config, 10),
-    Start = erlang:now(),
+    populate(KeyCount),
+    LogFile = proplists:get_value(log_file, Config, "/tmp/stress.log"),
+    stress_collector:start(LogFile),
     start_test(Lang, Count, Clients, KeyCount),
-    case wait_for_end(Clients) of
-        ok ->
-            End = erlang:now(),
-            Elapsed = timer:now_diff(End, Start),
-            {Elapsed, (Elapsed / Clients) / Count};
-        Error ->
-            Error
-    end.
+    wait_for_end(Clients).
 
 wait_for_end(0) ->
-    ok;
+    timer:sleep(1000),
+    stress_collector:test_complete();
 wait_for_end(Clients) ->
     receive
         done ->
             wait_for_end(Clients - 1)
-    after 300000 ->
-            {error, run_timeout}
     end.
 
 start_test(_Lang, _Count, 0, _) ->
     ok;
 stress(javascript, Count, Client, Owner, KeyCount) ->
     M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
-    %R = <<"function(v, _) { var sum = 0; v.forEach(function(x) { sum = sum + x; }); return [sum]; }">>,
-    %R1 = <<"function(v, _) { return v; }">>,
-    %case Client:mapred(?INPUTS, [{map, {jsanon, M}, none, false},
-    %                             {reduce, {jsanon, {<<"stress">>, <<"test10">>}}, none, false},
-    %                             {reduce, {jsanon, R1}, none, true}]) of
-    case Client:mapred(generate_inputs(<<"stress">>, KeyCount), [{map, {jsanon, M}, none, false},
-                                 {reduce, {jsfun, <<"Riak.reduceSum">>}, none, true}]) of
-        {ok, [10]} ->
+    R = <<"function(v, _) { var sum = 0; v.forEach(function(x) { sum = sum + x; }); return [sum]; }">>,
+    Inputs = generate_inputs(<<"stress">>, KeyCount),
+    Correct = length(Inputs),
+    Start = erlang:now(),
+    case Client:mapred(Inputs, [{map, {jsanon, M}, none, false},
+                                {reduce, {jsanon, R}, none, true}]) of
+        {ok, [Correct]} ->
+            End = erlang:now(),
+            stress_collector:log(erlang:trunc(timer:now_diff(End, Start) / 1000), 0),
             stress(javascript, Count - 1, Client, Owner, KeyCount);
-        {ok, WTF} ->
-            io:format("Bailing!!!! WTF: ~p~n", WTF),
-            stress(javascript, 0, Client, Owner, KeyCount);
-        Error ->
-            io:format("(~p): ~p~n", [self(), Error]),
+        _Error ->
+            End = erlang:now(),
+            stress_collector:log(0, erlang:trunc(timer:now_diff(End, Start) / 1000)),
             stress(javascript, 0, Client, Owner, KeyCount)
     end;
 
     M = fun(Obj, _, _) ->
                 Value = riak_object:get_value(Obj),
                 [list_to_integer(binary_to_list(Value))] end,
-    R = fun(Values, _) -> lists:sum(Values) / length(Values) end,
-    case Client:mapred(generate_inputs(<<"stress">>, KeyCount), [{map, {qfun, M}, none, false},
-                                                                 {reduce, {qfun, R}, none, true}]) of
-        {ok, _Result} ->
+    R = fun(Values, _) -> [lists:sum(Values)] end,
+    Inputs = generate_inputs(<<"stress">>, KeyCount),
+    Correct = length(Inputs),
+    Start = erlang:now(),
+    case Client:mapred(Inputs, [{map, {qfun, M}, none, false},
+                                {reduce, {qfun, R}, none, true}]) of
+        {ok, [Correct]} ->
+            End = erlang:now(),
+            stress_collector:log(erlang:trunc(timer:now_diff(End, Start) / 1000), 0),
             stress(erlang, Count - 1, Client, Owner, KeyCount);
-        Error ->
-            io:format("(~p): ~p~n", [self(), Error]),
+        _Error ->
+            io:format("Error: ~p~n", [_Error]),
+            End = erlang:now(),
+            stress_collector:log(0, erlang:trunc(timer:now_diff(End, Start) / 1000)),
             stress(erlang, Count, Client, Owner, KeyCount)
     end.
 

apps/js_data/src/stress_collector.erl

+-module(stress_collector).
+
+-behaviour(gen_server).
+
+%% API
+-export([start/1, log/2, test_complete/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {fd, counter}).
+
+log(Success, Error) ->
+    gen_server:cast(?SERVER, {log, Success, Error}).
+
+test_complete() ->
+    gen_server:cast(?SERVER, test_complete).
+
+start(FileName) ->
+    gen_server:start({local, ?SERVER}, ?MODULE, [FileName], []).
+
+init([FileName]) ->
+    {ok, Fd} = file:open(FileName, [write]),
+    file:write(Fd, io_lib:format("TestId,Success,Error~n", [])),
+    {ok, #state{fd=Fd, counter=1}}.
+
+handle_call(_Request, _From, State) ->
+    {reply, ignore, State}.
+
+handle_cast(test_complete, #state{fd=Fd}=State) ->
+    file:close(Fd),
+    {stop, normal, State};
+handle_cast({log, SuccessTime, ErrorTime}, #state{fd=Fd, counter=Counter}=State) ->
+    file:write(Fd, io_lib:format("~s,~s,~s~n", [integer_to_list(Counter), integer_to_list(SuccessTime),
+                                                integer_to_list(ErrorTime)])),
+    {noreply, State#state{counter=Counter + 1}};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Internal functions

apps/riak/priv/mapred_builtins.js

       else {
 	return data;
       }},
+     mapValuesJson: function(value, key_data, arg) {
+      return [JSON.parse(value)];
+    },
     reduceSum: function(values, arg) {
       return [values.reduce(function(prev, curr, index, array) { return prev + curr; })];
     },
       values.sort().reverse();
       return [values[0]];
     },
-    reduceAverage: function(values, arg) {
-      var total = Riak.reduceSum(values, arg);
-      return [total / values.length];
+    reduceSort: function(value, arg) {
+      var c = null;
+      if (arg) {
+	c = eval(arg);
+      }
+      if(c) {
+	return value.sort(c);
+      }
+      else {
+	return value.sort();
+      }
+    },
+    reduceLimit: function(value, arg) {
+      return value.slice(0, arg - 1);
+    },
+    reduceSlice: function(value, arg) {
+      var start = arg[0];
+      var end = arg[1];
+      if (end > value.length) {
+	return value;
+      }
+      else {
+	return value.slice(start, end);
+      }
     }
   }
 }();

apps/riak/src/riak_map_executor.erl

     DocIdx = riak_util:chash_key({Bucket,Key}),
     BucketProps = riak_bucket:get_bucket(Bucket, Ring),
     LinkFun = case QTerm0 of
-        {link,_,_,_} -> proplists:get_value(linkfun, BucketProps);
-        _ -> nop
+                  {erlang, {link,_,_,_}} -> proplists:get_value(linkfun, BucketProps);
+                  _ -> nop
     end,
     case LinkFun of
         linkfun_unset ->

apps/riak/src/riak_mapreduce_fsm.erl

                 link ->
                     case (is_binary(QT2) orelse QT2 == '_') of
                         false -> {bad_qterm, QTerm};
-                        true -> check_query_syntax(Rest)
+                        true -> check_query_syntax(Rest, [{erlang, QTerm}|Accum])
                     end;
                 _ -> % (map or reduce)
                     case QT2 of

apps/riak/src/riak_reduce_phase_fsm.erl

                             riak_phase_proto:done(NextFSM)
                     end,
                     case Acc of
-                        false ->
-                            nop;
+                      false ->
+                            riak_phase_proto:phase_done(Coord),
+                            {stop, normal, NewStateData};
                         true ->
                             riak_phase_proto:phase_results(Coord, NewStateData#state.reduced),
                             riak_phase_proto:phase_done(Coord),