Anonymous avatar Anonymous committed 98d0b28

Initial impl of named JS map & reduce funs

Comments (0)

Files changed (11)

apps/erlang_js/src/js_driver.erl

 
 new() ->
     {ok, Port} = new(no_json),
-%% Load json converter for use later
+    %% Load json converter for use later
     case define_js(Port, <<"json2.js">>, json_converter(), ?SCRIPT_TIMEOUT) of
         ok ->
             {ok, Port};
     case Initializer(Port) of
         ok ->
             {ok, Port};
-        _ ->
+        Error ->
+            error_logger:error_report(Error),
             throw({error, init_failed})
     end;
 new({InitMod, InitFun}) ->
     case InitMod:InitFun(Port) of
         ok ->
             {ok, Port};
-        _ ->
+        Error ->
+            error_logger:error_report(Error),
             throw({error, init_failed})
     end.
 

apps/js_data/src/jsd_generator.erl

 -compile([export_all]).
 %-export([generate/2]).
 
+start_proc_count() ->
+    spawn(fun() -> proc_count([], []) end).
+
+proc_count([], []) ->
+    timer:sleep(5000),
+    proc_count(erlang:processes(), []);
+proc_count(Last, Accum) ->
+    timer:sleep(5000),
+    Current = erlang:processes(),
+    case diff(Current, Last) of
+        [] ->
+            proc_count(Current, Accum);
+        Procs ->
+            NewAccum = prune(Procs ++ Accum),
+            Info = get_info(NewAccum),
+            lists:foreach(fun({Pid, Mod}) ->
+                                  io:format("~p: ~p~n", [Pid, Mod]) end, Info),
+            io:format("~p total orphans~n", [length(NewAccum)]),
+            proc_count(Current, NewAccum)
+    end.
+
+prune(Procs) ->
+    lists:filter(fun(P) ->
+                         lists:member(P, erlang:processes()) end, Procs).
+
+diff(Current, Last) ->
+    lists:foldl(fun(C, A) ->
+                        case lists:member(C, Last) of
+                            false ->
+                                [C|A];
+                            true ->
+                                A
+                        end end, [], Current).
+
+get_info(Procs) ->
+    lists:map(fun(P) ->
+                      {_, {Mod, _, _}} = erlang:process_info(P, current_function),
+                      {P, Mod} end, Procs).
+
 generate(Client, Count) ->
     rand_init(),
     Records = generate_data(Client, Count, []),
     Client:put(riak_object:new(<<"customers">>, <<"customer_list">>, Records), 1).
 
+stress(Count) ->
+    {ok, C} = riak:client_connect('riak@127.0.0.1'),
+    jsd_generator:generate(C, 1000),
+    Start = erlang:now(),
+    stress(C, Count),
+    End = erlang:now(),
+    Elapsed = timer:now_diff(End, Start),
+    {Elapsed, Elapsed / Count}.
+
+stress(_Client, 0) ->
+    io:format("~n"),
+    ok;
+stress(Client, Count) ->
+    if
+        Count rem 1000 == 0 ->
+            io:format(".");
+        true ->
+            nop
+    end,
+    %M = fun(Obj, _, _) ->
+    %            Values = riak_object:get_value(Obj), [proplists:get_value(avg_sales, V) || V <- Values] end,
+    %R = fun(Values, _) ->
+    %            lists:sum(Values) / length(Values) end,
+    M = <<"function(values, key_data, arg) { var v = values[0]; v.map(function(value) { Riak.emit(value[\"avg_sales\"]); }); return true;};">>,
+    R = <<"function(values, arg) { var accum = 0; values.map(function(v) { accum = accum + v;}); return accum / values.length; };">>,
+    {ok, _} = Client:mapred([{<<"customers">>, <<"customer_list">>}], [{map, {jsanon, M}, none, false},
+                                                                       {reduce, {jsanon, R}, none, true}]),
+    stress(Client, Count - 1).
+
+
 %% Internal functions
 generate_data(_Client, 0, Accum) ->
     lists:reverse(Accum);

apps/riak/ebin/riak.app

              riak_fs_backend,
              riak_gb_trees_backend,
              riak_get_fsm,
+             riak_js,
              riak_keys_fsm,
              riak_local_logger,
              riak_map_executor,

apps/riak/priv/mapred_builtins.js

+var Riak = function() {
+  var _mapResults = [];
+
+  return {
+    emit: function(data) { _mapResults[_mapResults.length] = data; return true; },
+    getResults: function() { var retval = _mapResults; _mapResults = []; return retval; },
+    mapIdentity: function(values, key_data, arg) { return values; },
+    mapSelectField: function(values, key_data, fieldName) { if (values instanceof Array) {
+	if (values.length == 1 && (values[0] instanceof Array && values[0].length > 0)) {
+	  values = values[0];
+        }
+	values.forEach(function(value) { Riak.emit(value[fieldName]); });
+	return true;
+      }
+    },
+    reduceUnion: function(values, arg) {
+      var retval = [];
+      values.foreach(function(value) {
+	  if (retval.indexOf(value) == -1) {
+	    retval[retval.length] = value;
+	  } });
+      return retval;
+    },
+    reduceSum: function(values, arg) {
+      return values.reduce(function(prev, curr, index, array) { return prev + curr; });
+    },
+    reduceMin: function(values, arg) {
+      values.sort();
+      return values[0];
+    },
+    reduceMax: function(values, arg) {
+      values.sort().reverse();
+      return values[0];
+    },
+    reduceAverage: function(values, arg) {
+      var total = Riak.reduceSum(values, arg);
+      return total / values.length;
+    },
+    reduceRandom: function(values, arg) {
+      var index = Math.floor(Math.random() * values.length - 1);
+      return values[index];
+    }
+  }
+}();

apps/riak/src/riak_js.erl

+-module(riak_js).
+
+-export([new_context/0, init_context/1, invoke_map/6, invoke_reduce/6]).
+
+new_context() ->
+    js_driver:new({?MODULE, init_context}).
+
+init_context(Ctx) ->
+    case load_init_source() of
+        {ok, Source} ->
+            js:define(Ctx, Source);
+        Error ->
+            Error
+    end.
+
+
+invoke_map(JsCtx, CSums, Args, Class, FunName, undefined) when Class =:= <<"Riak">> ->
+    RealFunName = list_to_binary([Class, <<".">>, FunName]),
+    case js:call(JsCtx, RealFunName, Args) of
+        {ok, _} ->
+            case js:call(JsCtx, <<"Riak.getResults">>, []) of
+                {ok, Results} ->
+                    {Results, CSums};
+                Error ->
+                    {Error, CSums}
+            end;
+        Error ->
+            {Error, CSums}
+    end;
+
+invoke_map(JsCtx, CSums, Args, undefined, FunName, F) ->
+    MD5 = erlang:md5(F),
+    {Continue, NewCSums} = case needs_defining(CSums, FunName, MD5) of
+                               true ->
+                                   F1 = list_to_binary(["var ", FunName, "=", F]),
+                                   case js:define(JsCtx, F1) of
+                                       ok ->
+                                           {ok, dict:store(FunName, MD5, CSums)};
+                                       Error ->
+                                           {Error, CSums}
+                                   end;
+                               false ->
+                                   {ok, CSums}
+                           end,
+    case Continue of
+        ok ->
+            case js:call(JsCtx, FunName, [Args]) of
+                {ok, _} ->
+                    case js:call(JsCtx, <<"Riak.getResults">>, []) of
+                        {ok, Results} ->
+                            {Results, NewCSums};
+                        Err ->
+                            {Err, CSums}
+                    end;
+                Err ->
+                    {Err, CSums}
+            end;
+        Err ->
+            {Err, CSums}
+    end.
+
+invoke_reduce(JsCtx, CSums, Args, Class, FunName, undefined) when Class =:= <<"Riak">> ->
+    RealFunName = list_to_binary([Class, <<".">>, FunName]),
+    case js:call(JsCtx, RealFunName, Args) of
+        {ok, Results} ->
+            {Results, CSums};
+        Error ->
+            {Error, CSums}
+    end;
+
+invoke_reduce(JsCtx, CSums, Args, undefined, FunName, F) ->
+    MD5 = erlang:md5(F),
+    {Continue, NewCSums} = case needs_defining(CSums, FunName, MD5) of
+                               true ->
+                                   F1 = list_to_binary(["var ", FunName, "=", F]),
+                                   case js:define(JsCtx, F1) of
+                                       ok ->
+                                           {ok, dict:store(FunName, MD5, CSums)};
+                                       Error ->
+                                           {Error, CSums}
+                                   end;
+                               false ->
+                                   {ok, CSums}
+                           end,
+    case Continue of
+        ok ->
+            case js:call(JsCtx, FunName, Args) of
+                {ok, Results} ->
+                    {Results, NewCSums};
+                Err ->
+                    {Err, CSums}
+            end;
+        Err ->
+            {Err, CSums}
+    end.
+
+needs_defining(CSums, FunName, CSum) ->
+    case dict:find(FunName, CSums) of
+        error ->
+            true;
+        {ok, OldCSum} ->
+            not(OldCSum =:= CSum)
+    end.
+
+%% Internal functions
+priv_dir() ->
+    %% Hacky workaround to handle running from a standard app directory
+    %% and .ez package
+    case code:priv_dir(riak) of
+        {error, bad_name} ->
+            filename:join([filename:dirname(code:which(?MODULE)), "..", "priv"]);
+        Dir ->
+            Dir
+    end.
+
+load_init_source() ->
+    case js_cache:fetch("mapred_builtins") of
+        none ->
+            {ok, Contents} = file:read_file(filename:join([priv_dir(), "mapred_builtins.js"])),
+            js_cache:store("mapred_builtins", Contents),
+            {ok, Contents};
+        Contents ->
+            {ok, Contents}
+    end.

apps/riak/src/riak_map_phase_fsm.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 -module(riak_map_phase_fsm).
 -behaviour(gen_fsm).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
--export([wait/2]). 
+-export([wait/2]).
 
 -record(state, {done,qterm,next_fsm,coord,acc,map_fsms,ring}).
 
         false -> nop;
         true -> gen_fsm:send_event(Coord, {acc, {list, Reply}})
     end,
-    case FSMs of
-        [] -> 
-            case Done of
-                true -> finish(StateData);
-                _ -> nop
-            end;
-        _ -> nop
-    end,
-    {next_state, wait, StateData#state{map_fsms=FSMs}};
+    case FSMs =:= [] andalso Done =:= true of
+        true ->
+            finish(StateData);
+        false ->
+            {next_state, wait, StateData#state{map_fsms=FSMs}}
+    end;
+
 wait({mapexec_error, _ErrFSM, ErrMsg}, StateData=
      #state{next_fsm=NextFSM,coord=Coord}) ->
     riak_eventer:notify(riak_map_phase_fsm, error, ErrMsg),
   when is_binary(_B) andalso (is_list(_K) orelse is_binary(_K)) -> I;
 convert_input(I={_B,_K})
   when is_binary(_B) andalso (is_list(_K) orelse is_binary(_K)) -> {I,undefined}.
-

apps/riak/src/riak_mapper.erl

 
 -record(jsenv, {ctx, csums=dict:new()}).
 
--export([init_state/0, terminate/1, init_js/1, do_map/8]).
+-export([init_state/0, terminate/1, do_map/8]).
 
 init_state() ->
-    {ok, JsCtx} = js_driver:new({?MODULE, init_js}),
+    {ok, JsCtx} = riak_js:new_context(),
     #jsenv{ctx=JsCtx}.
 
-init_js(Ctx) ->
-    EmitFunction = <<"var __map_results__ = []; function emit(data) { __map_results__[__map_results__.length] = data; };">>,
-    GetResultsFunction = <<"function __get_map_results__() { var retval = __map_results__; __map_results__ = []; return retval; };">>,
-    ok = js:define(Ctx, EmitFunction),
-    ok = js:define(Ctx, GetResultsFunction),
-    ok.
-
 terminate(#jsenv{ctx=Ctx}) ->
     js_driver:destroy(Ctx).
 
     end.
 
 %% Internal functions
+
 build_key({modfun, CMod, CFun}, Arg, KeyData) ->
     {CMod, CFun, Arg, KeyData};
 build_key(_, _, _) ->
     not_cached;
 cache_fetch({jsfun, _}, _BKey, _CacheKey, _MapState) ->
     not_cached;
+cache_fetch({jsanon, _}, _BKey, _CacheKey, _MapState) ->
+    not_cached;
 cache_fetch({modfun, _CMod, _CFun}, BKey, CacheKey, Cache) ->
     case orddict:find(BKey, Cache) of
         error -> not_cached;
             {error, X, MapState}
     end.
 
-exec_map(V, MapState, FunTerm, Arg, BKey, KeyData, VNode) ->
+exec_map(V, #jsenv{ctx=JsCtx, csums=CSums}=MapState, FunTerm, Arg, BKey, KeyData, VNode) ->
     try
         {MapVal, NewMapState} = case FunTerm of
-            {qfun,F} -> {(F)(V,KeyData,Arg), MapState};
-            {jsfun,F} -> invoke_js(MapState, [extract_values(V),KeyData,Arg], <<"riak_mapper">>, F);
-            {modfun,M,F} ->
-                MF_Res = M:F(V,KeyData,Arg),
-                gen_fsm:send_event(VNode,
-                                   {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
-                {MF_Res, MapState}
-        end,
+                                    {qfun, F} -> {(F)(V,KeyData,Arg), MapState};
+                                    {jsfun, F} ->
+                                        {Retval, _} = riak_js:invoke_map(JsCtx, CSums, [extract_values(V), KeyData, Arg],
+                                                                         <<"Riak">>, F, undefined),
+                                        {Retval, MapState};
+                                    {jsanon, F} ->
+                                        {Retval, NewCSums} = riak_js:invoke_map(JsCtx, CSums, [extract_values(V), KeyData, Arg],
+                                                                                undefined, <<"riakMapper">>, F),
+                                        {Retval, MapState#jsenv{csums=NewCSums}};
+                                    {modfun, M, F} ->
+                                        MF_Res = M:F(V,KeyData,Arg),
+                                        gen_fsm:send_event(VNode,
+                                                           {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
+                                        {MF_Res, MapState}
+                                end,
         {ok, MapVal, NewMapState}
     catch C:R ->
             Reason = {C, R, erlang:get_stacktrace()},
         _ ->
             riak_object:get_values(V)
     end.
-
-invoke_js(#jsenv{ctx=JsCtx, csums=CSums}=MapState, V, FunName, F) ->
-    MD5 = erlang:md5(F),
-    NewState = case needs_defining(CSums, FunName, MD5) of
-                   true ->
-                       F1 = list_to_binary(["var ", FunName, "=", F]),
-                       js:define(JsCtx, F1),
-                       MapState#jsenv{csums=dict:store(FunName, MD5, CSums)};
-                   false ->
-                       MapState
-               end,
-    case js:call(JsCtx, FunName, [V]) of
-        {ok, _} ->
-            case js:call(JsCtx, <<"__get_map_results__">>, []) of
-                {ok, Results} ->
-                    {Results, NewState};
-                Error ->
-                    {Error, NewState}
-            end
-    end.
-
-needs_defining(CSums, FunName, CSum) ->
-    case dict:find(FunName, CSums) of
-        error ->
-            true;
-        {ok, OldCSum} ->
-            not(OldCSum =:= CSum)
-    end.

apps/riak/src/riak_mapreduce_fsm.erl

                                 false -> {bad_qterm, QTerm};
                                 true -> check_query_syntax(Rest)
                             end;
+                        {jsanon, JF_F} when is_binary(JF_F) ->
+                            check_query_syntax(Rest);
                         {jsfun, JF_F} when is_binary(JF_F) ->
                             check_query_syntax(Rest);
                         _ -> {bad_qterm, QTerm}

apps/riak/src/riak_reduce_phase_fsm.erl

 
 -export([wait/2]).
 
--record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx}).
+-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx, csums=dict:new()}).
 
 start_link(_Ring,QTerm,NextFSM,Coordinator) ->
     gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
 init([QTerm,NextFSM,Coordinator]) ->
     {_,_,_,Acc} = QTerm,
     riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
-    {ok, Ctx} = js_driver:new(),
+    {ok, Ctx} = riak_js:new_context(),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
                     coord=Coordinator,acc=Acc,reduced=[], jsctx=Ctx}}.
 
                                acc=Acc,fresh_input=Fresh,
                                qterm={reduce,FunTerm,Arg,_Acc},
                                coord=Coord,reduced=Reduced,
-                               jsctx=JsCtx}) ->
+                               jsctx=JsCtx, csums=CSums}) ->
     {Res,Red} = case Fresh of
         false ->
             {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
         true ->
             try
-                NewReduced = case FunTerm of
-                                 {qfun,F} -> F(Reduced,Arg);
-                                 {modfun,M,F} -> M:F(Reduced,Arg);
-                                 {jsfun, F} -> invoke_js(JsCtx, <<"riak_reducer">>, F, Reduced, Arg)
-                             end,
-                {{next_state, wait, StateData#state{reduced=NewReduced}},
+                {NewReduced, NewCSums} = case FunTerm of
+                                             {qfun,F} -> {F(Reduced,Arg), CSums};
+                                             {modfun,M,F} -> {M:F(Reduced,Arg), CSums};
+                                             {jsanon, F} ->
+                                                 {Retval, CSums1} = riak_js:invoke_reduce(JsCtx, CSums,
+                                                                                          [Reduced, Arg], undefined, <<"riakReducer">>, F),
+                                                 {Retval, CSums1};
+                                             {jsfun, F} ->
+                                                 {Retval, _} = riak_js:invoke_reduce(JsCtx, CSums, [Reduced, Arg],
+                                                                                     <<"Riak">>, F, undefined),
+                                                 {Retval, CSums}
+                                             end,
+                {{next_state, wait, StateData#state{reduced=NewReduced, csums=NewCSums}},
                  NewReduced}
             catch C:R ->
                     Reason = {C, R, erlang:get_stacktrace()},
 
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
-
-%% @private
-invoke_js(JsCtx, FunName, FunSource, Reduced, Arg) ->
-    js:define(JsCtx, list_to_binary(["var ", FunName, "=", FunSource])),
-    case js:call(JsCtx, FunName, [Reduced, Arg]) of
-        {ok, Result} ->
-            Result;
-        Error ->
-            Error
-    end.

apps/riak/src/riak_vnode.erl

     {ok, ModState} = Mod:start(VNodeIndex, Configuration),
     StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState,mapstate=riak_mapper:init_state()},
     {next_state, StateName, StateData, Timeout} = hometest(StateData0),
+    dbg:stop(),
     {ok, StateName, StateData, Timeout}.
 
 %% @private

apps/riak/src/riak_vnode_master.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 -module(riak_vnode_master).
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.