Commits

Anonymous committed 1f04921

mapreduce, now with streaming I/O

  • Participants
  • Parent commits 00294c6

Comments (0)

Files changed (2)

src/riak_client.erl

 -author('Justin Sheehy <justin@basho.com>').
 
 -export([mapred/2,mapred/3]).
+-export([mapred_stream/2,mapred_stream/3]).
 -export([get/3,get/4]).
 -export([put/2,put/3,put/4]).
 -export([delete/3,delete/4]).
 
 %% @spec mapred(Inputs :: list(),
 %%              Query :: [riak_mapreduce_fsm:mapred_queryterm()],
-%%              TimeoutMillisecs :: integer()) ->
+%%              TimeoutMillisecs :: integer()  | 'infinity') ->
 %%       {ok, riak_mapreduce_fsm:mapred_result()} |
 %%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
 %%       {error, timeout} |
 %% @doc Perform a map/reduce job across the cluster.
 %%      See the map/reduce documentation for explanation of behavior.
 mapred(Inputs,Query,Timeout)
-  when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
+  when is_list(Inputs), is_list(Query),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     Me = self(),
+    {ok, {ReqId, MR_FSM}} = mapred_stream(Query,Me,Timeout),
+    gen_fsm:send_event(MR_FSM,{input,Inputs}),
+    gen_fsm:send_event(MR_FSM,input_done),
+    collect_mr_results(ReqId, Timeout, []).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid()) ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid) ->
+    mapred_stream(Query,ClientPid,?DEFAULT_TIMEOUT).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid(),
+%%                     TimeoutMillisecs :: integer() | 'infinity') ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid,Timeout)
+  when is_list(Query), is_pid(ClientPid),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     ReqId = mk_reqid(),
-    spawn(Node, riak_mapreduce_fsm, start, [ReqId,Inputs,Query,Timeout,Me]),
-    wait_for_reqid(ReqId, Timeout).
+    {ok, MR_FSM} = rpc:call(Node, riak_mapreduce_fsm, start,
+                            [ReqId,Query,Timeout,ClientPid]),
+    {ok, {ReqId, MR_FSM}}.
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
     after Timeout ->
             {error, timeout}
     end.
+
+%% @private
+collect_mr_results(ReqId, Timeout, Acc) ->
+    receive
+        {ReqId, done} -> {ok, Acc};
+        {ReqId,{mr_results,Res}} -> collect_mr_results(ReqId,Timeout,Acc++Res)
+    after Timeout ->
+            {error, timeout}
+    end.
+
+
+            
+            

src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]). 
 
--record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
+-record(state, {client,reqid,fsms,starttime,timeout,ring,input_done}).
 
-start(ReqId,Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [ReqId,Inputs,Query,Timeout,Client], []).
+start(ReqId,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [ReqId,Query,Timeout,Client], []).
 %% @private
-init([ReqId,Inputs,Query,Timeout,Client]) ->
+init([ReqId,Query,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
-              {ReqId, length(Inputs), length(Query)}),
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
     case check_query_syntax(Query) of
         ok ->
             FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
-            gen_fsm:send_event(hd(FSMs), {input, Inputs}),
-            gen_fsm:send_event(hd(FSMs), done),
-            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqId,
-                               starttime=riak_util:moment(),
-                               endtime=Timeout+riak_util:moment(),
-                               ring=Ring},
+            StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
+                               starttime=riak_util:moment(),timeout=Timeout,
+                               ring=Ring,input_done=false},
             {ok,wait,StateData,Timeout};
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
     {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
 
-wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqId,
-                                  endtime=End,fsms=FSMs0}) ->
+wait({input,Inputs},
+     StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_got_input,
+                        {ReqId, length(Inputs)}),
+    gen_fsm:send_event(hd(FSMs), {input, Inputs}),
+    {next_state, wait, StateData, Timeout};
+wait(input_done, StateData=#state{reqid=ReqId,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_done_input, {ReqId}),
+    gen_fsm:send_event(hd(FSMs), done),
+    maybe_finish(StateData#state{input_done=true});
+wait({done,FSM}, StateData=#state{fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
     FSMs = lists:delete(FSM,FSMs0),
-    case FSMs of
-        [] -> 
-            riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
-                                {ok, ReqId, length(Acc)}),
-            Client ! {ReqId, {ok, Acc}},
-            {stop,normal,StateData};
-        _ ->
-            {next_state, wait, StateData#state{fsms=FSMs},
-             End-riak_util:moment()}
-    end;
+    maybe_finish(StateData#state{fsms=FSMs});
 wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
     Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};
-wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
-    AccData = case Data of
-        {single, X} -> [X|Acc];
-        {list, X} -> X ++ Acc
+wait({acc,Data}, StateData=#state{reqid=ReqId,client=Client,timeout=Timeout}) ->
+    LData = case Data of
+        {single, X} -> [X];
+        {list, X} -> X
     end,
-    {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
+    Client ! {ReqId, {mr_results, LData}},
+    {next_state, wait, StateData,Timeout};
 wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
     Client ! {ReqId, {error, timeout}},
     {stop,normal,StateData}.
 
 %% @private
+maybe_finish(StateData=#state{input_done=Input_Done,fsms=FSMs,
+                client=Client,reqid=ReqId,timeout=Timeout}) ->
+    case Input_Done of
+        false ->
+            {next_state, wait, StateData, Timeout};
+        true ->
+            case FSMs of
+                [] ->
+                    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
+                                        {ok, ReqId}),
+                    Client ! {ReqId, done},
+                    {stop,normal,StateData};
+                _ ->
+                    {next_state, wait, StateData, Timeout}
+            end
+    end.
+
+%% @private
 handle_event(_Event, _StateName, StateData) ->
     {stop,badmsg,StateData}.