Commits

Anonymous committed 090b90d

Fixes to JSON map/reduce parser;Clean up to mapreduce resource (thanks Bryan!)

Comments (0)

Files changed (3)

apps/riak/src/mapred_resource.erl

 -module(mapred_resource).
 
 -export([init/1, service_available/2, allowed_methods/2]).
--export([malformed_request/2, process_post/2]).
+-export([malformed_request/2, process_post/2, content_types_provided/2]).
+-export([nop/2]).
 
 -include_lib("webmachine/include/webmachine.hrl").
 
                               end,
     {IsMalformed, RD, NewState}.
 
+content_types_provided(RD, State) ->
+    {[{"application/json", nop}], RD, State}.
+
+%% This should never get called
+nop(_RD, _State) ->
+    ok.
+
 process_post(RD, #state{targets=Targets, mrquery=Query}=State) ->
-    io:format("Processing body~n"),
     Me = self(),
     {ok, Client} = riak:local_client(),
     {ok, {ReqId, FSM}} = Client:mapred_stream(Query, Me, ?DEFAULT_TIMEOUT),
     gen_fsm:send_event(FSM,{input, Targets }),
     gen_fsm:send_event(FSM,input_done),
-    wrq:set_resp_header("Content-Type", "application/json", RD),
-    {true, wrq:set_resp_body({stream, stream_mapred_results(RD, ReqId)}, RD), State}.
+    RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
+    {true, wrq:set_resp_body({stream, stream_mapred_results(RD1, ReqId)}, RD1), State}.
 
 %% Internal functions
 stream_mapred_results(RD, ReqId) ->

apps/riak/src/riak_client.erl

   when is_list(Query), is_pid(ClientPid),
        (is_integer(Timeout) orelse Timeout =:= infinity) ->
     ReqId = mk_reqid(),
-    io:format("before~n"),
     {ok, MR_FSM} = rpc:call(Node, riak_mapreduce_fsm, start,
                             [ReqId,Query,Timeout,ClientPid]),
-    io:format("after~n"),
     {ok, {ReqId, MR_FSM}}.
 
 mapred_bucket_stream(Bucket, Query, ClientPid) ->

apps/riak/src/riak_mapred_json.erl

 
 -export([parse_targets/1, parse_query/1]).
 
-parse_targets({struct, Targets}) ->
+parse_targets(Targets) ->
     parse_targets(Targets, []).
 
 parse_targets([], Accum) ->
         true ->
             error
     end;
-parse_targets([{Bucket, Key}|T], Accum) when is_binary(Bucket),
+parse_targets([{struct, [{Bucket, Keys}]}|T], Accum) when is_binary(Bucket),
+                                                          is_list(Keys) ->
+    parse_targets(T, [{Bucket, Keys}|Accum]);
+parse_targets([{struct, [{Bucket, Key}]}|T], Accum) when is_binary(Bucket),
                                                          is_binary(Key) ->
     parse_targets(T, [{Bucket, Key}|Accum]);
-parse_targets([Bucket|T], Accum) when is_binary(Bucket) ->
-    parse_targets(T, [Bucket|Accum]).
+parse_targets(_, _Accum) ->
+    error.
 
 parse_query(Query) ->
     parse_query(Query, []).