%% riak/src/riak_mapreduce_fsm.erl

%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at

%%   http://www.apache.org/licenses/LICENSE-2.0

%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.    

%% @doc riak_mapreduce_fsm is the driver of a mapreduce query.
%%
%%      Map phases are expected to have inputs of the form
%%      [{Bucket,Key}] or [{{Bucket,Key},KeyData}] (the first form is
%%      equivalent to [{{Bucket,Key},undefined}]) and will execute
%%      with locality to each key and must return a list that is valid
%%      input to the next phase
%%
%%      Reduce phases take any list as input. The reduce function must
%%      be commutative and associative, and must return a list that is
%%      valid input to the next phase. The next phase will block until
%%      the reduce phase is entirely done.
%%
%%      Valid terms for Query:
%%<ul>
%%<li>  {link, Bucket, Tag, Acc}</li>
%%<li>  {map, FunTerm, Arg, Acc}</li>
%%<li>  {reduce, FunTerm, Arg, Acc}</li>
%%</ul>
%%      where FunTerm is one of:
%% <ul>
%%<li>  {modfun, Mod, Fun} : Mod and Fun both atoms ->
%%         Mod:Fun(Object,KeyData,Arg)</li>
%%<li>  {qfun, Fun} : Fun is an actual fun ->
%%         Fun(Object,KeyData,Arg)</li>
%%</ul>
%% @type mapred_queryterm() =
%%         {map, mapred_funterm(), Arg :: term(),
%%          Accumulate :: boolean()} |
%%         {reduce, mapred_funterm(), Arg :: term(),
%%          Accumulate :: boolean()} |
%%         {link, Bucket :: riak_object:bucket(), Tag :: term(),
%%          Accumulate :: boolean()}
%% @type mapred_funterm() =
%%         {modfun, Module :: atom(), Function :: atom()}|
%%         {qfun, function()}
%% @type mapred_result() = [term()]

-module(riak_mapreduce_fsm).
-behaviour(gen_fsm).

-export([start/4]).
-export([init/1, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

-export([wait/2]). 

-record(state, {client,reqid,fsms,starttime,timeout,ring,input_done}).

start(ReqId,Query,Timeout,Client) ->
    gen_fsm:start(?MODULE, [ReqId,Query,Timeout,Client], []).
%% @private
init([ReqId,Query,Timeout,Client]) ->
    {ok, Ring} = riak_ring_manager:get_my_ring(),
    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
    case check_query_syntax(Query) of
        ok ->
            FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
            StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
                               starttime=riak_util:moment(),timeout=Timeout,
                               ring=Ring,input_done=false},
            {ok,wait,StateData,Timeout};
        {bad_qterm, QTerm} ->
            riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                {error, {bad_qterm, QTerm}}),
            Client ! {ReqId, {error, {bad_qterm, QTerm}}},
            {stop,normal}
    end.

check_query_syntax([]) -> ok;
check_query_syntax([QTerm={QTermType,QT2,_QT3,Acc}|Rest])
  when is_boolean(Acc) ->
    case lists:member(QTermType, [link,map,reduce]) of
        false -> {bad_qterm, QTerm};
        true ->
            case QTermType of
                link ->
                    case (is_binary(QT2) orelse QT2 == '_') of
                        false -> {bad_qterm, QTerm};
                        true -> check_query_syntax(Rest)
                    end;
                _ -> % (map or reduce)
                    case QT2 of
                        {modfun, MF_M, MF_F} ->
                            case is_atom(MF_M) andalso is_atom(MF_F) of
                                false -> {bad_qterm, QTerm};
                                true -> check_query_syntax(Rest)
                            end;
                        {qfun, QF_F} ->
                            case is_function(QF_F) of
                                false -> {bad_qterm, QTerm};
                                true -> check_query_syntax(Rest)
                            end;
                        _ -> {bad_qterm, QTerm}
                    end
            end
    end;
check_query_syntax([BadQTerm|_]) -> {bad_qterm,BadQTerm}.

make_phase_fsms(Query, Ring) -> 
    make_phase_fsms(lists:reverse(Query),final,[], Ring).
make_phase_fsms([], _NextFSM, FSMs, _Ring) -> FSMs;
make_phase_fsms([QTerm|Rest], NextFSM, FSMs, Ring) -> 
    PhaseMod = case QTerm of
        {reduce, _, _, _} -> riak_reduce_phase_fsm;
        {map, _, _, _} -> riak_map_phase_fsm;
        {link, _, _, _} -> riak_map_phase_fsm
    end,
    {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
    make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).

wait({input,Inputs},
     StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->
    riak_eventer:notify(riak_mapreduce_fsm, mr_got_input,
                        {ReqId, length(Inputs)}),
    gen_fsm:send_event(hd(FSMs), {input, Inputs}),
    {next_state, wait, StateData, Timeout};
wait(input_done, StateData=#state{reqid=ReqId,fsms=FSMs}) ->
    riak_eventer:notify(riak_mapreduce_fsm, mr_done_input, {ReqId}),
    gen_fsm:send_event(hd(FSMs), done),
    maybe_finish(StateData#state{input_done=true});
wait({done,FSM}, StateData=#state{fsms=FSMs0}) ->
    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
    FSMs = lists:delete(FSM,FSMs0),
    maybe_finish(StateData#state{fsms=FSMs});
wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                               fsms=FSMs0}) ->
    FSMs = lists:delete(ErrFSM,FSMs0),
    [gen_fsm:send_event(FSM, die) || FSM <- FSMs],
    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
    Client ! {ReqId, {error, ErrMsg}},
    {stop,normal,StateData};
wait({acc,Data}, StateData=#state{reqid=ReqId,client=Client,timeout=Timeout}) ->
    LData = case Data of
        {single, X} -> [X];
        {list, X} -> X
    end,
    Client ! {ReqId, {mr_results, LData}},
    {next_state, wait, StateData,Timeout};
wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
    Client ! {ReqId, {error, timeout}},
    {stop,normal,StateData}.

%% @private
maybe_finish(StateData=#state{input_done=Input_Done,fsms=FSMs,
                client=Client,reqid=ReqId,timeout=Timeout}) ->
    case Input_Done of
        false ->
            {next_state, wait, StateData, Timeout};
        true ->
            case FSMs of
                [] ->
                    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
                                        {ok, ReqId}),
                    Client ! {ReqId, done},
                    {stop,normal,StateData};
                _ ->
                    {next_state, wait, StateData, Timeout}
            end
    end.

%% @private
handle_event(_Event, _StateName, StateData) ->
    {stop,badmsg,StateData}.

%% @private
handle_sync_event(_Event, _From, _StateName, StateData) ->
    {stop,badmsg,StateData}.

%% @private
handle_info(_Info, _StateName, StateData) ->
    {stop,badmsg,StateData}.

%% @private
terminate(Reason, _StateName, _State=#state{reqid=ReqId}) ->
    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_end, {ReqId, Reason}),
    Reason.

%% @private
code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.