Anonymous avatar Anonymous committed 53e2b79 Merge

Merging & adding fixes to propagate JS errors thru the M/R FSM chain;Added accessor for bucket props to jQuery client

Comments (0)

Files changed (10)


 void on_error(JSContext *context, const char *message, JSErrorReport *report) {
   if (report->flags & JSREPORT_EXCEPTION) {
     spidermonkey_error *sm_error = (spidermonkey_error *) driver_alloc(sizeof(spidermonkey_error));
-    sm_error->msg = copy_string(message);
+    if (message != NULL) {
+      sm_error->msg = copy_string(message);
+    }
+    else {
+      sm_error->msg = copy_string("undefined error");
+    }
     sm_error->lineno = report->lineno;
-    sm_error->offending_source = copy_string(report->linebuf);
+    if (report->linebuf != NULL) {
+      sm_error->offending_source = copy_string(report->linebuf);
+    }
+    else {
+      sm_error->offending_source = copy_string("unknown");
+    }
     JS_SetContextPrivate(context, sm_error);
 	JSString *str = JS_ValueToString(vm->context, result);
 	retval = copy_jsstring(str);
+      else if(strcmp(JS_GetStringBytes(JS_ValueToString(vm->context, result)), "undefined") == 0) {
+	retval = copy_string("{\"error\": \"Expression returned undefined\", \"lineno\": 0, \"source\": \"unknown\"}");
+      }
       else {
-	retval = copy_string("{\"error\": \"non-JSON return value\"}");
+	retval = copy_string("{\"error\": \"non-JSON return value\", \"lineno\": 0, \"source\": \"unknown\"}");
     JS_DestroyScript(vm->context, script);


 call(Ctx, FunctionName, Args, Bindings) ->
     JsBindings = list_to_binary(build_bindings(Bindings, [])),
     ArgList = build_arg_list(Args, []),
-    Js = iolist_to_binary([<<"function() {">>, JsBindings, <<"return ">>, FunctionName, <<"(">>, ArgList, <<");">>, <<"}();">>]),
+    Js = iolist_to_binary([<<"function() {">>, JsBindings, <<" if (">>, FunctionName, <<" === undefined) { throw(\"">>,
+                           FunctionName, <<" not defined\"); } ">>,
+                           <<"return ">>, FunctionName, <<"(">>, ArgList, <<");">>, <<"}();">>]),
     js_driver:eval_js(Ctx, Js).
 %% Internal functions


             {ok, js_mochijson2:decode(Result)};
         {error, ErrorJson} when is_binary(ErrorJson) ->
             case js_mochijson2:decode(ErrorJson) of
-                {struct, [{<<"error">>, {struct, Error}}]} ->
-                    {error, Error};
-                {struct, [{<<"error">>, Error}]} ->
+                {struct, Error = [{<<"error">>, _}|_]} ->
                     {error, Error}
         {error, Error} ->


     wrq:set_resp_body(format_error(Error), RD).
 format_error({error, Message}=Error) when is_atom(Message);
-                                    is_binary(Message),
-                                    is_list(Message) ->
+                                          is_binary(Message) ->
     mochijson2:encode({struct, [Error]});
+format_error({error, Error}) when is_list(Error) ->
+    mochijson2:encode({struct, Error});
 format_error(_Error) ->
+    io:format("_Error: ~p~n", [_Error]),
     mochijson2:encode({struct, [{error, map_reduce_error}]}).
 stream_mapred_results(RD, ReqId, State) ->


     case Result of
         {ok, ReturnValue} ->
-            gen_fsm:send_event(FsmPid, {mapexec_reply, ReturnValue, Requestor});
-        ErrorResult ->
-            gen_fsm:send_event(FsmPid, {mapexec_error, Requestor, ErrorResult})
-    end,
-    {noreply, NewState};
+            gen_fsm:send_event(FsmPid, {mapexec_reply, ReturnValue, Requestor}),
+            {noreply, NewState};
+        {error, ErrorResult} ->
+            gen_fsm:send_event(FsmPid, {mapexec_error_noretry, Requestor, ErrorResult}),
+            {noreply, State}
+    end;
 handle_cast({dispatch, Requestor, _JobId, {FsmPid, {map, {jsfun, JS}, Arg, _Acc},
                                             KeyData}}, #state{ctx=Ctx}=State) ->
     case invoke_js(Ctx, JS, [JsonValue, KeyData, JsonArg]) of
         {ok, R} ->
             gen_fsm:send_event(FsmPid, {mapexec_reply, R, Requestor});
-        Error ->
-            gen_fsm:send_event(FsmPid, {mapexec_error, Requestor, Error})
+        {error, Error} ->
+            gen_fsm:send_event(FsmPid, {mapexec_error_noretry, Requestor, Error})
     {noreply, State};
 handle_cast(_Msg, State) ->


     riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),
     riak_phase_proto:mapexec_error(PhasePid, "all nodes failed"),
+wait({mapexec_error_noretry, VN, ErrMsg}, #state{phase_pid=PhasePid}=StateData) ->
+    riak_eventer:notify(riak_map_executor, mapexec_vnode_err_noretry, {VN, ErrMsg}),
+    riak_phase_proto:mapexec_error(PhasePid, ErrMsg),
+    {stop, normal, StateData};
 wait({mapexec_error, VN, ErrMsg},StateData=
      #state{vnodes=VNodes,qterm=QTerm,bkey=BKey,keydata=KeyData}) ->
     riak_eventer:notify(riak_map_executor, mapexec_vnode_err, {VN,ErrMsg}),


                                  {erlang, {modfun,M,F}} ->
                                  {javascript, QTerm} ->
-                                     js_reduce(QTerm, Reduced, Arg)
+                                     case js_reduce(QTerm, Reduced, Arg) of
+                                         {ok, Result} ->
+                                             Result;
+                                         {error, Error} ->
+                                             error_logger:error_msg("Javascript reduce error: ~p~n", [Error]),
+                                             Reduced
+                                     end
                 {{next_state, wait, StateData#state{reduced=NewReduced}}, NewReduced}
             catch C:R ->


 %% @private
 do_map(ClientPid, QTerm, BKey, KeyData, #state{mod=Mod, modstate=ModState, mapcache=Cache}=State, VNode) ->
     {Reply, NewState} = case do_map(QTerm, BKey, Mod, ModState, KeyData, Cache, VNode, ClientPid) of
-                            executing ->
+                            map_executing ->
                                 {{mapexec_reply, executing, self()}, State};
                             {ok, Retval} ->
                                 {{mapexec_reply, Retval, self()}, State};
                 {error, notfound}
     riak_js_manager:dispatch({ClientPid, QTerm, V, KeyData}),
-    executing.
+    map_executing.
 build_key({modfun, CMod, CFun}, Arg, KeyData) ->
     {CMod, CFun, Arg, KeyData};


 (function($) {
-  var riakSuccessHandler = function(data, status) { };
+  $.riakGetBucketProps = function(bucket, options) {
+    var settings = $.extend({contentType: "application/json",
+	  success: riakSuccessHandler}),
+    settings["url"] = "/raw/" + bucket;
+    settings["type"] = "GET";
+    $.ajax(settings);
+  }
   $.riakGetResource = function(bucket, key, options) {
     var settings = $.extend({ifModified: true,
 	  cache: true,
+  var riakSuccessHandler = function(data, status) { };
   function buildQuery(phases) {
     return { return buildPhase(phase); }, phases);
     else if (phase["reduce"] !== undefined) {
       return {reduce: {language: "javascript", source: phase["reduce"].toString(), keep: keepResults}};
+    else if (phase["link"] !== undefined) {
+      return {link: {language: "javascript", source: phase["reduce"].toString(), keep: keepResults}};
+    }
     else {
       throw("Illegal phase definition");


    holding the input data for those functions, and it runs reduce-step
    functions on the node coordinating the map/reduce query.
+*** How Riak's Map/Reduce Queries Are Specified
+    Map/Reduce queries in Riak have two components: a list of inputs
+    and a list of "steps", or "phases".
+    Each element of the input list is a bucket-key pair.  This
+    bucket-key pair may also be annotated with "key-data", which will
+    be passed as an argument to a map function, when evaluated on the
+    object stored under that bucket-key pair.
+    Each element of the phases list is a description of a map
+    function, a reduce function, or a link function.  The description
+    includes where to find the code for the phase function (for map
+    and reduce phases), static data passed to the function every time
+    it is executed during that phase, and a flag indicating whether or
+    not to include the results of that phase in the final output of
+    the query.
+    The phase list describes the chain of operations each input will
+    flow through.  That is, the initial inputs will be fed to the
+    first phase in the list, and the output of that phase will be fed
+    as input to the next phase in the list.  This stream will continue
+    through the final phase.
 *** How a Map Phase Works in Riak
+    The input list to a map phase must be a list of (possibly
+    annotated) bucket-key pairs.  For each pair, Riak will send the
+    request to evaluate the map function to the partition that is
+    responsible for storing the data for that bucket-key.  The vnode
+    hosting that partition will lookup the object stored under that
+    bucket-key, and evaluation the map function with the object as an
+    argument.  The other arguments to the function will be the
+    annotation, if any is included, with the bucket-key, and the
+    static data for the phase, as specified in the query.
-  I'm thinking of moving some content from basic-mapreduce.txt into
-  this document, and then creating a small "Erlang companion".  This
-  file (js-mapreduce) would become the Riak Map/Reduce Guide, the
-  primary reference, while the Erlang companion would be basically
-  just "how to do the same stuff in Erlang."
+*** How a Reduce Phase Works in Riak
+    Reduce phases accept any list of data as input, and produce any
+    list of data as output.  They also receive a phase-static value,
+    specified in the query definition.
+    The important thing to understand is that the function defining
+    the reduce phase may be evaluated multiple times, and the input of
+    later evaluations will include the input of earlier evaluations.
+    For example, a reduce phase may implement the "set-union"
+    function.  In that case, the first set of inputs might be
+    =[1,2,2,3]=, and the output would be =[1,2,3]=.  When the phase
+    receives more inputs, say =[3,4,5]=, the function will be called
+    with the concatentation of the two lists: =[1,2,3,3,4,5]=.
+    Other systems refer to the second application of the reduce
+    function as a "re-reduce".  There are at least a couple of
+    reduce-query implementation strategies that work with Riak's model.
+    One strategy is to implement the phase preceeding the reduce
+    phase, such that its output is "the same shape" as the output of
+    the reduce phase.  This is how the examples in this document are
+    written, and the way that we have found produces cleaner code.
+    An alternate strategy is to make the output of a reduce phase
+    recognizable, such that it can be extracted from the input list on
+    subsequent applications.  For example, if inputs from the
+    preceeding phase are numbers, outputs from the reduce phase could
+    be objects or strings.  This would allow the function to find the
+    previous result, and apply new inputs to it.
+*** How a Link Phase Works in Riak
+    Link phases find links matching patterns specified in the query
+    definition.  The patterns specify which buckets and tags links
+    must have.
+    "Following a link" means adding it to the output list of this
+    phase.  The output of this phase is often most useful as input to
+    a map phase, or another reduce phase.
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.