Commits

Anonymous committed 079d9f2

More M/R debugging;Wiring up M/R to HTTP

  • Participants
  • Parent commits 98d0b28

Comments (0)

Files changed (9)

File apps/riak/ebin/riak.app

              jiak_resource,
              jiak_util,
              json_pp,
+             mapred_resource,
              merkerl,
              priority_queue,
              raw_http_resource,
              riak_map_localphase,
              riak_map_phase_fsm,
              riak_mapper,
+             riak_mapred_json,
              riak_mapreduce,
              riak_mapreduce_fsm,
 			 riak_multi_backend,

File apps/riak/src/mapred_resource.erl

+-module(mapred_resource).
+
+-export([init/1, service_available/2, allowed_methods/2]).
+-export([malformed_request/2, process_post/2]).
+
+-include_lib("webmachine/include/webmachine.hrl").
+
+-define(QUERY_TOKEN, <<"query">>).
+-define(TARGETS_TOKEN, <<"targets">>).
+-define(DEFAULT_TIMEOUT, 30000).
+
+-record(state, {client, targets, mrquery}).
+
+init(_) ->
+    {ok, undefined}.
+
+service_available(RD, State) ->
+    case riak:local_client() of
+        {ok, Client} ->
+            {true, RD, #state{client=Client}};
+        Error ->
+            error_logger:error_report(Error),
+            {false, RD, State}
+    end.
+
+allowed_methods(RD, State) ->
+    {['POST'], RD, State}.
+
+malformed_request(RD, State) ->
+    {IsMalformed, NewState} = case wrq:req_body(RD) of
+                                  undefined ->
+                                      {true, State};
+                                  Body ->
+                                      {Verified, State1} = verify_body(Body, State),
+                                      io:format("Verified: ~p~nState:~p~n", [Verified, State1]),
+                                      {not(Verified), State1}
+                              end,
+    {IsMalformed, RD, NewState}.
+
+process_post(RD, #state{targets=Targets, mrquery=Query}=State) ->
+    io:format("Processing body~n"),
+    Me = self(),
+    {ok, Client} = riak:local_client(),
+    {ok, {ReqId, FSM}} = Client:mapred_stream(Query, Me, ?DEFAULT_TIMEOUT),
+    gen_fsm:send_event(FSM,{input, Targets }),
+    gen_fsm:send_event(FSM,input_done),
+    wrq:set_resp_header("Content-Type", "application/json", RD),
+    {true, wrq:set_resp_body({stream, stream_mapred_results(RD, ReqId)}, RD), State}.
+
+%% Internal functions
+stream_mapred_results(RD, ReqId) ->
+    receive
+        {ReqId, done} -> {<<"">>, done};
+        {ReqId, {mr_results, Res}} ->
+            Body = case is_proplist(Res) of
+                       true ->
+                           mochijson2:encode({struct, Res});
+                       false ->
+                           mochijson2:encode(Res)
+                   end,
+            {iolist_to_binary(Body), fun() -> stream_mapred_results(RD, ReqId) end};
+        WTF ->
+            io:format("WTF: ~p~n", [WTF])
+    after ?DEFAULT_TIMEOUT ->
+            {error, timeout}
+    end.
+
+is_proplist([H|_]) when is_tuple(H) andalso size(H) == 2 ->
+    true;
+is_proplist(_) ->
+    false.
+
+verify_body(Body, State) ->
+    case mochijson2:decode(Body) of
+        {struct, MapReduceDesc} ->
+            Targets = proplists:get_value(?TARGETS_TOKEN, MapReduceDesc),
+            Query = proplists:get_value(?QUERY_TOKEN, MapReduceDesc),
+            case not(Targets =:= undefined) andalso not(Query =:= undefined) of
+                true ->
+                    case riak_mapred_json:parse_targets(Targets) of
+                        {ok, ParsedTargets} ->
+                            case riak_mapred_json:parse_query(Query) of
+                                {ok, ParsedQuery} ->
+                                    {true, State#state{targets=ParsedTargets, mrquery=ParsedQuery}};
+                                error ->
+                                    {false, State}
+                            end;
+                        error ->
+                            {false, State}
+                    end;
+                false ->
+                    {false, State}
+            end
+    end.

File apps/riak/src/riak_client.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 %% @doc The client object used for all access into the riak system.
 %% @type riak_client() = term()
   when is_list(Query), is_pid(ClientPid),
        (is_integer(Timeout) orelse Timeout =:= infinity) ->
     ReqId = mk_reqid(),
+    io:format("before~n"),
     {ok, MR_FSM} = rpc:call(Node, riak_mapreduce_fsm, start,
                             [ReqId,Query,Timeout,ClientPid]),
+    io:format("after~n"),
     {ok, {ReqId, MR_FSM}}.
 
 mapred_bucket_stream(Bucket, Query, ClientPid) ->
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
 %% @equiv list_keys(Bucket, default_timeout()*8)
-list_keys(Bucket) -> 
+list_keys(Bucket) ->
     list_keys(Bucket, ?DEFAULT_TIMEOUT*8).
 
 %% @spec list_keys(riak_object:bucket(), TimeoutMillisecs :: integer()) ->
 %% @doc List the keys known to be present in Bucket.
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
-list_keys(Bucket, Timeout) -> 
+list_keys(Bucket, Timeout) ->
     list_keys(Bucket, Timeout, ?DEFAULT_ERRTOL).
-list_keys(Bucket, Timeout, ErrorTolerance) -> 
+list_keys(Bucket, Timeout, ErrorTolerance) ->
     Me = self(),
     ReqId = mk_reqid(),
     spawn(Node, riak_keys_fsm, start,
           [ReqId,Bucket,Timeout,plain,ErrorTolerance,Me]),
     wait_for_listkeys(ReqId, Timeout).
 
-stream_list_keys(Bucket) -> 
+stream_list_keys(Bucket) ->
     stream_list_keys(Bucket, ?DEFAULT_TIMEOUT).
 
-stream_list_keys(Bucket, Timeout) -> 
+stream_list_keys(Bucket, Timeout) ->
     stream_list_keys(Bucket, Timeout, ?DEFAULT_ERRTOL).
 
-stream_list_keys(Bucket, Timeout, ErrorTolerance) -> 
+stream_list_keys(Bucket, Timeout, ErrorTolerance) ->
     Me = self(),
     stream_list_keys(Bucket, Timeout, ErrorTolerance, Me).
 
-stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) -> 
+stream_list_keys(Bucket, Timeout, ErrorTolerance, Client) ->
     stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, plain).
 
 %% @spec stream_list_keys(riak_object:bucket(),
 %%      keys in Bucket on any single vnode.
 %%      If ClientType is set to 'mapred' instead of 'plain', then the
 %%      messages will be sent in the form of a MR input stream.
-stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, ClientType) -> 
+stream_list_keys(Bucket, Timeout, ErrorTolerance, Client, ClientType) ->
     ReqId = mk_reqid(),
     spawn(Node, riak_keys_fsm, start,
           [ReqId,Bucket,Timeout,ClientType,ErrorTolerance,Client]),
 %%       {ok, [Key :: riak_object:key()]} |
 %%       {error, timeout} |
 %%       {error, Err :: term()}
-%% @doc List the keys known to be present in Bucket, 
+%% @doc List the keys known to be present in Bucket,
 %%      filtered at the vnode according to Fun, via lists:filter.
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
 %% @equiv filter_keys(Bucket, Fun, default_timeout()*8)
-filter_keys(Bucket, Fun) -> 
+filter_keys(Bucket, Fun) ->
     list_keys({filter, Bucket, Fun}, ?DEFAULT_TIMEOUT*8).
 
 %% @spec filter_keys(riak_object:bucket(), Fun :: function(), TimeoutMillisecs :: integer()) ->
 %%       {ok, [Key :: riak_object:key()]} |
 %%       {error, timeout} |
 %%       {error, Err :: term()}
-%% @doc List the keys known to be present in Bucket, 
+%% @doc List the keys known to be present in Bucket,
 %%      filtered at the vnode according to Fun, via lists:filter.
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
-filter_keys(Bucket, Fun, Timeout) -> 
+filter_keys(Bucket, Fun, Timeout) ->
     list_keys({filter, Bucket, Fun}, Timeout).
 
 %% @spec list_buckets() ->
 %%      either adds the first key or removes the last remaining key from
 %%      a bucket.
 %% @equiv list_buckets(default_timeout()*8)
-list_buckets() -> 
+list_buckets() ->
     list_buckets(?DEFAULT_TIMEOUT*8).
 
 %% @spec list_buckets(TimeoutMillisecs :: integer()) ->
 %%      out of date if called immediately after any operation that
 %%      either adds the first key or removes the last remaining key from
 %%      a bucket.
-list_buckets(Timeout) -> 
+list_buckets(Timeout) ->
     list_keys('_', Timeout).
 
 %% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
              [client_event, EventName, {ClientId, EventDetail}]).
 
 %% @equiv add_event_handler(Pid, Desc, {'_', '_', '_', '_'}, [])
-add_event_handler(Pid, Desc) -> 
+add_event_handler(Pid, Desc) ->
     add_event_handler(Pid, Desc, {'_', '_', '_', '_'}).
 
 %% @equiv add_event_handler(Pid, Desc, MatchHead, [])
-add_event_handler(Pid, Desc, MatchHead) -> 
+add_event_handler(Pid, Desc, MatchHead) ->
     add_event_handler(Pid, Desc, MatchHead, []).
-    
+
 %% @doc
-%% Register a process that will receive Riak events 
+%% Register a process that will receive Riak events
 %% generated by the cluster in the form of Erlang messages.
 %% See {@link riak_eventer:add_handler/4.} for more information.
 add_event_handler(Pid, Desc, MatchHead, MatchGuard) ->
-    rpc:call(Node, riak_eventer, add_handler, [Pid, Desc, MatchHead, MatchGuard]). 
+    rpc:call(Node, riak_eventer, add_handler, [Pid, Desc, MatchHead, MatchGuard]).
 
 %% @doc
 %% Remove an event handler added by {@link add_event_handler/4}, if it exists.
 %% See {@link riak_eventer:remove_handler/3.} for more information.
 remove_event_handler(Pid, MatchHead, MatchGuard) ->
-    rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]). 
+    rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]).
 
 get_stats(local) ->
     [{Node, rpc:call(Node, gen_server, call, [riak_stat, get_stats])}];
     after Timeout ->
             {error, timeout, Acc}
     end.
-

File apps/riak/src/riak_js.erl

 -module(riak_js).
 
 -export([new_context/0, init_context/1, invoke_map/6, invoke_reduce/6]).
+-export([fetch_fun/2]).
 
 new_context() ->
     js_driver:new({?MODULE, init_context}).
             not(OldCSum =:= CSum)
     end.
 
+fetch_fun(Bucket, Key) ->
+    {ok, Client} = riak:local_client(),
+    Source = case Client:get(Bucket, Key) of
+                 {ok, Obj} ->
+                     riak_object:get_value(Obj);
+                 Error ->
+                     error_logger:error_report(Error),
+                     <<>>
+                         end,
+    Client:stop(),
+    Source.
+
 %% Internal functions
 priv_dir() ->
     %% Hacky workaround to handle running from a standard app directory

File apps/riak/src/riak_mapper.erl

 
 build_key({modfun, CMod, CFun}, Arg, KeyData) ->
     {CMod, CFun, Arg, KeyData};
+build_key({jsfun, FunName}, Arg, KeyData) ->
+    {FunName, Arg, KeyData};
 build_key(_, _, _) ->
     no_key.
 
 cache_fetch({qfun, _}, _BKey, _CacheKey, _MapState) ->
     not_cached;
-cache_fetch({jsfun, _}, _BKey, _CacheKey, _MapState) ->
-    not_cached;
 cache_fetch({jsanon, _}, _BKey, _CacheKey, _MapState) ->
     not_cached;
 cache_fetch({modfun, _CMod, _CFun}, BKey, CacheKey, Cache) ->
                 error -> not_cached;
                 {ok,CVal} -> CVal
             end
-    end.
+    end;
+%% TODO: Cache jsfun results, too
+cache_fetch({jsfun, _FunName}, _BKey, _CacheKey, _Cache) ->
+    not_cached.
 
 uncached_map(BKey, Mod, ModState, MapState, FunTerm, Arg, KeyData, VNode) ->
     riak_eventer:notify(riak_vnode, uncached_map, {FunTerm, Arg, BKey}),
                                         {Retval, _} = riak_js:invoke_map(JsCtx, CSums, [extract_values(V), KeyData, Arg],
                                                                          <<"Riak">>, F, undefined),
                                         {Retval, MapState};
+                                    {jsanon, {Bucket, Key}} ->
+                                        exec_map(V, MapState, {jsanon, riak_js:fetch_fun(Bucket, Key)}, Arg, BKey, KeyData, VNode);
                                     {jsanon, F} ->
                                         {Retval, NewCSums} = riak_js:invoke_map(JsCtx, CSums, [extract_values(V), KeyData, Arg],
                                                                                 undefined, <<"riakMapper">>, F),

File apps/riak/src/riak_mapred_json.erl

+-module(riak_mapred_json).
+
+-export([parse_targets/1, parse_query/1]).
+
+parse_targets({struct, Targets}) ->
+    parse_targets(Targets, []).
+
+parse_targets([], Accum) ->
+    if
+        length(Accum) > 0 ->
+            {ok, lists:reverse(Accum)};
+        true ->
+            error
+    end;
+parse_targets([{Bucket, Key}|T], Accum) when is_binary(Bucket),
+                                                         is_binary(Key) ->
+    parse_targets(T, [{Bucket, Key}|Accum]);
+parse_targets([Bucket|T], Accum) when is_binary(Bucket) ->
+    parse_targets(T, [Bucket|Accum]).
+
+parse_query(Query) ->
+    parse_query(Query, []).
+
+parse_query([], Accum) ->
+    if
+        length(Accum) > 0 ->
+            {ok, lists:reverse(Accum)};
+        true ->
+            error
+    end;
+parse_query([Type, {struct, StepDef}|T], Accum) when Type =:= <<"map">>;
+                                                       Type =:= <<"reduce">> ->
+    StepType = case Type of
+                   <<"map">> -> map;
+                   <<"reduce">> -> reduce
+               end,
+    Lang = proplists:get_value(<<"language">>, StepDef),
+    Keep = proplists:get_value(<<"keep">>, StepDef),
+    case not(Keep =:= true orelse Keep =:= false) of
+        true ->
+            error;
+        false ->
+            case parse_step(Lang, StepDef) of
+                error ->
+                    error;
+                {ok, ParsedStep} ->
+                    Arg = proplists:get_value(<<"arg">>, StepDef, none),
+                    parse_query(T, [{StepType, ParsedStep, Arg, Keep}|Accum])
+            end
+    end;
+parse_query(_, _Accum) ->
+    error.
+
+parse_step(<<"javascript">>, StepDef) ->
+    Source = proplists:get_value(<<"source">>, StepDef),
+    Name = proplists:get_value(<<"name">>, StepDef),
+    Bucket = proplists:get_value(<<"bucket">>, StepDef),
+    Key = proplists:get_value(<<"key">>, StepDef),
+    case Source of
+        undefined ->
+            case Name of
+                undefined ->
+                    case Bucket of
+                        undefined ->
+                            error;
+                        _ ->
+                            case Key of
+                                undefined ->
+                                    error;
+                                _ ->
+                                    {ok, {jsanon, {Bucket, Key}}}
+                            end
+                    end;
+                _ ->
+                    {ok, {jsfun, Name}}
+            end;
+        _ ->
+            {ok, {jsanon, Source}}
+    end;
+parse_step(<<"erlang">>, StepDef) ->
+    Module = proplists:get_value(<<"module">>, StepDef),
+    Function = proplists:get_value(<<"function">>, StepDef),
+    bin_to_atom(Module, fun(A1) ->
+                                   bin_to_atom(Function, fun(A2) -> {modfun, A1, A2} end) end).
+
+bin_to_atom(Binary, Cont) ->
+    L = binary_to_list(Binary),
+    Result = try
+                 list_to_existing_atom(L)
+             catch
+                 error:badarg ->
+                     try
+                         list_to_atom(L)
+                     catch
+                         error:badarg ->
+                             error
+                     end
+             end,
+    case Result of
+        error ->
+            error;
+        _ ->
+            Cont(Result)
+    end.

File apps/riak/src/riak_mapreduce_fsm.erl

                                ring=Ring,input_done=false},
             {ok,wait,StateData,Timeout};
         {bad_qterm, QTerm} ->
+            io:format("Bad term: ~p~n", [QTerm]),
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                 {error, {bad_qterm, QTerm}}),
             Client ! {ReqId, {error, {bad_qterm, QTerm}}},
                                 false -> {bad_qterm, QTerm};
                                 true -> check_query_syntax(Rest)
                             end;
+                        {jsanon, {Bucket, Key}} when is_binary(Bucket),
+                                                     is_binary(Key) ->
+                            check_query_syntax(Rest);
                         {jsanon, JF_F} when is_binary(JF_F) ->
                             check_query_syntax(Rest);
                         {jsfun, JF_F} when is_binary(JF_F) ->

File apps/riak/src/riak_reduce_phase_fsm.erl

                 {NewReduced, NewCSums} = case FunTerm of
                                              {qfun,F} -> {F(Reduced,Arg), CSums};
                                              {modfun,M,F} -> {M:F(Reduced,Arg), CSums};
+                                             {jsanon, {Bucket, Key}} ->
+                                                 Source = riak_js:fetch_fun(Bucket, Key),
+                                                 riak_js:invoke_reduce(JsCtx, CSums, [Reduced, Arg], undefined,
+                                                                       <<"riakReducer">>, Source);
                                              {jsanon, F} ->
-                                                 {Retval, CSums1} = riak_js:invoke_reduce(JsCtx, CSums,
-                                                                                          [Reduced, Arg], undefined, <<"riakReducer">>, F),
-                                                 {Retval, CSums1};
+                                                 riak_js:invoke_reduce(JsCtx, CSums,
+                                                                       [Reduced, Arg], undefined, <<"riakReducer">>, F);
                                              {jsfun, F} ->
                                                  {Retval, _} = riak_js:invoke_reduce(JsCtx, CSums, [Reduced, Arg],
                                                                                      <<"Riak">>, F, undefined),
                                                  {Retval, CSums}
                                              end,
-                {{next_state, wait, StateData#state{reduced=NewReduced, csums=NewCSums}},
-                 NewReduced}
+                {{next_state, wait, StateData#state{reduced=NewReduced, csums=NewCSums}}, NewReduced}
             catch C:R ->
                     Reason = {C, R, erlang:get_stacktrace()},
                     case NextFSM of

File apps/riak/src/riak_web.erl

 %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 %% KIND, either express or implied.  See the License for the
 %% specific language governing permissions and limitations
-%% under the License.    
+%% under the License.
 
 %% @doc Convenience functions for setting up the Jiak HTTP interface
 %%      of Riak.  This module loads parameters from the application
 dispatch_table() ->
     JiakProps = jiak_props(),
     RawProps = raw_props(),
+    MapredProps = mapred_props(),
+
     [{[proplists:get_value(jiak_name, JiakProps),bucket],
       jiak_resource,
       [{key_type, container}|JiakProps]},
      {[proplists:get_value(prefix, RawProps),bucket,key],
       raw_http_resource, RawProps},
      {[proplists:get_value(prefix, RawProps),bucket,key,'*'],
-      raw_link_walker_resource, RawProps}].
+      raw_link_walker_resource, RawProps},
+
+     {[proplists:get_value(prefix, MapredProps)],
+      mapred_resource, MapredProps}].
 
 jiak_props() ->
     [{jiak_name, riak:get_app_env(jiak_name, "jiak")},
 raw_props() ->
     [{prefix, riak:get_app_env(raw_name, "raw")},
      {riak, local}].
+
+mapred_props() ->
+    [{prefix, riak:get_app_env(mapred_name, "mapred")}].