riak / src / riak_client.erl

Diff from to

src/riak_client.erl

 -author('Justin Sheehy <justin@basho.com>').
 
 -export([mapred/2,mapred/3]).
+-export([mapred_stream/2,mapred_stream/3]).
 -export([get/3,get/4]).
 -export([put/2,put/3,put/4]).
 -export([delete/3,delete/4]).
 
 %% @spec mapred(Inputs :: list(),
 %%              Query :: [riak_mapreduce_fsm:mapred_queryterm()],
-%%              TimeoutMillisecs :: integer()) ->
+%%              TimeoutMillisecs :: integer()  | 'infinity') ->
 %%       {ok, riak_mapreduce_fsm:mapred_result()} |
 %%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
 %%       {error, timeout} |
 %% @doc Perform a map/reduce job across the cluster.
 %%      See the map/reduce documentation for explanation of behavior.
 mapred(Inputs,Query,Timeout)
-  when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
+  when is_list(Inputs), is_list(Query),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     Me = self(),
+    {ok, {ReqId, MR_FSM}} = mapred_stream(Query,Me,Timeout),
+    gen_fsm:send_event(MR_FSM,{input,Inputs}),
+    gen_fsm:send_event(MR_FSM,input_done),
+    collect_mr_results(ReqId, Timeout, []).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid()) ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid) ->
+    mapred_stream(Query,ClientPid,?DEFAULT_TIMEOUT).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid(),
+%%                     TimeoutMillisecs :: integer() | 'infinity') ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid,Timeout)
+  when is_list(Query), is_pid(ClientPid),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     ReqId = mk_reqid(),
-    spawn(Node, riak_mapreduce_fsm, start, [ReqId,Inputs,Query,Timeout,Me]),
-    wait_for_reqid(ReqId, Timeout).
+    {ok, MR_FSM} = rpc:call(Node, riak_mapreduce_fsm, start,
+                            [ReqId,Query,Timeout,ClientPid]),
+    {ok, {ReqId, MR_FSM}}.
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
     after Timeout ->
             {error, timeout}
     end.
+
+%% @private
+collect_mr_results(ReqId, Timeout, Acc) ->
+    receive
+        {ReqId, done} -> {ok, Acc};
+        {ReqId,{mr_results,Res}} -> collect_mr_results(ReqId,Timeout,Acc++Res)
+    after Timeout ->
+            {error, timeout}
+    end.
+
+
+            
+            
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.