Commits

Anonymous committed 6c146d5 Merge

Merging

  • Participants
  • Parent commits 69ec93d, e97d77b

Comments (0)

Files changed (4)

apps/erlang_js/src/js_json.erl

 is_structable([]) ->
     false;
 is_structable([H|_]) ->
-    is_tuple(H) andalso size(H) == 2.
+    is_tuple(H) andalso size(H) == 2 andalso element(1,H) /= struct.
 
 %% testing constructs borrowed from the Yaws JSON implementation.
 

apps/riak/src/mapred_resource.erl

     end.
 
 allowed_methods(RD, State) ->
-    {['POST'], RD, State}.
+    {['GET','HEAD','POST'], RD, State}.
 
 malformed_request(RD, State) ->
-    {IsMalformed, NewState} = case wrq:req_body(RD) of
-                                  undefined ->
-                                      {true, State};
-                                  Body ->
-                                      {Verified, State1} = verify_body(Body, State),
-                                      {not(Verified), State1}
-                              end,
-    {IsMalformed, RD, NewState}.
+    {Verified, Message, NewState} =
+        case {wrq:method(RD), wrq:req_body(RD)} of
+            {'POST', Body} when Body /= undefined ->
+                verify_body(Body, State);
+            _ ->
+                {false, usage(), State}
+        end,
+    {not Verified,
+     if Verified -> RD;
+        true ->
+             wrq:set_resp_header(
+               "Content-Type", "text/plain",
+               wrq:set_resp_body(Message, RD))
+     end,
+     NewState}.
 
 content_types_provided(RD, State) ->
     {[{"application/json", nop}], RD, State}.
 
-%% This should never get called
-nop(_RD, _State) ->
-    ok.
+nop(RD, State) ->
+    {usage(), RD, State}.
 
 process_post(RD, #state{inputs=Inputs, mrquery=Query}=State) ->
     Me = self(),
     {ok, Client} = riak:local_client(),
     case wrq:get_qs_value("chunked", RD) of
         "true" ->
-            {ok, {ReqId, FSM}} = Client:mapred_stream(Query, Me, ?DEFAULT_TIMEOUT),
-            gen_fsm:send_event(FSM,{input, Inputs }),
-            gen_fsm:send_event(FSM,input_done),
+            {ok, ReqId} = 
+                if is_list(Inputs) ->
+                        {ok, {RId, FSM}} = Client:mapred_stream(Query, Me,
+                                                                ?DEFAULT_TIMEOUT),
+                        gen_fsm:send_event(FSM,{input, Inputs }),
+                        gen_fsm:send_event(FSM,input_done),
+                        {ok, RId};
+                   is_binary(Inputs) ->
+                        Client:mapred_bucket_stream(Inputs, Query, Me,
+                                                    ?DEFAULT_TIMEOUT)
+                end,
             RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
             {true, wrq:set_resp_body({stream, stream_mapred_results(RD1, ReqId)}, RD1), State};
         Param when Param =:= "false";
                    Param =:= undefined ->
-            case Client:mapred(Inputs, Query) of
+            Results = if is_list(Inputs) ->
+                              Client:mapred(Inputs, Query);
+                         is_binary(Inputs) ->
+                              Client:mapred_bucket(Inputs, Query)
+                      end,
+            case Results of
                 {ok, Result} ->
                     RD1 = wrq:set_resp_header("Content-Type", "application/json", RD),
                     {true, wrq:set_resp_body(mochijson2:encode(Result), RD1), State};
     end.
 
 verify_body(Body, State) ->
-    case mochijson2:decode(Body) of
+    case catch mochijson2:decode(Body) of
         {struct, MapReduceDesc} ->
             Inputs = proplists:get_value(?INPUTS_TOKEN, MapReduceDesc),
             Query = proplists:get_value(?QUERY_TOKEN, MapReduceDesc),
                         {ok, ParsedInputs} ->
                             case riak_mapred_json:parse_query(Query) of
                                 {ok, ParsedQuery} ->
-                                    {true, State#state{inputs=ParsedInputs, mrquery=ParsedQuery}};
+                                    {true, [], State#state{inputs=ParsedInputs,
+                                                           mrquery=ParsedQuery}};
                                 error ->
-                                    {false, State}
+                                    {false,
+                                     "An error occurred parsing "
+                                     "the \"query\" field.\n",
+                                     State}
                             end;
                         error ->
-                            {false, State}
+                            {false,
+                             "An error occurred parsing the \"inputs\" field.\n",
+                             State}
                     end;
                 false ->
-                    {false, State}
-            end
+                    {false,
+                     "The post body was missing the "
+                     "\"inputs\" or \"query\" field.\n",
+                     State}
+            end;
+        {'EXIT', Message} ->
+            {false,
+             io_lib:format("The POST body was not valid JSON.~n"
+                           "The error from the parser was:~n~p~n",
+                           [Message]),
+             State};
+        _ ->
+            {false, "The POST body was not a JSON object.\n", State}
     end.
+
+usage() ->
+    "This resource accepts POSTs with bodies containing JSON of the form:\n"
+        "{\n"
+        " \"inputs\":[...list of inputs...],\n"
+        " \"query\":[...list of map/reduce phases...]\n"
+        "}\n".

apps/riak/src/riak_mapred_json.erl

 
 -export([parse_inputs/1, parse_query/1]).
 
+parse_inputs(Bucket) when is_binary(Bucket) ->
+    {ok, Bucket};
 parse_inputs(Targets) ->
     parse_inputs(Targets, []).
 
                    <<"reduce">> -> reduce;
                    <<"link">> -> link
                end,
-    Keep = proplists:get_value(<<"keep">>, StepDef),
+    Keep = proplists:get_value(<<"keep">>, StepDef, T==[]),
     Step = case not(Keep =:= true orelse Keep =:= false) of
                true -> error;
                false ->

doc/js-mapreduce.org

+#+TITLE: Using Javascript with Riak Map/Reduce
+
+Riak supports writing map/reduce query functions in Javascript, and
+specifying query execution over HTTP.  This document will teach you
+how to use these features.
+
+* Simple Example
+
+  Let's hit the ground running with a quick example to demonstrate
+  what HTTP/Javascript map/reduce looks like in Riak.  This example
+  will store several chunks of text in Riak, and then compute word
+  counts on the set of documents.
+
+** Load data
+
+   We'll use the "raw" HTTP interface to store the texts we want to
+   process:
+
+#+BEGIN_EXAMPLE
+$ curl -X PUT -H "content-type: text/plain" \
+ http://localhost:8098/raw/alice/p1 --data-binary @-
+Alice was beginning to get very tired of sitting by her sister on the
+bank, and of having nothing to do: once or twice she had peeped into the
+book her sister was reading, but it had no pictures or conversations in
+it, 'and what is the use of a book,' thought Alice 'without pictures or
+conversation?'
+^D
+$ curl -X PUT -H "content-type: text/plain" \
+ http://localhost:8098/raw/alice/p2 --data-binary @-
+So she was considering in her own mind (as well as she could, for the
+hot day made her feel very sleepy and stupid), whether the pleasure
+of making a daisy-chain would be worth the trouble of getting up and
+picking the daisies, when suddenly a White Rabbit with pink eyes ran
+close by her.
+^D
+$ curl -X PUT -H "content-type: text/plain" \
+ http://localhost:8098/raw/alice/p5 --data-binary @-
+The rabbit-hole went straight on like a tunnel for some way, and then
+dipped suddenly down, so suddenly that Alice had not a moment to think
+about stopping herself before she found herself falling down a very deep
+well.
+^D
+#+END_EXAMPLE
+
+** Run query
+
+   With data loaded, we can now run a query:
+
+#+BEGIN_EXAMPLE
+$ curl -X POST -H "content-type: application/json" http://localhost:8098/mapred --data @-
+{"inputs":[["alice","p1"],["alice","p2"],["alice","p5"]],"query":[{"map":{"language":"javascript","source":"function(v) { var m = v.values[0].data.toLowerCase().match('\\\\w*','g'); var r = []; for(var i in m) if (m[i] != '') { var o = {}; o[m[i]] = 1; r.push(o); } return r; }"}},{"reduce":{"language":"javascript","source":"function(v) { var r = {}; for (var i in v) { for(var w in v[i]) { if (w in r) r[w] += v[i][w]; else r[w] = v[i][w]; } } return [r]; }"}}]}
+^D
+#+END_EXAMPLE
+
+   And we end up with the word counts for the three documents.
+
+#+BEGIN_EXAMPLE
+[{"the":8,"rabbit":2,"hole":1,"went":1,"straight":1,"on":2,"like":1,"a":6,"tunnel":1,"for":2,"some":1,"way":1,"and":5,"then":1,"dipped":1,"suddenly":3,"down":2,"so":2,"that":1,"alice":3,"had":3,"not":1,"moment":1,"to":3,"think":1,"about":1,"stopping":1,"herself":2,"before":1,"she":4,"found":1,"falling":1,"very":3,"deep":1,"well":2,"was":3,"considering":1,"in":2,"her":5,"own":1,"mind":1,"as":2,"could":1,"hot":1,"day":1,"made":1,"feel":1,"sleepy":1,"stupid":1,"whether":1,"pleasure":1,"of":5,"making":1,"daisy":1,"chain":1,"would":1,"be":1,"worth":1,"trouble":1,"getting":1,"up":1,"picking":1,"daisies":1,"when":1,"white":1,"with":1,"pink":1,"eyes":1,"ran":1,"close":1,"by":2,"beginning":1,"get":1,"tired":1,"sitting":1,"sister":2,"bank":1,"having":1,"nothing":1,"do":1,"once":1,"or":3,"twice":1,"peeped":1,"into":1,"book":2,"reading":1,"but":1,"it":2,"no":1,"pictures":2,"conversations":1,"what":1,"is":1,"use":1,"thought":1,"without":1,"conversation":1}]
+#+END_EXAMPLE
+
+** Explanation
+
+   For more details about what each bit of syntax means, and other
+   syntax options, read the following sections.  As a quick
+   explanation of how the example map/reduce query worked, though:
+
+   1. The objects named =p1=, =p2=, and =p5= from the =alice= bucket
+      were given as inputs to the query.
+
+   2. The map function from the phase was run on each one.  The function:
+
+#+BEGIN_SRC javascript
+function(v) {
+   var m = v.values[0].data.toLowerCase().match('\\w*','g');
+   var r = [];
+   for(var i in m)
+      if (m[i] != '') {
+         var o = {};
+         o[m[i]] = 1;
+         r.push(o);
+      }
+   return r;
+}
+#+END_SRC
+
+      creates a list of JSON objects, one for each word (non-unique)
+      in the text.  Each object has the word as its only key, and the
+      integer 1 as the value for that key.
+
+   3. The reduce function from the phase was run on the outputs of the
+      map functions.  The function:
+
+#+BEGIN_SRC javascript
+function(v) {
+   var r = {};
+   for (var i in v) {
+      for(var w in v[i]) {
+         if (w in r)
+            r[w] += v[i][w];
+         else
+            r[w] = v[i][w];
+      }
+   }
+   return [r];
+ }
+#+END_SRC
+
+      looks at each JSON object in the input list.  It steps through
+      each key in each object, and produces a new object. That new
+      object has a key for each key in every other object, the value
+      of that key being the sum of the values of that key in the other
+      objects.  It returns this new object in a list, because it may
+      be run a second time on a list including that object and more
+      inputs from the map phase.
+
+   4. The final output is a list with one element: a JSON object with
+      a key for each word in all of the documents (unique), with the
+      value of that key being the number of times the word appeared in
+      the documents.
+
+* Query Syntax
+  
+  Map/Reduce queries are issued over HTTP via a POST to the /mapred
+  resource.  The body should be =application/json= of the form
+  ={"inputs":[...inputs...],"query":[...query...]}=.
+
+** Inputs
+
+   The list of input objects is given as a list of 2-element lists of
+   the form =[Bucket,Key]= or 3-element lists of the form
+   =[Bucket,Key,KeyData]=.
+
+   You may also pass just the name of a bucket
+   (={"inputs":"mybucket",...}=), which is equivalent to passing all
+   of the keys in that bucket as inputs (i.e. "a map/reduce across the
+   whole bucket").  You should be aware that this triggers the
+   somewhat expensive "list keys" operation, so you should use it
+   sparingly.
+
+** Query
+
+   The query is given as a list of phases, each phase being of the
+   form ={PhaseType:{...spec...}}=.  Valid =PhaseType= values are
+   "map", "reduce", and "link".
+
+   Every phase spec may include a =keep= field, which must have a
+   boolean value: =true= means that the results of this phase should
+   be included in the final result of the map/reduce, =false= means
+   the results of this phase should be used only by the next phase.
+   Omitting the =keep= field accepts its default value, which is
+   =false= for all phases except the final phase (Riak assumes that
+   you were most interested in the results of the last phase of your
+   map/reduce query).
+
+*** Map
+
+    Map phases must be told where to find the code for the function to
+    execute, and what language that function is in.
+
+    Function source can be specified directly in the query by using
+    the "source" spec field.  Function source can also be loaded from
+    a pre-stored riak object by providing "bucket" and "key" fields in
+    the spec.
+
+    For example:
+
+:{"map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true}}
+
+    Would run the Javascript function given in the spec, and include
+    the results in the final output of the m/r query.
+
+:{"map":{"language":"javascript","bucket":"myjs","key":"mymap","keep":false}}
+
+    Would run the Javascript function declared in the content of the
+    Riak object under =mymap= in the =myjs= bucket, and the results of
+    the function would not be included in the final output of the m/r
+    query.
+
+    Map phases may also be passed static arguments by using the "arg"
+    spec field.
+
+*** Reduce
+
+    Reduce phases look exactly like map phases, but are labeled "reduce".
+
+*** Link
+
+    Link phases accept =bucket= and =tag= fields that specify which
+    links match the link query.  The string "_" (underscore) in each
+    field means "match all", while any other string means "match
+    exactly this string".  If either field is left out, it is
+    considered to be set to "_" (match all).
+
+    For example:
+
+:{"link":{"bucket":"foo","keep":false}}
+
+    Would follow all links pointing to objects in the =foo= bucket,
+    regardless of their tag.
+
+* Javascript Functions
+** Function Parameters
+*** Map functions
+
+    Map functions are passed three parameters: the object that the map
+    is being applied to, the "keydata" for that object, and the static
+    argument for the phase.
+
+    The object will be a JSON object of the form:
+
+#+BEGIN_EXAMPLE
+{
+ "bucket":BucketAsString,
+ "key":KeyAsString,
+ "vclock":VclockAsString,
+ "values":[
+           {
+            "metadata":{
+                        "X-Riak-VTag":VtagAsString,
+                        "X-Riak-Last-Modified":LastModAsString,
+                        ...other metadata...
+                       },
+            "data":ObjectData
+           },
+           ...other metadata/data values (siblings)...
+          ]
+}
+#+END_EXAMPLE
+
+    =object.values[0].data= is probably what you'll be interested in
+    most of the time, but the rest of the details of the object are
+    provided for your use.
+
+    The "keydata" is the third element of the item from the input
+    bucket/key list (called =KeyData= in the [[Inputs]] section above), or
+    "undefined" if none was provided.
+
+    The static argument for the phase is the value of the =arg= field
+    from the map spec in the query list.
+
+    A map phase should produce a list of results.  You will see errors
+    if the output of your map function is not a list.  Return the
+    empty list if your map function chooses not to produce output.
+    
+*** Reduce functions
+
+    Reduce functions are passed two parameters: a list of inputs to
+    reduce, and the static argument for the phase.
+
+    The list of inputs to reduce may contain values from previous
+    executions of the reduce function.  It will also contain results
+    produced by the preceding map or reduce phase.
+
+    The static argument for the phase is the value of the =arg= field
+    from the reduce spec in the query list.
+
+    A reduce phase should produce a list of results.  You will see
+    errors if the output of your reduce function is not a list.
+    Return the empty list if your reduce function chooses not to
+    produce output.
+
+*** Link functions
+
+    If you're storing data through the "raw" interface, and using the
+    =Link= HTTP header, you don't need to worry about writing a
+    link-extraction function.  Just use the predefined
+    =raw_link_walker_resource:mapreduce_linkfun/3=.
+
+    But, if you need to extract links from your data in some other
+    manner, there are many ways to specify Javascript functions to do
+    that.  They all start with setting the =linkfun= bucket property.
+    Through the raw HTTP interface:
+
+:$ curl -X PUT -H "content-type: application/json" http://localhost:8098/raw/bucket \
+:> --data "{\"props\":{\"linkfun\":{...function...}}}"
+
+    The three ways to fill in the value of the =linkfun= key are:
+
+    + Quoted source code, as the value of the =jsanon= key:
+
+      :{"jsanon":"function(v,kd,bt) { return []; }"}
+
+    + The bucket and key of an object containing the function source:
+
+      :{"jsanon":{"bucket":Bucket,"key":Key}}
+
+    + The name of a predefined Javascript function:
+
+      :{"jsfun":FunctionName}
+
+    The function has basically the same contract as a map function.
+    The first argument is the object from which links should be
+    extracted.  The second argument is the =KeyData= for the object.
+
+    The third argument is a Javascript object representing the links
+    to match at return.  The two fields in the object, =bucket= and
+    =tag=, will have the values given in the link phase spec from the
+    query.
+
+    The link fun should return a list of the same form as the =inputs=
+    list: 2-item bucket/key lists, or 3-item bucket/key/keydata lists.
+
+* TODO How M/R works on Riak
+  I'm thinking of moving some content from basic-mapreduce.txt into
+  this document, and then creating a small "Erlang companion".  This
+  file (js-mapreduce) would become the Riak Map/Reduce Guide, the
+  primary reference, while the Erlang companion would be basically
+  just "how to do the same stuff in Erlang."