Commits

Anonymous committed 355d731

Hooking up pre-defined Javascript functions, aka jsfuns, to vnode mapcache

Comments (0)

Files changed (3)

apps/js_data/src/mrstress.erl

     Owner ! done,
     ok;
 stress(javascript, Count, Client, Owner, Inputs, InputSize) ->
-    M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
+    %M = <<"function(v, _, _) { var value = v[\"values\"][0][\"data\"]; return [parseInt(value)]; }">>,
     R = <<"function(v, _) { var sum = 0; v.forEach(function(x) { sum = sum + x; }); return [sum]; }">>,
+    R1 = <<"function(values, _) { return values.map(function(v) { return parseInt(v); }); }">>,
     Selected = select_inputs(Inputs, InputSize, []),
     Start = erlang:now(),
-    case Client:mapred(Selected, [{map, {jsanon, M}, none, false},
+    case Client:mapred(Selected, [{map, {jsfun, <<"Riak.mapValues">>}, none, false},
+                                  {reduce, {jsanon, R1}, none, false},
                                   {reduce, {jsanon, R}, none, true}]) of
         {ok, [InputSize]} ->
             End = erlang:now(),

apps/riak/src/riak_js_vm.erl

 
 handle_cast({dispatch, Requestor, _JobId, {FsmPid, {map, {jsanon, JS}, Arg, _Acc},
                                             Value,
-                                            KeyData}}, #state{ctx=Ctx}=State) ->
+                                            KeyData, _BKey}}, #state{ctx=Ctx}=State) ->
     {Result, NewState} = case define_anon_js(map, JS, State) of
                              {ok, State1} ->
                                  JsonValue = jsonify_object(Value),
     end;
 handle_cast({dispatch, Requestor, _JobId, {FsmPid, {map, {jsfun, JS}, Arg, _Acc},
                                             Value,
-                                            KeyData}}, #state{ctx=Ctx}=State) ->
+                                            KeyData, BKey}}, #state{ctx=Ctx}=State) ->
     JsonValue = jsonify_object(Value),
     JsonArg = jsonify_arg(Arg),
     case invoke_js(Ctx, JS, [JsonValue, KeyData, JsonArg]) of
         {ok, R} ->
+            %% Requestor should be the dispatching vnode
+            gen_fsm:send_event(Requestor, {mapcache, BKey, {JS, Arg, KeyData}, R}),
             gen_fsm:send_event(FsmPid, {mapexec_reply, R, Requestor});
         {error, Error} ->
             gen_fsm:send_event(FsmPid, {mapexec_error_noretry, Requestor, Error})

apps/riak/src/riak_vnode.erl

              ?TIMEOUT};
         TargetNode ->
             case net_adm:ping(TargetNode) of
-                pang -> 
+                pang ->
                     {next_state,active,StateData,?TIMEOUT};
-                pong -> 
+                pong ->
                     case HQ of
                         not_in_handoff ->
                             do_handoff(TargetNode, StateData);
     do_delete(From, BKey, ReqID, StateData),
     {next_state,
      active,StateData#state{mapcache=orddict:erase(BKey,Cache)},?TIMEOUT};
+active({mapcache, BKey,{FunName,Arg,KeyData},MF_Res},
+       StateData=#state{mapcache=Cache}) ->
+    KeyCache0 = case orddict:find(BKey, Cache) of
+        error -> orddict:new();
+        {ok,CDict} -> CDict
+    end,
+    KeyCache = orddict:store({FunName,Arg,KeyData},MF_Res,KeyCache0),
+    {next_state,active,
+     StateData#state{mapcache=orddict:store(BKey,KeyCache,Cache)},?TIMEOUT};
 active({mapcache, BKey,{M,F,Arg,KeyData},MF_Res},
        StateData=#state{mapcache=Cache}) ->
     KeyCache0 = case orddict:find(BKey, Cache) of
             {reply, ok, StateName, StateData, ?TIMEOUT};
         {error, Err} ->
             error_logger:error_msg("Error storing handoff obj: ~p~n", [Err]),
-            {reply, {error, Err}, StateName, StateData, ?TIMEOUT}                   
+            {reply, {error, Err}, StateName, StateData, ?TIMEOUT}
     end;
 handle_sync_event(_Even, _From, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 
 do_map({erlang, {map, FunTerm, Arg, _Acc}}, BKey, Mod, ModState, KeyData, Cache, VNode, _ClientPid) ->
     CacheKey = build_key(FunTerm, Arg, KeyData),
-    CacheVal = cache_fetch(FunTerm, BKey, CacheKey, Cache),
+    CacheVal = cache_fetch(BKey, CacheKey, Cache),
     case CacheVal of
         not_cached ->
             uncached_map(BKey, Mod, ModState, FunTerm, Arg, KeyData, VNode);
         CV ->
             {ok, CV}
     end;
-do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, _Cache, _VNode, ClientPid) ->
+do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, Cache, _VNode, ClientPid) ->
     riak_eventer:notify(riak_vnode, uncached_map, {FunTerm, Arg, BKey}),
-    V = case Mod:get(ModState, BKey) of
-            {ok, Binary} ->
-                binary_to_term(Binary);
-            {error, notfound} ->
-                {error, notfound}
-        end,
-    riak_js_manager:dispatch({ClientPid, QTerm, V, KeyData}),
-    map_executing.
+    CacheKey = build_key(FunTerm, Arg, KeyData),
+    CacheVal = cache_fetch(BKey, CacheKey, Cache),
+    case CacheVal of
+        not_cached ->
+            V = case Mod:get(ModState, BKey) of
+                    {ok, Binary} ->
+                        binary_to_term(Binary);
+                    {error, notfound} ->
+                        {error, notfound}
+                end,
+            riak_js_manager:dispatch({ClientPid, QTerm, V, KeyData, BKey}),
+            map_executing;
+        CV ->
+            {ok, CV}
+    end.
 
 build_key({modfun, CMod, CFun}, Arg, KeyData) ->
     {CMod, CFun, Arg, KeyData};
+build_key({jsfun, FunName}, Arg, KeyData) ->
+    {FunName, Arg, KeyData};
 build_key(_, _, _) ->
     no_key.
 
-cache_fetch({qfun, _}, _BKey, _CacheKey, _MapState) ->
+cache_fetch(_BKey, no_key, _Cache) ->
     not_cached;
-cache_fetch({modfun, _CMod, _CFun}, BKey, CacheKey, Cache) ->
+cache_fetch(BKey, CacheKey, Cache) ->
     case orddict:find(BKey, Cache) of
         error -> not_cached;
         {ok,CDict} ->
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.