Anonymous avatar Anonymous committed 05d8ed7

Rewrote guts of JS M/R to use a set of pooled JS VMs instead of one per vnode

Comments (0)

Files changed (14)

apps/riak/src/riak_js.erl

--module(riak_js).
-
--export([new_context/0, init_context/1, invoke_map/6, invoke_reduce/6]).
--export([fetch_fun/2]).
-
-new_context() ->
-    js_driver:new({?MODULE, init_context}).
-
-init_context(Ctx) ->
-    case load_init_source() of
-        {ok, Source} ->
-            js:define(Ctx, Source);
-        {error, Error} ->
-            {error, Error}
-    end.
-
-invoke_map(JsCtx, CSums, Args, undefined, FunName, F) ->
-    MD5 = erlang:md5(F),
-    {Continue, NewCSums} = case needs_defining(CSums, FunName, MD5) of
-                               true ->
-                                   F1 = list_to_binary(["var ", FunName, "=", F]),
-                                   case js:define(JsCtx, F1) of
-                                       ok ->
-                                           {ok, dict:store(FunName, MD5, CSums)};
-                                       {error, Error} ->
-                                           {{error, Error}, CSums}
-                                   end;
-                               false ->
-                                   {ok, CSums}
-                           end,
-    case Continue of
-        ok ->
-            case js:call(JsCtx, FunName, Args) of
-                {ok, Results} ->
-                    {Results, NewCSums};
-                {error, Err} ->
-                    {{error, Err}, CSums}
-            end;
-        Err ->
-            {Err, CSums}
-    end;
-
-invoke_map(JsCtx, CSums, Args, Class, FunName, undefined) ->
-    RealFunName = list_to_binary([Class, <<".">>, FunName]),
-    case js:call(JsCtx, RealFunName, Args) of
-        {ok, Results} ->
-            {Results, CSums};
-        {error, Error} ->
-            {{error, Error}, CSums}
-    end.
-
-invoke_reduce(JsCtx, CSums, Args, Class, FunName, undefined) when Class =:= <<"Riak">> ->
-    RealFunName = list_to_binary([Class, <<".">>, FunName]),
-    case js:call(JsCtx, RealFunName, Args) of
-        {ok, Results} ->
-            {Results, CSums};
-        {error, Error} ->
-            {{error, Error}, CSums}
-    end;
-
-invoke_reduce(JsCtx, CSums, Args, undefined, FunName, F) ->
-    MD5 = erlang:md5(F),
-    {Continue, NewCSums} = case needs_defining(CSums, FunName, MD5) of
-                               true ->
-                                   F1 = list_to_binary(["var ", FunName, "=", F]),
-                                   case js:define(JsCtx, F1) of
-                                       ok ->
-                                           {ok, dict:store(FunName, MD5, CSums)};
-                                       {error, Error} ->
-                                           {{error, Error}, CSums}
-                                   end;
-                               false ->
-                                   {ok, CSums}
-                           end,
-    case Continue of
-        ok ->
-            case js:call(JsCtx, FunName, Args) of
-                {ok, Results} ->
-                    {Results, NewCSums};
-                Err ->
-                    {Err, CSums}
-            end;
-        Err ->
-            {Err, CSums}
-    end.
-
-needs_defining(CSums, FunName, CSum) ->
-    case dict:find(FunName, CSums) of
-        error ->
-            true;
-        {ok, OldCSum} ->
-            not(OldCSum =:= CSum)
-    end.
-
-fetch_fun(Bucket, Key) ->
-    {ok, Client} = riak:local_client(),
-    Source = case Client:get(Bucket, Key) of
-                 {ok, Obj} ->
-                     riak_object:get_value(Obj);
-                 {error, Error} ->
-                     error_logger:error_report(Error),
-                     <<>>
-                         end,
-    Client:stop(),
-    Source.
-
-%% Internal functions
-priv_dir() ->
-    %% Hacky workaround to handle running from a standard app directory
-    %% and .ez package
-    case code:priv_dir(riak) of
-        {error, bad_name} ->
-            filename:join([filename:dirname(code:which(?MODULE)), "..", "priv"]);
-        Dir ->
-            Dir
-    end.
-
-load_init_source() ->
-    case js_cache:fetch("mapred_builtins") of
-        none ->
-            {ok, Contents} = file:read_file(filename:join([priv_dir(), "mapred_builtins.js"])),
-            js_cache:store("mapred_builtins", Contents),
-            {ok, Contents};
-        Contents ->
-            {ok, Contents}
-    end.

apps/riak/src/riak_js_manager.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(riak_js_manager).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1, dispatch/1, blocking_dispatch/1, add_to_manager/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {children=[]}).
+
+dispatch(JSCall) ->
+    case select_random() of
+        no_vms ->
+            {error, no_vms};
+        Target ->
+            JobId = {Target, make_ref()},
+            riak_js_vm:dispatch(Target, self(), JobId, JSCall),
+            {ok, JobId}
+    end.
+
+blocking_dispatch(JSCall) ->
+    case select_random() of
+        no_vms ->
+            {error, no_vms};
+        Target ->
+            JobId = {Target, make_ref()},
+            riak_js_vm:blocking_dispatch(Target, JobId, JSCall)
+    end.
+
+add_to_manager() ->
+    gen_server:cast(?MODULE, {add_child, self()}).
+
+start_link(ChildCount) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [ChildCount], []).
+
+init([ChildCount]) ->
+    start_children(ChildCount),
+    {ok, #state{}}.
+
+handle_call(_Request, _From, State) ->
+    {reply, ignore, State}.
+
+handle_cast({add_child, ChildPid}, #state{children=Children}=State) ->
+    erlang:monitor(process, ChildPid),
+    {noreply, State#state{children=Children ++ [ChildPid]}};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _MRef, _Type, Pid, _Info}, #state{children=Children}=State) ->
+    case lists:member(Pid, Children) of
+        true ->
+            riak_js_sup:start_js(self()),
+            {noreply, State#state{children=lists:delete(Pid, Children)}};
+        false ->
+            {noreply, State}
+    end;
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Internal functions
+start_children(0) ->
+    ok;
+start_children(Count) ->
+    riak_js_sup:start_js(self()),
+    start_children(Count - 1).
+
+select_random() ->
+    case pg2:get_members({node(), js_vm}) of
+        [] ->
+            no_vms;
+        {error, _} ->
+            novms;
+        Members ->
+            {T1, T2, T3} = erlang:now(),
+            random:seed(T1, T2, T3),
+            Pos = random:uniform(length(Members)),
+            lists:nth(Pos, Members)
+    end.

apps/riak/src/riak_js_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+-module(riak_js_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1, stop/1]).
+-export([start_js/1]).
+
+start_js(Manager) when is_pid(Manager) ->
+    supervisor:start_child(?MODULE, [Manager]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+stop(_S) -> ok.
+
+%% @private
+init([]) ->
+    {ok,
+     {{simple_one_for_one, 10, 10},
+      [{undefined,
+        {riak_js_vm, start_link, []},
+        temporary, 2000, worker, [riak_js_vm]}]}}.

apps/riak/src/riak_js_vm.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+-module(riak_js_vm).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1, dispatch/4, blocking_dispatch/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {manager, ctx, last_mapper, last_reducer}).
+
+start_link(Manager) ->
+    gen_server:start_link(?MODULE, [Manager], []).
+
+dispatch(VMPid, Requestor, JobId, JSCall) ->
+    gen_server:cast(VMPid, {dispatch, Requestor, JobId, JSCall}).
+
+blocking_dispatch(VMPid, JobId, JSCall) ->
+    gen_server:call(VMPid, {dispatch, JobId, JSCall}, 10000).
+
+init([Manager]) ->
+    pg2:create({node(), js_vm}),
+    pg2:join({node(), js_vm}, self()),
+    io:format("(~p) Spidermonkey VM starting~n", [self()]),
+    case new_context() of
+        {ok, Ctx} ->
+            riak_js_manager:add_to_manager(),
+            erlang:monitor(process, Manager),
+            {ok, #state{manager=Manager, ctx=Ctx}};
+        Error ->
+            {stop, Error}
+    end.
+
+handle_call({dispatch, _JobId, {{jsanon, JS}, Reduced, Arg}}, _From, #state{ctx=Ctx}=State) ->
+    {Result, NewState} = case define_anon_js(reduce, JS, State) of
+                             {ok, State1} ->
+                                 JsonReduced = mochijson2:encode(Reduced),
+                                 JsonArg = mochijson2:encode(Arg),
+                                 {js:call(Ctx, <<"riakReducer">>, [JsonReduced, JsonArg]),
+                                  State1};
+                             Error ->
+                                 Error
+                         end,
+    Reply = case Result of
+                {ok, {struct, JSError}} ->
+                    {error, JSError};
+                _ ->
+                    Result
+            end,
+    {reply, Reply, NewState};
+handle_call(_Request, _From, State) ->
+    io:format("WTF: ~p~n", [_Request]),
+    {reply, ignore, State}.
+
+handle_cast({dispatch, Requestor, _JobId, {FsmPid, {map, {jsanon, JS}, Arg, _Acc},
+                                            Value,
+                                            KeyData}}, #state{ctx=Ctx}=State) ->
+    {Result, NewState} = case define_anon_js(map, JS, State) of
+                             {ok, State1} ->
+                                 JsonValue = jsonify_object(Value),
+                                 JsonArg = jsonify_arg(Arg),
+                                 {js:call(Ctx, <<"riakMapper">>, [JsonValue, KeyData, JsonArg]),
+                                  State1};
+                             Error ->
+                                 Error
+                         end,
+    case Result of
+        {ok, {struct, JsError}} ->
+              gen_fsm:send_event(FsmPid, {mapexec_error, Requestor, JsError});
+        {ok, ReturnValue} ->
+            gen_fsm:send_event(FsmPid, {mapexec_reply, ReturnValue, Requestor});
+        ErrorResult ->
+            gen_fsm:send_event(FsmPid, {mapexec_error, Requestor, ErrorResult})
+    end,
+    {noreply, NewState};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', _MRef, _Type, Manager, _Info}, #state{manager=Manager}=State) ->
+    {stop, normal, State#state{manager=undefined}};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, #state{ctx=Ctx}) ->
+    js_driver:destroy(Ctx),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Internal functions
+define_anon_js(Name, JS, #state{ctx=Ctx, last_mapper=LastMapper, last_reducer=LastReducer}=State) ->
+    Hash = erlang:phash2(JS),
+    {OldHash, FunName} = if
+                             Name == map ->
+                                 {LastMapper, <<"riakMapper">>};
+                             true ->
+                                 {LastReducer, <<"riakReducer">>}
+                         end,
+    if
+        Hash == OldHash ->
+            {ok, State};
+        true ->
+            case js:define(Ctx, list_to_binary([<<"var ">>, FunName, <<" = ">>, JS, <<";">>])) of
+                ok ->
+                    if
+                        Name == map ->
+                            {ok, State#state{last_mapper=Hash}};
+                        true ->
+                            {ok, State#state{last_reducer=Hash}}
+                    end;
+                Error ->
+                    {Error, State}
+            end
+    end.
+
+new_context() ->
+    InitFun = fun(Ctx) -> init_context(Ctx) end,
+    js_driver:new(InitFun).
+
+init_context(Ctx) ->
+    case load_init_source() of
+        {ok, Source} ->
+            js:define(Ctx, Source);
+        {error, Error} ->
+            {error, Error}
+    end.
+
+priv_dir() ->
+    %% Hacky workaround to handle running from a standard app directory
+    %% and .ez package
+    case code:priv_dir(riak) of
+        {error, bad_name} ->
+            filename:join([filename:dirname(code:which(?MODULE)), "..", "priv"]);
+        Dir ->
+            Dir
+    end.
+
+load_init_source() ->
+    case js_cache:fetch("mapred_builtins") of
+        none ->
+            {ok, Contents} = file:read_file(filename:join([priv_dir(), "mapred_builtins.js"])),
+            js_cache:store("mapred_builtins", Contents),
+            {ok, Contents};
+        Contents ->
+            {ok, Contents}
+    end.
+
+jsonify_object({error, notfound}) ->
+    {struct, [{<<"error">>, <<"notfound">>}]};
+jsonify_object(Obj) ->
+    {_,Vclock} = raw_http_resource:vclock_header(Obj),
+    {struct, [{<<"bucket">>, riak_object:bucket(Obj)},
+              {<<"key">>, riak_object:key(Obj)},
+              {<<"vclock">>, list_to_binary(Vclock)},
+              {<<"values">>,
+               [{struct,
+                 [{<<"metadata">>, jsonify_metadata(MD)},
+                  {<<"data">>, V}]}
+                || {MD, V} <- riak_object:get_contents(Obj)
+                      ]}]}.
+
+jsonify_metadata(MD) ->
+    MDJS = fun({LastMod, Now={_,_,_}}) ->
+                   % convert Now to JS-readable time string
+                   {LastMod, list_to_binary(
+                               httpd_util:rfc1123_date(
+                                 calendar:now_to_local_time(Now)))};
+              ({<<"Links">>, Links}) ->
+                   {<<"Links">>, [ [B, K, T] || {{B, K}, T} <- Links ]};
+              ({Name, List=[_|_]}) ->
+                   {Name, jsonify_metadata_list(List)};
+              ({Name, Value}) ->
+                   {Name, Value}
+           end,
+    {struct, lists:map(MDJS, dict:to_list(MD))}.
+
+%% @doc convert strings to binaries, and proplists to JSON objects
+jsonify_metadata_list([]) -> [];
+jsonify_metadata_list(List) ->
+    Classifier = fun({Key,_}, Type) when (is_binary(Key) orelse is_list(Key)),
+                                         Type /= array, Type /= string ->
+                         struct;
+                    (C, Type) when is_integer(C), C >= 0, C =< 256,
+                                   Type /= array, Type /= struct ->
+                         string;
+                    (_, _) ->
+                         array
+                 end,
+    case lists:foldl(Classifier, undefined, List) of
+        struct -> {struct, [ {if is_list(Key) -> list_to_binary(Key);
+                                 true         -> Key
+                              end,
+                              if is_list(Value) -> jsonify_metadata_list(Value);
+                                 true           -> Value
+                              end}
+                             || {Key, Value} <- List]};
+        string -> list_to_binary(List);
+        array -> List
+    end.
+
+jsonify_arg({Bucket,Tag}) when (Bucket == '_' orelse is_binary(Bucket)),
+                               (Tag == '_' orelse is_binary(Tag)) ->
+    %% convert link match syntax
+    {struct, [{<<"bucket">>,Bucket},
+              {<<"tag">>,Tag}]};
+jsonify_arg(Other) ->
+    mochijson2:encode(Other).

apps/riak/src/riak_map_executor.erl

             {stop,no_linkfun};
         _ ->
             QTerm = case QTerm0 of
-                        {map, _, _, _} -> QTerm0;
-                        {link, LB, LT, LAcc} -> {map, LinkFun, {LB, LT}, LAcc}
+                        {_, {map, _, _, _}} -> QTerm0;
+                        {Lang, {link, LB, LT, LAcc}} -> {Lang, {map, LinkFun, {LB, LT}, LAcc}}
                     end,
             N = proplists:get_value(n_val,BucketProps),
             Preflist = riak_ring:preflist(DocIdx, Ring),
     {next_state, wait, StateData#state{
                          vnodes=try_vnode(QTerm, BKey, KeyData, VNodes)},
      1000};
+wait({mapexec_reply, executing, _}, StateData) ->
+    {next_state, wait, StateData, 1000};
 wait({mapexec_reply, RetVal, _VN}, StateData=#state{phase_pid=PhasePid}) ->
     riak_phase_proto:mapexec_result(PhasePid, RetVal),
     {stop,normal,StateData}.

apps/riak/src/riak_map_phase_fsm.erl

     gen_fsm:start_link(?MODULE, [Ring,QTerm,NextFSM,Coordinator], []).
 %% @private
 init([Ring,QTerm,NextFSM,Coordinator]) ->
-    {_,_,_,Acc} = QTerm,
+    {_,{_,_,_,Acc}} = QTerm,
     riak_eventer:notify(riak_map_phase_fsm, map_start, start),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,
                     coord=Coordinator,acc=Acc,map_fsms=[],ring=Ring}}.
     FSMs = lists:delete(MapFSM,FSMs0),
     case NextFSM of
         final -> nop;
-        _ -> riak_phase_proto:send_inputs(NextFSM, Reply)
+        _ ->
+            riak_phase_proto:send_inputs(NextFSM, Reply)
     end,
     case Acc of
         false -> nop;

apps/riak/src/riak_mapper.erl

--module(riak_mapper).
-
--record(jsenv, {ctx, csums=dict:new()}).
-
--export([init_state/0, terminate/1, do_map/8]).
-
-init_state() ->
-    {ok, JsCtx} = riak_js:new_context(),
-    #jsenv{ctx=JsCtx}.
-
-terminate(#jsenv{ctx=Ctx}) ->
-    js_driver:destroy(Ctx).
-
-do_map({map,FunTerm,Arg,_Acc}, BKey, Mod, ModState, KeyData, MapState, Cache, VNode) ->
-    CacheKey = build_key(FunTerm, Arg, KeyData),
-    CacheVal = cache_fetch(FunTerm, BKey, CacheKey, Cache),
-    case CacheVal of
-        not_cached ->
-            uncached_map(BKey, Mod, ModState, MapState, FunTerm, Arg, KeyData, VNode);
-        CV ->
-            {ok, CV, MapState}
-    end.
-
-%% Internal functions
-
-build_key({modfun, CMod, CFun}, Arg, KeyData) ->
-    {CMod, CFun, Arg, KeyData};
-build_key({jsfun, FunName}, Arg, KeyData) ->
-    {FunName, Arg, KeyData};
-build_key(_, _, _) ->
-    no_key.
-
-cache_fetch({qfun, _}, _BKey, _CacheKey, _MapState) ->
-    not_cached;
-cache_fetch({jsanon, _}, _BKey, _CacheKey, _MapState) ->
-    not_cached;
-cache_fetch({modfun, _CMod, _CFun}, BKey, CacheKey, Cache) ->
-    case orddict:find(BKey, Cache) of
-        error -> not_cached;
-        {ok,CDict} ->
-            case orddict:find(CacheKey,CDict) of
-                error -> not_cached;
-                {ok,CVal} -> CVal
-            end
-    end;
-%% TODO: Cache jsfun results, too
-cache_fetch({jsfun, _FunName}, _BKey, _CacheKey, _Cache) ->
-    not_cached.
-
-uncached_map(BKey, Mod, ModState, MapState, FunTerm, Arg, KeyData, VNode) ->
-    riak_eventer:notify(riak_vnode, uncached_map, {FunTerm, Arg, BKey}),
-    case Mod:get(ModState, BKey) of
-        {ok, Binary} ->
-            V = binary_to_term(Binary),
-            exec_map(V, MapState, FunTerm, Arg, BKey, KeyData, VNode);
-        {error, notfound} ->
-            exec_map({error, notfound}, MapState, FunTerm, Arg, BKey, KeyData, VNode);
-        X ->
-            {error, X, MapState}
-    end.
-
-exec_map(V, #jsenv{ctx=JsCtx, csums=CSums}=MapState, FunTerm, Arg, BKey, KeyData, VNode) ->
-    try
-        {MapVal, NewMapState} = case FunTerm of
-                                    {qfun, F} -> {(F)(V,KeyData,Arg), MapState};
-                                    {jsfun, F} ->
-                                        {Retval, _} = riak_js:invoke_map(JsCtx, CSums, [jsonify_object(V), KeyData, jsonify_arg(Arg)],
-                                                                         <<"Riak">>, F, undefined),
-                                        {Retval, MapState};
-                                    {jsanon, {Bucket, Key}} ->
-                                        exec_map(V, MapState, {jsanon, riak_js:fetch_fun(Bucket, Key)}, Arg, BKey, KeyData, VNode);
-                                    {jsanon, F} ->
-                                        {Retval, NewCSums} = riak_js:invoke_map(JsCtx, CSums, [jsonify_object(V), KeyData, jsonify_arg(Arg)],
-                                                                                undefined, <<"riakMapper">>, F),
-                                        {Retval, MapState#jsenv{csums=NewCSums}};
-                                    {modfun, M, F} ->
-                                        MF_Res = M:F(V,KeyData,Arg),
-                                        gen_fsm:send_event(VNode,
-                                                           {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
-                                        {MF_Res, MapState}
-                                end,
-        {ok, MapVal, NewMapState}
-    catch C:R ->
-            Reason = {C, R, erlang:get_stacktrace()},
-            {error, Reason, MapState}
-    end.
-
-jsonify_object({error, notfound}) ->
-    {struct, [{<<"error">>, <<"notfound">>}]};
-jsonify_object(Obj) ->
-    {_,Vclock} = raw_http_resource:vclock_header(Obj),
-    {struct, [{<<"bucket">>, riak_object:bucket(Obj)},
-              {<<"key">>, riak_object:key(Obj)},
-              {<<"vclock">>, list_to_binary(Vclock)},
-              {<<"values">>,
-               [{struct,
-                 [{<<"metadata">>, jsonify_metadata(MD)},
-                  {<<"data">>, V}]}
-                || {MD, V} <- riak_object:get_contents(Obj)
-                      ]}]}.
-
-jsonify_metadata(MD) ->
-    MDJS = fun({LastMod, Now={_,_,_}}) ->
-                   % convert Now to JS-readable time string
-                   {LastMod, list_to_binary(
-                               httpd_util:rfc1123_date(
-                                 calendar:now_to_local_time(Now)))};
-              ({<<"Links">>, Links}) ->
-                   {<<"Links">>, [ [B, K, T] || {{B, K}, T} <- Links ]};
-              ({Name, List=[_|_]}) ->
-                   {Name, jsonify_metadata_list(List)};
-              ({Name, Value}) ->
-                   {Name, Value}
-           end,
-    {struct, lists:map(MDJS, dict:to_list(MD))}.
-
-%% @doc convert strings to binaries, and proplists to JSON objects
-jsonify_metadata_list([]) -> [];
-jsonify_metadata_list(List) ->
-    Classifier = fun({Key,_}, Type) when (is_binary(Key) orelse is_list(Key)),
-                                         Type /= array, Type /= string ->
-                         struct;
-                    (C, Type) when is_integer(C), C >= 0, C =< 256,
-                                   Type /= array, Type /= struct ->
-                         string;
-                    (_, _) ->
-                         array
-                 end,
-    case lists:foldl(Classifier, undefined, List) of
-        struct -> {struct, [ {if is_list(Key) -> list_to_binary(Key);
-                                 true         -> Key
-                              end,
-                              if is_list(Value) -> jsonify_metadata_list(Value);
-                                 true           -> Value
-                              end}
-                             || {Key, Value} <- List]};
-        string -> list_to_binary(List);
-        array -> List
-    end.
-
-jsonify_arg({Bucket,Tag}) when (Bucket == '_' orelse is_binary(Bucket)),
-                               (Tag == '_' orelse is_binary(Tag)) ->
-    %% convert link match syntax
-    {struct, [{<<"bucket">>,Bucket},
-              {<<"tag">>,Tag}]};
-jsonify_arg(Other) ->
-    Other.

apps/riak/src/riak_mapreduce_fsm.erl

     {ok, Ring} = riak_ring_manager:get_my_ring(),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
     case check_query_syntax(Query) of
-        ok ->
-            FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
+        {ok, Query1} ->
+            FSMs = make_phase_fsms(Query1, Ring), % Pid for each phase, in-order
             StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
                                starttime=riak_util:moment(),timeout=Timeout,
                                ring=Ring,input_done=false},
             {stop, {bad_qterm, QTerm}}
     end.
 
-check_query_syntax([]) -> ok;
-check_query_syntax([QTerm={QTermType,QT2,_QT3,Acc}|Rest])
+check_query_syntax(Query) ->
+    check_query_syntax(Query, []).
+
+check_query_syntax([], Accum) ->
+    {ok, lists:reverse(Accum)};
+check_query_syntax([QTerm={QTermType,QT2,_QT3,Acc}|Rest], Accum)
   when is_boolean(Acc) ->
     case lists:member(QTermType, [link,map,reduce]) of
         false -> {bad_qterm, QTerm};
                         {modfun, MF_M, MF_F} ->
                             case is_atom(MF_M) andalso is_atom(MF_F) of
                                 false -> {bad_qterm, QTerm};
-                                true -> check_query_syntax(Rest)
+                                true -> check_query_syntax(Rest, [{erlang, QTerm}|Accum])
                             end;
                         {qfun, QF_F} ->
                             case is_function(QF_F) of
                                 false -> {bad_qterm, QTerm};
-                                true -> check_query_syntax(Rest)
+                                true -> check_query_syntax(Rest, [{erlang, QTerm}|Accum])
                             end;
-                        {jsanon, {Bucket, Key}} when is_binary(Bucket),
-                                                     is_binary(Key) ->
-                            check_query_syntax(Rest);
-                        {jsanon, JF_F} when is_binary(JF_F) ->
-                            check_query_syntax(Rest);
-                        {jsfun, JF_F} when is_binary(JF_F) ->
-                            check_query_syntax(Rest);
+                        {jsanon, JS} when is_binary(JS) ->
+                            check_query_syntax(Rest, [{javascript, QTerm}|Accum]);
                         _ -> {bad_qterm, QTerm}
                     end
             end
     end;
-check_query_syntax([BadQTerm|_]) -> {bad_qterm,BadQTerm}.
+check_query_syntax([BadQTerm|_], _) -> {bad_qterm,BadQTerm}.
 
 make_phase_fsms(Query, Ring) ->
     make_phase_fsms(lists:reverse(Query),final,[], Ring).
 make_phase_fsms([], _NextFSM, FSMs, _Ring) -> FSMs;
 make_phase_fsms([QTerm|Rest], NextFSM, FSMs, Ring) ->
     {ok, Pid} = case QTerm of
-                    {reduce, _, _, _} ->
+                    {_, {reduce, _, _, _}} ->
                         riak_phase_sup:new_reduce_phase(Ring, QTerm, NextFSM, self());
-                    {map, _, _, _} ->
+                    {_, {map, _, _, _}} ->
                         riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self());
-                    {link, _, _, _} ->
+                    {_, {link, _, _, _}} ->
                         riak_phase_sup:new_map_phase(Ring, QTerm, NextFSM, self())
                 end,
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).

apps/riak/src/riak_mapreduce_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
 -module(riak_mapreduce_sup).
 
 -behaviour(supervisor).

apps/riak/src/riak_phase_proto.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
 -module(riak_phase_proto).
 
 -export([send_inputs/2, mapexec_result/2, mapexec_error/2, error/2]).

apps/riak/src/riak_phase_sup.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
 -module(riak_phase_sup).
 
 -behaviour(supervisor).

apps/riak/src/riak_reduce_phase_fsm.erl

 
 -export([wait/2]).
 
--record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx, csums=dict:new()}).
+-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input}).
 
 start_link(_Ring,QTerm,NextFSM,Coordinator) ->
     gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
 %% @private
 init([QTerm,NextFSM,Coordinator]) ->
-    {_,_,_,Acc} = QTerm,
+    {_,{_,_,_,Acc}} = QTerm,
     riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
-    {ok, Ctx} = riak_js:new_context(),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
-                    coord=Coordinator,acc=Acc,reduced=[], jsctx=Ctx}}.
+                    coord=Coordinator,acc=Acc,reduced=[]}}.
 
 wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done,
                                acc=Acc,fresh_input=Fresh,
-                               qterm={reduce,FunTerm,Arg,_Acc},
-                               coord=Coord,reduced=Reduced,
-                               jsctx=JsCtx, csums=CSums}) ->
+                               qterm={Lang,{reduce,FunTerm,Arg,_Acc}},
+                               coord=Coord,reduced=Reduced}) ->
     {Res,Red} = case Fresh of
         false ->
             {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
         true ->
             try
-                {NewReduced, NewCSums} = case FunTerm of
-                                             {qfun,F} -> {F(Reduced,Arg), CSums};
-                                             {modfun,M,F} -> {M:F(Reduced,Arg), CSums};
-                                             {jsanon, {Bucket, Key}} ->
-                                                 Source = riak_js:fetch_fun(Bucket, Key),
-                                                 riak_js:invoke_reduce(JsCtx, CSums, [Reduced, Arg], undefined,
-                                                                       <<"riakReducer">>, Source);
-                                             {jsanon, F} ->
-                                                 riak_js:invoke_reduce(JsCtx, CSums,
-                                                                       [Reduced, Arg], undefined, <<"riakReducer">>, F);
-                                             {jsfun, F} ->
-                                                 {Retval, _} = riak_js:invoke_reduce(JsCtx, CSums, [Reduced, Arg],
-                                                                                     <<"Riak">>, F, undefined),
-                                                 {Retval, CSums}
-                                             end,
-                {{next_state, wait, StateData#state{reduced=NewReduced, csums=NewCSums}}, NewReduced}
+                NewReduced = case {Lang, FunTerm} of
+                                 {erlang, {qfun,F}} ->
+                                     F(Reduced,Arg);
+                                 {erlang, {modfun,M,F}} ->
+                                     M:F(Reduced,Arg);
+                                 {javascript, {jsanon, _}=QTerm} ->
+                                     case riak_js_manager:blocking_dispatch({QTerm, Reduced, Arg}) of
+                                         {ok, ReturnValue} ->
+                                             ReturnValue;
+                                         Error ->
+                                             Error
+                                     end
+                             end,
+                {{next_state, wait, StateData#state{reduced=NewReduced}}, NewReduced}
             catch C:R ->
                     Reason = {C, R, erlang:get_stacktrace()},
                     case NextFSM of
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 %% @private
-terminate(Reason, _StateName, State) ->
+terminate(Reason, _StateName, _State) ->
     riak_eventer:notify(riak_reduce_phase_fsm, phase_end, Reason),
-    js_driver:destroy(State#state.jsctx),
     Reason.
 
 %% @private

apps/riak/src/riak_sup.erl

     RiakStat = {riak_stat,
                 {riak_stat, start_link, []},
                 permanent, 5000, worker, [riak_stat]},
+    RiakJsMgr = {riak_js_manager,
+                 {riak_js_manager, start_link, [24]},
+                 permanent, 30000, worker, [riak_js_manager]},
+    RiakJsSup = {riak_js_sup,
+                 {riak_js_sup, start_link, []},
+                 permanent, infinity, supervisor, [riak_js_sup]},
     MapReduceFSMSup = {riak_mapreduce_sup,
                        {riak_mapreduce_sup, start_link, []},
                        permanent, infinity, supervisor, [riak_mapreduce_sup]},
 
     % Build the process list...
     Processes = lists:flatten([
-        Eventer, 
-        VSup,                               
+        Eventer,
+        VSup,
         ?IF(HasStorageBackend, VMaster, []),
         RingMgr,
         Connect,
         ?IF(IsWebConfigured, RiakWeb, []),
         ?IF(IsStatEnabled, RiakStat, []),
         MapReduceFSMSup,
-        PhaseFSMSup
+        PhaseFSMSup,
+        RiakJsSup,
+        RiakJsMgr
     ]),
 
     % Run the proesses...

apps/riak/src/riak_vnode.erl

 
 -define(TIMEOUT, 60000).
 
--record(state, {idx,mapcache,mod,modstate,waiting_diffobjs,mapstate}).
+-record(state, {idx,mapcache,mod,modstate,waiting_diffobjs}).
 
 start_link(Idx) ->
     gen_fsm:start_link(?MODULE, [Idx], []).
     Mod = riak:get_app_env(storage_backend),
     Configuration = riak:get_app_env(),
     {ok, ModState} = Mod:start(VNodeIndex, Configuration),
-    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState,mapstate=riak_mapper:init_state()},
+    StateData0 = #state{idx=VNodeIndex,mod=Mod,modstate=ModState},
     {next_state, StateName, StateData, Timeout} = hometest(StateData0),
     {ok, StateName, StateData, Timeout}.
 
 merk_waiting({diffobj,{_BKey,_BinObj,_RemNode}}, StateData) ->
     hometest(StateData);
 merk_waiting({map, ClientPid, QTerm, BKey, KeyData}, StateData) ->
-
     NewState = do_map(ClientPid,QTerm,BKey,KeyData,StateData,self()),
     {next_state,merk_waiting,NewState,?TIMEOUT};
 merk_waiting({put, FSM_pid, _BKey, _RObj, ReqID, _FSMTime},
          Mult)).
 
 %% @private
-do_map(ClientPid, QTerm, BKey, KeyData, #state{mod=Mod, modstate=ModState, mapstate=MapState, mapcache=Cache}=State, VNode) ->
-    {Reply, NewState} = case riak_mapper:do_map(QTerm, BKey, Mod, ModState, KeyData, MapState, Cache, VNode) of
-                            {ok, Retval, NewMapState} ->
-                                {{mapexec_reply, Retval, self()}, State#state{mapstate=NewMapState}};
-                            {error, Error, NewMapState} ->
-                                {{mapexec_error, self(), Error}, State#state{mapstate=NewMapState}}
+do_map(ClientPid, QTerm, BKey, KeyData, #state{mod=Mod, modstate=ModState, mapcache=Cache}=State, VNode) ->
+    {Reply, NewState} = case do_map(QTerm, BKey, Mod, ModState, KeyData, Cache, VNode, ClientPid) of
+                            executing ->
+                                {{mapexec_reply, executing, self()}, State};
+                            {ok, Retval} ->
+                                {{mapexec_reply, Retval, self()}, State};
+                            {error, Error} ->
+                                {{mapexec_error, self(), Error}, State}
                         end,
     gen_fsm:send_event(ClientPid, Reply),
     NewState.
 handle_info(ok, StateName, StateData) ->
     {next_state, StateName, StateData}.
 %% @private
-terminate(_Reason, _StateName, #state{mapstate=MapState}) ->
-    riak_mapper:terminate(MapState).
+terminate(_Reason, _StateName, _State) ->
+    ok.
+
+do_map({erlang, {map, FunTerm, Arg, _Acc}}, BKey, Mod, ModState, KeyData, Cache, VNode, _ClientPid) ->
+    CacheKey = build_key(FunTerm, Arg, KeyData),
+    CacheVal = cache_fetch(FunTerm, BKey, CacheKey, Cache),
+    case CacheVal of
+        not_cached ->
+            uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData, VNode);
+        CV ->
+            {ok, CV}
+    end;
+do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, _Cache, _VNode, ClientPid) ->
+    riak_eventer:notify(riak_vnode, uncached_map, {FunTerm, Arg, BKey}),
+    V = case Mod:get(ModState, BKey) of
+            {ok, Binary} ->
+                binary_to_term(Binary);
+            {error, notfound} ->
+                {error, notfound}
+        end,
+    riak_js_manager:dispatch({ClientPid, QTerm, V, KeyData}),
+    executing.
+
+build_key({modfun, CMod, CFun}, Arg, KeyData) ->
+    {CMod, CFun, Arg, KeyData};
+build_key(_, _, _) ->
+    no_key.
+
+cache_fetch({qfun, _}, _BKey, _CacheKey, _MapState) ->
+    not_cached;
+cache_fetch({modfun, _CMod, _CFun}, BKey, CacheKey, Cache) ->
+    case orddict:find(BKey, Cache) of
+        error -> not_cached;
+        {ok,CDict} ->
+            case orddict:find(CacheKey,CDict) of
+                error -> not_cached;
+                {ok,CVal} -> CVal
+            end
+    end.
+
+uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData, VNode) ->
+    riak_eventer:notify(riak_vnode, uncached_map, {FunTerm, Arg, BKey}),
+    case Mod:get(ModState, BKey) of
+        {ok, Binary} ->
+            V = binary_to_term(Binary),
+            exec_map(V, FunTerm, Arg, BKey, KeyData, VNode);
+        {error, notfound} ->
+            exec_map({error, notfound}, FunTerm, Arg, BKey, KeyData, VNode);
+        X ->
+            {error, X}
+    end.
+
+exec_map(V, FunTerm, Arg, BKey, KeyData, VNode) ->
+    try
+        case FunTerm of
+            {qfun, F} -> {ok, (F)(V,KeyData,Arg)};
+            {modfun, M, F} ->
+                MF_Res = M:F(V,KeyData,Arg),
+                gen_fsm:send_event(VNode,
+                                   {mapcache, BKey,{M,F,Arg,KeyData},MF_Res}),
+                {ok, MF_Res}
+        end
+    catch C:R ->
+            Reason = {C, R, erlang:get_stacktrace()},
+            {error, Reason}
+    end.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.