Anonymous avatar Anonymous committed 4ea8fae

first working try at streaming response body API

Comments (0)

Files changed (3)

src/webmachine_decision_core.erl

         {halt, Code} -> respond(Code);
         nop -> d(v3o18b);
         _ -> wrcall({set_resp_body,
-                     encode_body(iolist_to_binary(FinalBody))}),
+                     encode_body(FinalBody)}),
              d(v3o18b)
     end;
 
     ChosenEnc = wrcall({get_metadata, 'content-encoding'}),
     Encoder = hd([Fun || {Enc,Fun} <- resource_call(encodings_provided),
                          ChosenEnc =:= Enc]),
-    Encoder(Charsetter(Body)).
+    case Body of
+        {stream, StreamBody} ->
+            {stream, make_encoder_stream(Encoder, Charsetter, StreamBody)};
+        _ ->
+            Encoder(Charsetter(iolist_to_binary(Body)))
+    end.
+
+make_encoder_stream(Encoder, Charsetter, {Body, done}) ->
+    {Encoder(Charsetter(Body)), done};
+make_encoder_stream(Encoder, Charsetter, {Body, Next}) ->
+    {Encoder(Charsetter(Body)),
+     fun() -> make_encoder_stream(Encoder, Charsetter, Next()) end}.
             
 choose_encoding(AccEncHdr) ->
     Encs = [Enc || {Enc,_Fun} <- resource_call(encodings_provided)],

src/webmachine_request_srv.erl

 handle_call({stream_req_body,_}, _From, State=#state{bodyfetch=standard}) ->
     {reply, stream_conflict, State};
 handle_call({stream_req_body, MaxHunk}, _From, State) ->
-    {reply, do_stream_body(State, MaxHunk), State#state{bodyfetch=stream}};
+    {reply, recv_stream_body(State, MaxHunk), State#state{bodyfetch=stream}};
 handle_call(resp_headers, _From, State) ->
     {reply, wrq:resp_headers(State#state.reqdata), State};
 handle_call(resp_redirect, _From, State) ->
       wrq:resp_headers(State#state.reqdata)), State}.
 
 send(Socket, Data) ->
-    case gen_tcp:send(Socket, Data) of
+    case gen_tcp:send(Socket, iolist_to_binary(Data)) of
 	ok -> ok;
 	{error,closed} -> ok;
 	_ -> exit(normal)
     end.
 
+send_stream_body(Socket, X) -> send_stream_body(Socket, X, 0).
+send_stream_body(Socket, {Data, done}, SoFar) ->
+    Size = send_chunk(Socket, Data),
+    send_chunk(Socket, <<>>),
+    Size + SoFar;
+send_stream_body(Socket, {Data, Next}, SoFar) ->
+    Size = send_chunk(Socket, Data),
+    send_stream_body(Socket, Next(), Size + SoFar).
+
+send_chunk(Socket, Data) ->
+    Size = iolist_size(Data),
+    send(Socket, mochihex:to_hex(Size)),
+    send(Socket, <<"\r\n">>),
+    send(Socket, Data),
+    send(Socket, <<"\r\n">>),
+    Size.
+
 send_ok_response(200, InitState) ->
     RD0 = InitState#state.reqdata,
     {Range, State} = get_range(InitState),
     end.
 
 send_response(Code, State=#state{reqdata=RD}) ->
-    Length = iolist_size([wrq:resp_body(RD)]),
+    Body0 = wrq:resp_body(RD),
+    {Body,Length} = case Body0 of
+        {stream, StreamBody} -> {StreamBody, chunked};
+        _ -> {Body0, iolist_size([Body0])}
+    end,
     send(State#state.socket,
 	 [make_version(wrq:version(RD)),
           make_code(Code), <<"\r\n">> | 
          make_headers(Code, Length, RD)]),
-    case wrq:method(RD) of 
-	'HEAD' -> ok;
-	_ -> send(State#state.socket, [wrq:resp_body(RD)])
+    FinalLength = case wrq:method(RD) of 
+	'HEAD' -> Length;
+	_ -> 
+            case Length of
+                chunked -> send_stream_body(State#state.socket, Body);
+                _ -> send(State#state.socket, Body), Length
+            end
     end,
     InitLogData = State#state.log_data,
     FinalLogData = InitLogData#wm_log_data{response_code=Code,
-					   response_length=Length},
+					   response_length=FinalLength},
     {ok, State#state{reqdata=wrq:set_response_code(Code, RD),
                      log_data=FinalLogData}}.
 
 %%      Will only receive up to the default max-body length
 do_recv_body(State=#state{reqdata=RD}) ->
     State#state{reqdata=wrq:set_req_body(
-          read_whole_stream(do_stream_body(State, ?MAX_RECV_BODY), []), RD)}.
+          read_whole_stream(recv_stream_body(State, ?MAX_RECV_BODY), []), RD)}.
 
 read_whole_stream({Hunk,Next}, Acc0) ->
     Acc = [Hunk|Acc0],
         _ -> read_whole_stream(Next(), Acc)
     end.
 
-do_stream_body(State = #state{reqdata=RD}, MaxHunkSize) ->
+recv_stream_body(State = #state{reqdata=RD}, MaxHunkSize) ->
     case get_header_value("expect", State) of
 	{"100-continue", _} ->
 	    send(State#state.socket, 
         {unknown_transfer_encoding, X} -> exit({unknown_transfer_encoding, X});
         undefined -> {<<>>, done};
         0 -> {<<>>, done};
-        chunked -> stream_chunked_body(State#state.socket, MaxHunkSize);
-        Length -> stream_unchunked_body(State#state.socket, MaxHunkSize, Length)
+        chunked -> recv_chunked_body(State#state.socket, MaxHunkSize);
+        Length -> recv_unchunked_body(State#state.socket, MaxHunkSize, Length)
     end.
 
-stream_unchunked_body(Socket, MaxHunk, DataLeft) ->
+recv_unchunked_body(Socket, MaxHunk, DataLeft) ->
     case MaxHunk >= DataLeft of
         true ->
             {ok,Data1} = gen_tcp:recv(Socket,DataLeft,?IDLE_TIMEOUT),
         false ->
             {ok,Data2} = gen_tcp:recv(Socket,MaxHunk,?IDLE_TIMEOUT),
             {Data2,
-             fun() -> stream_unchunked_body(
+             fun() -> recv_unchunked_body(
                         Socket, MaxHunk, DataLeft-MaxHunk)
              end}
     end.
     
-stream_chunked_body(Socket, MaxHunk) ->
+recv_chunked_body(Socket, MaxHunk) ->
     case read_chunk_length(Socket) of
         0 -> {<<>>, done};
-        ChunkLength -> stream_chunked_body(Socket,MaxHunk,ChunkLength)
+        ChunkLength -> recv_chunked_body(Socket,MaxHunk,ChunkLength)
     end.
-stream_chunked_body(Socket, MaxHunk, LeftInChunk) ->
+recv_chunked_body(Socket, MaxHunk, LeftInChunk) ->
     case MaxHunk >= LeftInChunk of
         true ->
             {ok,Data1} = gen_tcp:recv(Socket,LeftInChunk,?IDLE_TIMEOUT),
             {Data1,
-             fun() -> stream_chunked_body(Socket, MaxHunk)
+             fun() -> recv_chunked_body(Socket, MaxHunk)
              end};
         false ->
             {ok,Data2} = gen_tcp:recv(Socket,MaxHunk,?IDLE_TIMEOUT),
             {Data2,
-             fun() -> stream_chunked_body(Socket, MaxHunk, LeftInChunk-MaxHunk)
+             fun() -> recv_chunked_body(Socket, MaxHunk, LeftInChunk-MaxHunk)
              end}
     end.
 
                            LocNums, Data),
     {Bodies, Size};
 
+range_parts({stream, {Hunk,Next}}, Ranges) ->
+    % for now, streamed bodies are read in full for range requests
+    range_parts(read_whole_stream({Hunk,Next}, []), Ranges);
+
 range_parts(Body0, Ranges) ->
     Body = iolist_to_binary(Body0),
     Size = size(Body),
         304 ->
             mochiweb_headers:make(wrq:resp_headers(RD));
         _ -> 
-            mochiweb_headers:enter("Content-Length",integer_to_list(Length),
-                 mochiweb_headers:make(wrq:resp_headers(RD)))
+            case Length of
+                chunked ->
+                    mochiweb_headers:enter(
+                      "Transfer-Encoding","chunked",
+                      mochiweb_headers:make(wrq:resp_headers(RD)));
+                _ ->
+                    mochiweb_headers:enter(
+                      "Content-Length",integer_to_list(Length),
+                      mochiweb_headers:make(wrq:resp_headers(RD)))
+            end
     end,
     ServerHeader = "MochiWeb/1.1 WebMachine/" ++ ?WMVSN ++ " (" ++ ?QUIP ++ ")",
     WithSrv = mochiweb_headers:enter("Server", ServerHeader, Hdrs0),
 resp_headers(_RD = #wm_reqdata{resp_headers=RespH}) -> RespH. % mochiheaders
 
 resp_body(_RD = #wm_reqdata{resp_body=undefined}) -> undefined;
+resp_body(_RD = #wm_reqdata{resp_body={stream,X}}) -> {stream,X};
 resp_body(_RD = #wm_reqdata{resp_body=RespB}) when is_binary(RespB) -> RespB;
 resp_body(_RD = #wm_reqdata{resp_body=RespB}) -> iolist_to_binary(RespB).
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.