Commits

Anonymous committed 05f0a0c

Update stream/range to use new MI APIs

tags: az296

Refactor the MI backend to use the modified MI APIs. Still the
same semantics just a little different in how we go about doing
it.

  • Participants
  • Parent commits 6f7ab3d

Comments (0)

Files changed (2)

apps/riak_search/src/merge_index_backend.erl

     drop/1
 ]).
 
+-export([stream_worker/6,
+         range_worker/8]).
+
 -include_lib("riak_search/include/riak_search.hrl").
 
 % @type state() = term().
     riak_search_backend:info_response(Sender, Info1),
     noreply.
 
-stream(Index, Field, Term, FilterFun, Sender, State) ->
+stream(Index, Field, Term, Filter, Sender, State) ->
     Pid = State#state.pid,
-    OutputRef = make_ref(),
-    OutputPid = spawn_link(fun() -> result_loop(OutputRef, Sender) end),
-    merge_index:stream(Pid, Index, Field, Term, OutputPid, OutputRef, FilterFun),
+    spawn_link(?MODULE, stream_worker, [Pid, Index, Field,
+                                        Term, Filter, Sender]),
     noreply.
 
-range(Index, Field, StartTerm, EndTerm, Size, FilterFun, Sender, State) ->
+range(Index, Field, StartTerm, EndTerm, Size, Filter, Sender, State) ->
     Pid = State#state.pid,
-    OutputRef = make_ref(),
-    OutputPid = spawn_link(fun() -> result_loop(OutputRef, Sender) end),
-    merge_index:range(Pid, Index, Field, StartTerm, EndTerm, Size, OutputPid, OutputRef, FilterFun),
+    spawn_link(?MODULE, range_worker, [Pid, Index, Field, StartTerm,
+                                       EndTerm, Size, Filter, Sender]),
     noreply.
 
-result_loop(Ref, Sender) ->
-    receive
-        {result, {DocID, Props}, Ref} ->
-            riak_search_backend:response_results(Sender, [{DocID, Props}]),
-            result_loop(Ref, Sender);
-        {result_vec, ResultVec, Ref} ->
-            riak_search_backend:response_results(Sender, ResultVec),
-            result_loop(Ref, Sender);
-        {result, '$end_of_table', Ref} ->
-            riak_search_backend:response_done(Sender);
-        Other ->
-            ?PRINT({unexpected_result, Other}),
-            result_loop(Ref, Sender)
-    end.
-
 is_empty(State) ->
     Pid = State#state.pid,
     merge_index:is_empty(Pid).
 drop(State) ->
     Pid = State#state.pid,
     merge_index:drop(Pid).
+
+%%%===================================================================
+%%% Internal Functions
+%%%===================================================================
+
+stream_worker(Pid, Index, Field, Term, Filter, Sender) ->
+    Iter = merge_index:lookup(Pid, Index, Field, Term, Filter),
+    iterate(Iter(), Sender).
+
+range_worker(Pid, Index, Field, StartTerm, EndTerm, Size, Filter, Sender) ->
+    Iter = merge_index:range(Pid, Index, Field, StartTerm, EndTerm,
+                             Size, Filter),
+    iterate(Iter(), Sender).
+
+iterate(eof, Sender) ->
+    riak_search_backend:response_done(Sender);
+iterate({Results, Iter}, Sender) ->
+    riak_search_backend:response_results(Sender, Results),
+    iterate(Iter(), Sender).
        {luwak, "1.*", {git, "git://github.com/basho/luwak", 
                             {branch, "master"}}},
        {merge_index, "0.14.*", {git, "git://github.com/basho/merge_index",
-                                {branch, "master"}}}
+                                {branch, "az296-merge-in-RI-changes"}}}
        ]}.