Commits

Kevin Smith  committed 7211f7d

Turning on multipart/mixed for chunked/streamed M/R jobs

  • Participants
  • Parent commits 2b95d2b

Comments (0)

Files changed (1)

File apps/riak/src/mapred_resource.erl

 -define(INPUTS_TOKEN, <<"inputs">>).
 -define(DEFAULT_TIMEOUT, 30000).
 
--record(state, {client, inputs, mrquery}).
+-record(state, {client, inputs, mrquery, boundary}).
 
 init(_) ->
     {ok, undefined}.
                         Client:mapred_bucket_stream(Inputs, Query, Me,
                                                     ?DEFAULT_TIMEOUT)
                 end,
-            RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
-            {true, wrq:set_resp_body({stream, stream_mapred_results(RD1, ReqId)}, RD1), State};
+            Boundary = riak_util:unique_id_62(),
+            RD1 = wrq:set_resp_header("Content-Type", "multipart/mixed;boundary=" ++ Boundary, RD),
+            State1 = State#state{boundary=Boundary},
+            {true, wrq:set_resp_body({stream, stream_mapred_results(RD1, ReqId, State1)}, RD1), State1};
         Param when Param =:= "false";
                    Param =:= undefined ->
             Results = if is_list(Inputs) ->
                          is_binary(Inputs) ->
                               Client:mapred_bucket(Inputs, Query)
                       end,
+            RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
             case Results of
                 {ok, Result} ->
-                    RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
                     {true, wrq:set_resp_body(mochijson2:encode(Result), RD1), State};
                 Error ->
                     error_logger:error_report(Error),
-                    {{halt, 500}, send_error(Error, RD), State}
+                    {{halt, 500}, send_error(Error, RD1), State}
             end
     end.
 
 %% Internal functions
 send_error(Error, RD)  ->
-    RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
-    wrq:set_resp_body(format_error(Error), RD1).
+    wrq:set_resp_body(format_error(Error), RD).
 
 format_error({error, Message}=Error) when is_atom(Message);
                                     is_binary(Message),
 format_error(_Error) ->
     mochijson2:encode({struct, [{error, map_reduce_error}]}).
 
-stream_mapred_results(RD, ReqId) ->
+stream_mapred_results(RD, ReqId, State) ->
     receive
-        {ReqId, done} -> {<<"">>, done};
+        {ReqId, done} -> {iolist_to_binary(["\n--", State#state.boundary, "--\n"]), done};
         {ReqId, {mr_results, Res}} ->
-            Body = mochijson2:encode(Res),
-            {iolist_to_binary(Body), fun() -> stream_mapred_results(RD, ReqId) end};
+            Data = mochijson2:encode(Res),
+            Body = ["\n--", State#state.boundary, "\n",
+                    "Content-Type: application/json\n\n",
+                    Data],
+            {iolist_to_binary(Body), fun() -> stream_mapred_results(RD, ReqId, State) end};
         {ReqId, {error, _}=Error} ->
             {format_error(Error), done}
     after ?DEFAULT_TIMEOUT ->