Commits

Kevin Smith committed 6f3c9a6

First pass at JS reduce functions

  • Participants
  • Parent commits 779150a

Comments (0)

Files changed (2)

File apps/js_data/src/jsd_generator.erl

 generate(Client, Count) ->
     rand_init(),
     Records = generate_data(Client, Count, []),
-    lists:foreach(fun(Record) ->
-                          Obj = riak_object:new(<<"customers">>, <<"customer_list">>, Record),
-                          Client:put(Obj, 1) end, Records).
+    Client:put(riak_object:new(<<"customers">>, <<"customer_list">>, Records), 1).
 
 %% Internal functions
 generate_data(_Client, 0, Accum) ->

File apps/riak/src/riak_reduce_phase_fsm.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 -module(riak_reduce_phase_fsm).
 -behaviour(gen_fsm).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
--export([wait/2]). 
+-export([wait/2]).
 
--record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input}).
+-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx}).
 
 start_link(_Ring,QTerm,NextFSM,Coordinator) ->
     gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
 init([QTerm,NextFSM,Coordinator]) ->
     {_,_,_,Acc} = QTerm,
     riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
+    {ok, Ctx} = js_driver:new(),
     {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
-                    coord=Coordinator,acc=Acc,reduced=[]}}.
+                    coord=Coordinator,acc=Acc,reduced=[], jsctx=Ctx}}.
 
 wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done,
                                acc=Acc,fresh_input=Fresh,
                                qterm={reduce,FunTerm,Arg,_Acc},
-                               coord=Coord,reduced=Reduced}) ->
+                               coord=Coord,reduced=Reduced,
+                               jsctx=JsCtx}) ->
     {Res,Red} = case Fresh of
         false ->
             {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
             try
                 NewReduced = case FunTerm of
                                  {qfun,F} -> F(Reduced,Arg);
-                                 {modfun,M,F} -> M:F(Reduced,Arg)
+                                 {modfun,M,F} -> M:F(Reduced,Arg);
+                                 {jsfun, F} -> invoke_js(JsCtx, <<"riak_reducer">>, F, Reduced, Arg)
                              end,
                 {{next_state, wait, StateData#state{reduced=NewReduced}},
                  NewReduced}
         true ->
             case NextFSM of
                 final -> nop;
-                _ -> 
+                _ ->
                     gen_fsm:send_event(NextFSM, {input, Red}),
                     gen_fsm:send_event(NextFSM, done)
             end,
 handle_info(_Info, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 %% @private
-terminate(Reason, _StateName, _State) ->
+terminate(Reason, _StateName, State) ->
     riak_eventer:notify(riak_reduce_phase_fsm, phase_end, Reason),
+    js_driver:destroy(State#state.jsctx),
     Reason.
+
 %% @private
 code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
 
+%% @private
+invoke_js(JsCtx, FunName, FunSource, Reduced, Arg) ->
+    js:define(JsCtx, list_to_binary(["var ", FunName, "=", FunSource])),
+    case js:call(JsCtx, FunName, [Reduced, Arg]) of
+        {ok, Result} ->
+            Result;
+        Error ->
+            Error
+    end.