Commits

Anonymous committed a6b3f25

use new {stream, Size, Fun} webmachine body to do all body production (bz://487)

  • Participants
  • Parent commits 149af34

Comments (0)

Files changed (2)

File rebar.config

 {deps, [{skerl, "0\.1",
          {hg, "ssh://hg@bitbucket.org/basho/skerl", "1233b8462076"}},
         {webmachine, "1.7.1",
-         {hg, "http://bitbucket.org/basho/webmachine", "webmachine-1.7.1"}},
+         {hg, "http://bitbucket.org/basho/webmachine", "f103deda4daf"}},
         {riak_kv, "0.12.0",
          {hg, "http://bitbucket.org/basho/riak_kv", "92b956595db4"}}]}.
 

File src/luwak_wm_file.erl

          create_path/2,
          process_post/2,
          produce_doc_body/2,
-         produce_single_byterange/2,
-         produce_byteranges/2,
          accept_doc_body/2,
          delete_resource/2
         ]).
               prefix,       %% string() - prefix for resource uris
               handle,       %% {ok, riak_object()}|{error, term()}
                             %%   - the object found
-              method,       %% atom() - HTTP method for the request
-              ranges        %% [byterange()] - parsed Range header
+              method        %% atom() - HTTP method for the request
              }).
 
 -include_lib("webmachine/include/webmachine.hrl").
                                    RD)),
              HCtx};
         _ ->
-            case extract_ranges(RD) of
-                {ok, Ranges} ->
-                    {false, RD, HCtx#ctx{ranges=Ranges}};
-                {error, Invalid} ->
-                    {true, invalid_range_message(RD, Invalid), HCtx}
-            end
+            {false, RD, HCtx}
     end.
 
 %% @spec content_types_provided(reqdata(), context()) ->
 content_types_provided(RD, Ctx=#ctx{method=Method}=Ctx) when Method =:= 'PUT';
                                                              Method =:= 'POST' ->
     {ContentType, _} = extract_content_type(RD),
-    {ctypes_or_byteranges(ContentType, Ctx), RD, Ctx};
+    {[{ContentType, produce_doc_body}], RD, Ctx};
 content_types_provided(RD, Ctx0) ->
     case defined_attribute(Ctx0, ?MD_CTYPE) of
         {undefined, Ctx} ->
-            {ctypes_or_byteranges("application/octet-stream", Ctx), RD, Ctx};
+            {[{"application/octet-stream", produce_doc_body}], RD, Ctx};
         {Ctype, Ctx} ->
-            {ctypes_or_byteranges(Ctype, Ctx), RD, Ctx}
+            {[{Ctype, produce_doc_body}], RD, Ctx}
     end.
 
-ctypes_or_byteranges(CType, #ctx{ranges=[]}) ->
-    [{CType, produce_doc_body}];
-ctypes_or_byteranges(CType, #ctx{ranges=[_]}) ->
-    [{CType, produce_single_byterange}];
-ctypes_or_byteranges(_CType, #ctx{ranges=[_|_]}) ->
-    [{"multipart/byteranges", produce_byteranges}].
-
 %% @spec charsets_provided(reqdata(), context()) ->
 %%          {no_charset|[{Charset::string(), Producer::function()}],
 %%           reqdata(), context()}
                 end,
                 mochiweb_headers:to_list(wrq:req_headers(RD))).
 
-%% @spec extract_ranges(reqdata()) -> {ok, [byterange()]}|{error, [string()]}
-%% @type byterange() = {integer(), integer()}
-%%                    |{integer(), eof}
-%%                    |{suffix, integer()}
-extract_ranges(RD) ->
-    case wrq:get_req_header(?HEAD_RANGE, RD) of
-        undefined ->
-            {ok, []};
-        "bytes="++RawHeader ->
-            RawRanges = string:tokens(RawHeader, ", "),
-            Parsed = [ {catch parse_range(R), R} || R <- RawRanges ],
-            case [ R || {{'EXIT',_}, R} <- Parsed ] of
-                [] ->
-                    %% TODO: merge overlapping ranges
-                    {ok, [ P || {P, _} <- Parsed ]};
-                Errors ->
-                    {error, Errors}
-            end;
-        Invalid ->
-            {error, [Invalid]}
-    end.
-
-parse_range([$-|R]) ->
-    {suffix, list_to_integer(R)};
-parse_range(R) ->
-    case string:tokens(R, "-") of
-        [Start] ->
-            {list_to_integer(Start), eof};
-        [Start, End] ->
-            {list_to_integer(Start), list_to_integer(End)}
-    end.
-
-concrete_range(C, H, {suffix, Length}) ->
-    FileLength = luwak_file:length(C, H),
-    {FileLength-Length, Length};
-concrete_range(C, H, {Offset, eof}) ->
-    FileLength = luwak_file:length(C, H),
-    {Offset, FileLength-Offset};
-concrete_range(_C, _H, {Start, End}) ->
-    %% HTTP byte range is inclusive, luwak's is not
-    {Start, 1+End-Start}.
-
 %% @spec produce_doc_body(reqdata(), context()) -> {binary(), reqdata(), context()}
 %% @doc Extract the value of the document, and place it in the response
 %%      body of the request.
 produce_doc_body(RD, Ctx=#ctx{handle={ok, H}, client=C}) ->
-    {send_file(C, H, 0, luwak_file:length(C, H)),
+    {{stream, luwak_file:length(C, H), file_sender(C, H)},
      add_user_metadata(RD, H),
      Ctx}.
 
-produce_single_byterange(RD, Ctx=#ctx{handle={ok, H},
-                                      client=C,
-                                      ranges=[Range]}) ->
-    {Start, End} = concrete_range(C, H, Range),
-    FileLength = luwak_file:length(C, H),
-    RangeHead = content_range_header(Start, End, FileLength),
-    CLRD = wrq:set_resp_header(?HEAD_CRANGE, RangeHead, RD),
-    {send_file(C, H, Start, End),
-     add_user_metadata(CLRD, H),
-     Ctx}.
-
-produce_byteranges(RD, Ctx=#ctx{handle={ok, H},
-                                client=C,
-                                ranges=Ranges}) ->
-    CRanges = [ concrete_range(C, H, R) || R <- Ranges ],
-    FileLength = luwak_file:length(C, H),
-    Boundary = riak_core_util:unique_id_62(),
-    BRD = wrq:set_resp_header(?HEAD_CTYPE,
-                              "multipart/byteranges; boundary="++Boundary,
-                              RD),
-    {{stream, multi_send_file(C, H, FileLength, Boundary, CRanges)},
-     add_user_metadata(BRD, H),
-     Ctx}.
-
 add_user_metadata(RD, Handle) ->
     Attr = luwak_file:get_attributes(Handle),
     case dict:find(?MD_USERMETA, Attr) of
         error -> RD
     end.
 
-content_range_header(Start, End, Length) ->
-    io_lib:format("bytes=~b-~b/~b", [Start, Start+End-1, Length]).
-
-send_file(Client, Handle, Start, End) ->
-    Stream = luwak_get_stream:start(Client, Handle, Start, End),
-    {stream, (send_file_helper(Stream))()}.
+file_sender(C, H) ->
+    fun(Start, End) ->
+            %% HTTP specifies the last byte to send,
+            %% but luwak wants a number of bytes after offset
+            Stream = luwak_get_stream:start(C, H, Start, 1+End-Start),
+            (send_file_helper(Stream))()
+    end.
 
 -define(STREAM_TIMEOUT, 5000).
 
             end
     end.
 
-%% basic strategy of multi_send_file is to wrap up calls to
-%% send_file in thunks, so that as each range finishes sending
-%% we can start the next one
-multi_send_file(_C, _H, _Length, Boundary, []) ->
-    {[<<"\r\n--">>, Boundary, <<"--\r\n">>], done};
-multi_send_file(C, H, Length, Boundary, [{Start, End}|Rest]) ->
-    {stream, {Data, Thunk}} = send_file(C, H, Start, End),
-    {[multipart_header(Boundary, Start, End, Length), Data],
-     fun() ->
-             multi_send_helper(C, H, Length, Boundary, Rest, Thunk)
-     end}.
-
-multi_send_helper(C, H, Length, Boundary, Rest, done) ->
-    multi_send_file(C, H, Length, Boundary, Rest);
-multi_send_helper(C, H, Length, Boundary, Rest, Thunk) ->
-    {Data, NewThunk} = Thunk(),
-    {Data,
-     fun() ->
-             multi_send_helper(C, H, Length, Boundary, Rest, NewThunk)
-     end}.
-
-multipart_header(Boundary, Start, End, Length) ->
-    [<<"\r\n--">>, Boundary, <<"\r\n">>,
-     ?HEAD_CRANGE, <<": ">>,
-     content_range_header(Start, End, Length), <<"\r\n\r\n">>].
-
 %% @spec ensure_doc(context()) -> context()
 %% @doc Ensure that the 'doc' field of the context() has been filled
 %%      with the result of a riak_client:get request.  This is a
                     Reason
             end,
     wrq:append_to_response_body(Error, RD1).
-
-invalid_range_message(RD, Ranges) ->
-    RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
-    wrq:append_to_response_body(
-      io_lib:format("The Range header specified invalid ranges: ~p",
-                    [Ranges]),
-      RD1).