Commits

Anonymous committed 6d84040 Merge

Merge.

Comments (0)

Files changed (6)

client_lib/jiak.js

 //                 alert("note's author is: "+
 //                       authors[0].object.name);
 //               });
+//
+// Default R, W, DW, and RW values are all 2.  To use other
+// values, pass an options object of the form:
+//    {r: 1, // value you want for R
+//     w: 3, // value you want for W
+//     dw:1, // value you want for DW
+//     rw:1} // value you want for RW
+// or, pass the values as parameters to the store, fetch
+// and delete functions.
 
 // This file is provided to you under the Apache License,
 // Version 2.0 (the "License"); you may not use this file
     this.opts = Opts||{};
 }
 
-JiakClient.prototype.store = function(Object, Callback, NoReturnBody) {
+JiakClient.prototype.store = function(Object, Callback, NoReturnBody, W, DW, R) {
     var req = {
         contentType: "application/json",
         dataType: "json"
     req.url = this.path(Object.bucket);
     if (Object.key) req.url += Object.key;
     
-    if (!(this.opts.noReturnBody || NoReturnBody))
+    var q = false;
+    if (!(this.opts.noReturnBody || NoReturnBody)) {
         req.url += '?returnbody=true';
+        q = true;
+    }
+
+    if (W || this.opts.w) {
+        req.url += (q?'&':'?')+'w='+(W||this.opts.w);
+        q = true;
+    }
+
+    if (DW || this.opts.dw) {
+        req.url += (q?'&':'?')+'dw='+(DW||this.opts.dw);
+        q = true;
+    }
+
+    if (R || this.opts.r)
+        req.url += (q?'&':'?')+'r='+(R||this.opts.r);
 
     if (typeof Callback == 'function')
         req.success = Callback;
     return $.ajax(req);
 }
 
-JiakClient.prototype.fetch = function(Bucket, Key, Callback) {
+JiakClient.prototype.fetch = function(Bucket, Key, Callback, R) {
     return $.ajax({
-        url:      this.path(Bucket, Key),
+        url:      this.path(Bucket, Key)+
+                    ((R||this.opts.r)?('?r='+(R||this.opts.r)):''),
         dataType: "json",
         success:  Callback
     });
 }
 
-JiakClient.prototype.remove = function(Bucket, Key, Callback) {
+JiakClient.prototype.remove = function(Bucket, Key, Callback, RW) {
     return $.ajax({
         type:    'DELETE',
-        url:     this.path(Bucket, Key),
+        url:     this.path(Bucket, Key)+
+                   ((RW||this.opts.rw)?('?rw='+(RW||this.opts.rw)):''),
         success: Callback
     });
 }

client_lib/jiak.rb

 require 'json'
 
 class JiakClient
-  def initialize(ip, port, jiakPrefix='/jiak/')
+  def initialize(ip, port, jiakPrefix='/jiak/', options={})
     @ip = ip
     @port = port
     @prefix = jiakPrefix
+    @opts = options
   end
 
   # Set the schema for 'bucket'.  The schema parameter
   end
   
   # Get the object stored in 'bucket' at 'key'
-  def fetch(bucket, key)
-    do_req(Net::HTTP::Get.new(path(bucket, key)), '200')
+  def fetch(bucket, key, r=nil)
+    do_req(Net::HTTP::Get.new(path(bucket, key,
+                                   {'r'=>(r||@opts['r'])})),
+           '200')
   end
 
   # Store 'object' in Riak.  If the object has not defined
   # its 'key' field, a key will be chosen for it by the server.
-  def store(object)
+  def store(object, w=nil, dw=nil, r=nil)
+    q = {
+      'returnbody'=>'true',
+      'w'=>(w||@opts['w']),
+      'dw'=>(dw||@opts['dw']),
+      'r'=>(r||@opts['r'])
+    }
     if (object['key'])
-      req = Net::HTTP::Put.new(path(object['bucket'], object['key'])+
-                               '?returnbody=true')
+      req = Net::HTTP::Put.new(path(object['bucket'], object['key'], q))
       code = '200'
     else
-      req = Net::HTTP::Post.new(path(object['bucket'])+'?returnbody=true')
+      req = Net::HTTP::Post.new(path(object['bucket'], nil, q))
       code = '201'
     end
 
   end
 
   # Delete the data stored in 'bucket' at 'key'
-  def delete(bucket, key)
-    do_req(Net::HTTP::Delete.new(path(bucket, key)), '204')
+  def delete(bucket, key, rw=nil)
+    do_req(Net::HTTP::Delete.new(path(bucket, key,
+                                      {'rw'=>(rw||@opts['rw'])})),
+           '204')
   end
 
   # Follow links from the object stored in 'bucket' at 'key'
     acc
   end
 
-  def path(bucket, key='')
+  def path(bucket, key=nil, reqOpts={})
     p = @prefix + URI.encode(bucket) + '/'
-    if (key != '')
+    if (key)
       p += URI.encode(key) + '/'
     end
+
+    q = [];
+    reqOpts.each do |n,v|
+      if (v): q.push "#{n}=#{v}" end
+    end
+    if (q.length > 0): p += '?'+(q.join('&')) end
+
     p
   end
 

demo/stickynotes/priv/www/js/jiak.js

 //                 alert("note's author is: "+
 //                       authors[0].object.name);
 //               });
+//
+// Default R, W, DW, and RW values are all 2.  To use other
+// values, pass an options object of the form:
+//    {r: 1, // value you want for R
+//     w: 3, // value you want for W
+//     dw:1, // value you want for DW
+//     rw:1} // value you want for RW
+// or, pass the values as parameters to the store, fetch
+// and delete functions.
 
 // This file is provided to you under the Apache License,
 // Version 2.0 (the "License"); you may not use this file
     this.opts = Opts||{};
 }
 
-JiakClient.prototype.store = function(Object, Callback, NoReturnBody) {
+JiakClient.prototype.store = function(Object, Callback, NoReturnBody, W, DW, R) {
     var req = {
         contentType: "application/json",
         dataType: "json"
     req.url = this.path(Object.bucket);
     if (Object.key) req.url += Object.key;
     
-    if (!(this.opts.noReturnBody || NoReturnBody))
+    var q = false;
+    if (!(this.opts.noReturnBody || NoReturnBody)) {
         req.url += '?returnbody=true';
+        q = true;
+    }
+
+    if (W || this.opts.w) {
+        req.url += (q?'&':'?')+'w='+(W||this.opts.w);
+        q = true;
+    }
+
+    if (DW || this.opts.dw) {
+        req.url += (q?'&':'?')+'dw='+(DW||this.opts.dw);
+        q = true;
+    }
+
+    if (R || this.opts.r)
+        req.url += (q?'&':'?')+'r='+(R||this.opts.r);
 
     if (typeof Callback == 'function')
         req.success = Callback;
     return $.ajax(req);
 }
 
-JiakClient.prototype.fetch = function(Bucket, Key, Callback) {
+JiakClient.prototype.fetch = function(Bucket, Key, Callback, R) {
     return $.ajax({
-        url:      this.path(Bucket, Key),
+        url:      this.path(Bucket, Key)+
+                    ((R||this.opts.r)?('?r='+(R||this.opts.r)):''),
         dataType: "json",
         success:  Callback
     });
 }
 
-JiakClient.prototype.remove = function(Bucket, Key, Callback) {
+JiakClient.prototype.remove = function(Bucket, Key, Callback, RW) {
     return $.ajax({
         type:    'DELETE',
-        url:     this.path(Bucket, Key),
+        url:     this.path(Bucket, Key)+
+                   ((RW||this.opts.rw)?('?rw='+(RW||this.opts.rw)):''),
         success: Callback
     });
 }

src/riak_client.erl

 -author('Justin Sheehy <justin@basho.com>').
 
 -export([mapred/2,mapred/3]).
+-export([mapred_stream/2,mapred_stream/3]).
 -export([get/3,get/4]).
 -export([put/2,put/3,put/4]).
 -export([delete/3,delete/4]).
 
 %% @spec mapred(Inputs :: list(),
 %%              Query :: [riak_mapreduce_fsm:mapred_queryterm()],
-%%              TimeoutMillisecs :: integer()) ->
+%%              TimeoutMillisecs :: integer()  | 'infinity') ->
 %%       {ok, riak_mapreduce_fsm:mapred_result()} |
 %%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
 %%       {error, timeout} |
 %% @doc Perform a map/reduce job across the cluster.
 %%      See the map/reduce documentation for explanation of behavior.
 mapred(Inputs,Query,Timeout)
-  when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
+  when is_list(Inputs), is_list(Query),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     Me = self(),
+    {ok, {ReqId, MR_FSM}} = mapred_stream(Query,Me,Timeout),
+    gen_fsm:send_event(MR_FSM,{input,Inputs}),
+    gen_fsm:send_event(MR_FSM,input_done),
+    collect_mr_results(ReqId, Timeout, []).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid()) ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid) ->
+    mapred_stream(Query,ClientPid,?DEFAULT_TIMEOUT).
+
+%% @spec mapred_stream(Query :: [riak_mapreduce_fsm:mapred_queryterm()],
+%%                     ClientPid :: pid(),
+%%                     TimeoutMillisecs :: integer() | 'infinity') ->
+%%       {ok, {ReqId :: term(), MR_FSM_PID :: pid()}} |
+%%       {error, {bad_qterm, riak_mapreduce_fsm:mapred_queryterm()}} |
+%%       {error, Err :: term()}
+%% @doc Perform a streaming map/reduce job across the cluster.
+%%      See the map/reduce documentation for explanation of behavior.
+mapred_stream(Query,ClientPid,Timeout)
+  when is_list(Query), is_pid(ClientPid),
+       (is_integer(Timeout) orelse Timeout =:= infinity) ->
     ReqId = mk_reqid(),
-    spawn(Node, riak_mapreduce_fsm, start, [ReqId,Inputs,Query,Timeout,Me]),
-    wait_for_reqid(ReqId, Timeout).
+    {ok, MR_FSM} = rpc:call(Node, riak_mapreduce_fsm, start,
+                            [ReqId,Query,Timeout,ClientPid]),
+    {ok, {ReqId, MR_FSM}}.
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
     after Timeout ->
             {error, timeout}
     end.
+
+%% @private
+collect_mr_results(ReqId, Timeout, Acc) ->
+    receive
+        {ReqId, done} -> {ok, Acc};
+        {ReqId,{mr_results,Res}} -> collect_mr_results(ReqId,Timeout,Acc++Res)
+    after Timeout ->
+            {error, timeout}
+    end.
+
+
+            
+            

src/riak_mapreduce_fsm.erl

 -module(riak_mapreduce_fsm).
 -behaviour(gen_fsm).
 
--export([start/5]).
+-export([start/4]).
 -export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).
 
 -export([wait/2]). 
 
--record(state, {client, reqid, acc, fsms, starttime, endtime, ring}).
+-record(state, {client,reqid,fsms,starttime,timeout,ring,input_done}).
 
-start(ReqId,Inputs,Query,Timeout,Client) ->
-    gen_fsm:start(?MODULE, [ReqId,Inputs,Query,Timeout,Client], []).
+start(ReqId,Query,Timeout,Client) ->
+    gen_fsm:start(?MODULE, [ReqId,Query,Timeout,Client], []).
 %% @private
-init([ReqId,Inputs,Query,Timeout,Client]) ->
+init([ReqId,Query,Timeout,Client]) ->
     {ok, Ring} = riak_ring_manager:get_my_ring(),
-    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start,
-              {ReqId, length(Inputs), length(Query)}),
+    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_start, {ReqId, Query}),
     case check_query_syntax(Query) of
         ok ->
             FSMs = make_phase_fsms(Query, Ring), % Pid for each phase, in-order
-            gen_fsm:send_event(hd(FSMs), {input, Inputs}),
-            gen_fsm:send_event(hd(FSMs), done),
-            StateData = #state{client=Client,fsms=FSMs,acc=[],reqid=ReqId,
-                               starttime=riak_util:moment(),
-                               endtime=Timeout+riak_util:moment(),
-                               ring=Ring},
+            StateData = #state{client=Client,fsms=FSMs,reqid=ReqId,
+                               starttime=riak_util:moment(),timeout=Timeout,
+                               ring=Ring,input_done=false},
             {ok,wait,StateData,Timeout};
         {bad_qterm, QTerm} ->
             riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
     {ok, Pid} = PhaseMod:start_link(Ring, QTerm, NextFSM, self()),
     make_phase_fsms(Rest,Pid,[Pid|FSMs], Ring).
 
-wait({done,FSM}, StateData=#state{client=Client,acc=Acc,reqid=ReqId,
-                                  endtime=End,fsms=FSMs0}) ->
+wait({input,Inputs},
+     StateData=#state{reqid=ReqId,timeout=Timeout,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_got_input,
+                        {ReqId, length(Inputs)}),
+    gen_fsm:send_event(hd(FSMs), {input, Inputs}),
+    {next_state, wait, StateData, Timeout};
+wait(input_done, StateData=#state{reqid=ReqId,fsms=FSMs}) ->
+    riak_eventer:notify(riak_mapreduce_fsm, mr_done_input, {ReqId}),
+    gen_fsm:send_event(hd(FSMs), done),
+    maybe_finish(StateData#state{input_done=true});
+wait({done,FSM}, StateData=#state{fsms=FSMs0}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done_msg, {FSM,FSMs0}),
     FSMs = lists:delete(FSM,FSMs0),
-    case FSMs of
-        [] -> 
-            riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
-                                {ok, ReqId, length(Acc)}),
-            Client ! {ReqId, {ok, Acc}},
-            {stop,normal,StateData};
-        _ ->
-            {next_state, wait, StateData#state{fsms=FSMs},
-             End-riak_util:moment()}
-    end;
+    maybe_finish(StateData#state{fsms=FSMs});
 wait({error, ErrFSM, ErrMsg}, StateData=#state{client=Client,reqid=ReqId,
                                                fsms=FSMs0}) ->
     FSMs = lists:delete(ErrFSM,FSMs0),
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {error, ReqId}),
     Client ! {ReqId, {error, ErrMsg}},
     {stop,normal,StateData};
-wait({acc,Data}, StateData=#state{acc=Acc,endtime=End}) ->
-    AccData = case Data of
-        {single, X} -> [X|Acc];
-        {list, X} -> X ++ Acc
+wait({acc,Data}, StateData=#state{reqid=ReqId,client=Client,timeout=Timeout}) ->
+    LData = case Data of
+        {single, X} -> [X];
+        {list, X} -> X
     end,
-    {next_state, wait, StateData#state{acc=AccData},End-riak_util:moment()};
+    Client ! {ReqId, {mr_results, LData}},
+    {next_state, wait, StateData,Timeout};
 wait(timeout, StateData=#state{reqid=ReqId,client=Client}) ->
     riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done, {timeout, ReqId}),
     Client ! {ReqId, {error, timeout}},
     {stop,normal,StateData}.
 
 %% @private
+maybe_finish(StateData=#state{input_done=Input_Done,fsms=FSMs,
+                client=Client,reqid=ReqId,timeout=Timeout}) ->
+    case Input_Done of
+        false ->
+            {next_state, wait, StateData, Timeout};
+        true ->
+            case FSMs of
+                [] ->
+                    riak_eventer:notify(riak_mapreduce_fsm, mr_fsm_done,
+                                        {ok, ReqId}),
+                    Client ! {ReqId, done},
+                    {stop,normal,StateData};
+                _ ->
+                    {next_state, wait, StateData, Timeout}
+            end
+    end.
+
+%% @private
 handle_event(_Event, _StateName, StateData) ->
     {stop,badmsg,StateData}.
 

src/riak_util.erl

 mkclientid(RemoteNode) ->
     {{Y,Mo,D},{H,Mi,S}} = erlang:universaltime(),
     {_,_,NowPart} = now(),
-    list_to_binary(lists:flatten(io_lib:format(
-           "~4.4.0w~2.2.0w~2.2.0w~2.2.0w~2.2.0w~2.2.0w-~s-~s-~p",
-                    [Y,Mo,D,H,Mi,S,node(),RemoteNode,NowPart]))).
+    Id = erlang:phash2([Y,Mo,D,H,Mi,S,node(),RemoteNode,NowPart]),
+    <<Id:32>>.
 
 %% @spec chash_key(BKey :: riak_object:bkey()) -> chash:index().
 %% @doc Create a binary used for determining replica placement.