Commits

Bryan Fink  committed 6c7a8fa

enable "whole bucket" map/reduce by passing the bucket name as the value of the "inputs" field

{"inputs":"mybucket","query":[...]}

  • Participants
  • Parent commits 3ae4f56

Comments (0)

Files changed (3)

File apps/riak/src/mapred_resource.erl

     {ok, Client} = riak:local_client(),
     case wrq:get_qs_value("chunked", RD) of
         "true" ->
-            {ok, {ReqId, FSM}} = Client:mapred_stream(Query, Me, ?DEFAULT_TIMEOUT),
-            gen_fsm:send_event(FSM,{input, Inputs }),
-            gen_fsm:send_event(FSM,input_done),
+            {ok, ReqId} = 
+                if is_list(Inputs) ->
+                        {ok, {RId, FSM}} = Client:mapred_stream(Query, Me,
+                                                                ?DEFAULT_TIMEOUT),
+                        gen_fsm:send_event(FSM,{input, Inputs }),
+                        gen_fsm:send_event(FSM,input_done),
+                        {ok, RId};
+                   is_binary(Inputs) ->
+                        Client:mapred_bucket_stream(Inputs, Query, Me,
+                                                    ?DEFAULT_TIMEOUT)
+                end,
             RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
             {true, wrq:set_resp_body({stream, stream_mapred_results(RD1, ReqId)}, RD1), State};
         Param when Param =:= "false";
                    Param =:= undefined ->
-            case Client:mapred(Inputs, Query) of
+            Results = if is_list(Inputs) ->
+                              Client:mapred(Inputs, Query);
+                         is_binary(Inputs) ->
+                              Client:mapred_bucket(Inputs, Query)
+                      end,
+            case Results of
                 {ok, Result} ->
                     RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
                     {true, wrq:set_resp_body(mochijson2:encode(Result), RD1), State};

File apps/riak/src/riak_mapred_json.erl

 
 -export([parse_inputs/1, parse_query/1]).
 
+parse_inputs(Bucket) when is_binary(Bucket) ->
+    {ok, Bucket};
 parse_inputs(Targets) ->
     parse_inputs(Targets, []).
 

File doc/js-mapreduce.org

    the form =[Bucket,Key]= or 3-element lists of the form
    =[Bucket,Key,KeyData]=.
 
+   You may also pass just the name of a bucket
+   (={"inputs":"mybucket",...}=), which is equivalent to passing all
+   of the keys in that bucket as inputs (i.e. "a map/reduce across the
+   whole bucket").  You should be aware that this triggers the
+   somewhat expensive "list keys" operation, so you should use it
+   sparingly.
+
 ** Query
 
    The query is given as a list of phases, each phase being of the