Commits

Anonymous committed 10aa3e7

Fixing up reduce phase bugs

Comments (0)

Files changed (3)

apps/riak/ebin/riak.app

              riak_fs_backend,
              riak_gb_trees_backend,
              riak_get_fsm,
-             riak_js,
+             riak_js_manager,
+             riak_js_sup,
+             riak_js_vm,
              riak_keys_fsm,
              riak_local_logger,
              riak_map_executor,
              riak_map_localphase,
              riak_map_phase_fsm,
-             riak_mapper,
              riak_mapred_json,
              riak_mapreduce,
              riak_mapreduce_fsm,

apps/riak/src/riak_js_manager.erl

         Members ->
             {T1, T2, T3} = erlang:now(),
             random:seed(T1, T2, T3),
-            Pos = random:uniform(length(Members)),
+            Pos = pick_pos(erlang:get(?MODULE), length(Members)),
             lists:nth(Pos, Members)
     end.
+
+pick_pos(undefined, Size) ->
+    Pos = random:uniform(Size),
+    erlang:put(?MODULE, Pos),
+    Pos;
+pick_pos(OldPos, Size) ->
+    case random:uniform(Size) of
+        OldPos ->
+            pick_pos(OldPos, Size);
+        Pos ->
+            erlang:put(?MODULE, Pos),
+            Pos
+    end.

apps/riak/src/riak_reduce_phase_fsm.erl

                                  {erlang, {modfun,M,F}} ->
                                      M:F(Reduced,Arg);
                                  {javascript, {jsanon, _}=QTerm} ->
-                                     case riak_js_manager:blocking_dispatch({QTerm, Reduced, Arg}) of
-                                         {ok, ReturnValue} ->
-                                             ReturnValue;
-                                         Error ->
-                                             Error
-                                     end
+                                     js_reduce(QTerm, Reduced, Arg)
                              end,
                 {{next_state, wait, StateData#state{reduced=NewReduced}}, NewReduced}
             catch C:R ->
     end,
     {stop,normal,StateData}.
 
+js_reduce(QTerm, Reduced, Arg) ->
+    case riak_js_manager:blocking_dispatch({QTerm, Reduced, Arg}) of
+        {ok, ReturnValue} when is_list(ReturnValue) ->
+            ReturnValue;
+        {ok, ReturnValue} ->
+            [ReturnValue];
+        Error ->
+            Error
+    end.
+
+
 %% @private
 handle_event(_Event, _StateName, StateData) -> {stop,badmsg,StateData}.
 %% @private