Anonymous avatar Anonymous committed e1b1077 Merge

Merging

Comments (0)

Files changed (7)

apps/riak/src/jiak_resource.erl

 %%            when GETting a bucket, set schema=false if you do not
 %%            want the schema included in the response
 %%</dd><dt> keys
-%%</dt><dd>   allowed values: true (default), false
+%%</dt><dd>   allowed values: true (default), false, stream
 %%            when GETting a bucket, set keys=false if you do not want
-%%            the keylist included in the response
+%%            the keylist included in the response. Set keys=stream if
+%%            you want the list of keys streamed in chunks.  The first
+%%            chunk will be the normal bucket schema (if requested), followed by JSON-encoded
+%%            chunks of this format: {"keys": [Key1, Key2, ...]}
 %%</dd><dt> returnbody
 %%</dt><dd>   allowed values: true, false (default)
 %%            when PUTting or POSTing an object, set returnbody=true
                             end;
                         undefined ->
                             {true,
-                             wrq:append_to_respons_body(
+                             wrq:append_to_response_body(
                                "JSON object must contain either"
                                " a 'schema' field or a 'bucket_mod' field",
                                ReqData),
                  {"schema", "false"} -> [];
                  _ -> [{schema, {struct, full_schema(Mod)}}]
              end,
-    {Keys, Context1} = case proplists:lookup("keys", Qopts) of
-                           {"keys", "false"} -> {[], Context};
-                           _ -> 
-                               {ok, {K, NewCtx}} = retrieve_keylist(Context),
-                               {[{keys, K}], NewCtx}
-                       end,
-    KeyList = case Keys of
-        [{keys,Ks}] -> Ks;
-        _ -> []
-    end,
-    NewReqData = lists:foldl(fun(K,RD) ->
-                                     add_link_head(Bucket,K,"contained",RD)
-                             end,
-                             ReqData, KeyList),
-    JSONSpec = {struct, Schema ++ Keys},
-    {mochijson2:encode(JSONSpec), NewReqData, Context1};
+    {Keys, Context1} = maybe_fetch_keylist(ReqData, Context),
+    case Keys of
+        stream ->
+            JSONSpec = {struct, Schema},
+            {{stream, {mochijson2:encode(JSONSpec), stream_keys(Context1)}},
+             ReqData, Context1};
+        false ->
+            JSONSpec = {struct, Schema},
+            {mochijson2:encode(JSONSpec), ReqData, Context1};
+        _ ->
+            NewReqData = lists:foldl(
+                           fun(K,RD) ->
+                                   add_link_head(Bucket,K,"contained",RD)
+                           end,
+                           ReqData, Keys),
+            JSONSpec = {struct, [{keys,Keys}|Schema]},
+            {mochijson2:encode(JSONSpec), NewReqData, Context1}
+    end;
 produce_body(ReqData, Context=#ctx{module=Module,bucket=Bucket}) ->
     {ok, {JiakObject0, Context1}} = retrieve_object(ReqData, Context),
     JiakObject = apply_read_mask(Module, JiakObject0),
                          NewReqData),
      Context1}.    
 
+%% @spec maybe_fetch_keylist(wrq(), context()) ->
+%%          {[binary()]|false|stream, context()}
+%% @doc Get the list of keys in this bucket.  This function
+%%      memoizes the keylist in the context so it can be
+%%      called multiple times without duplicating work.
+maybe_fetch_keylist(ReqData, Context=#ctx{bucket=Bucket,
+                                          jiak_client=JiakClient,
+                                          bucketkeys=undefined}) ->
+    case wrq:get_qs_value("keys", ReqData) of
+        "false" ->
+            {false, Context#ctx{bucketkeys=false}};
+        "stream" ->
+            {stream, Context#ctx{bucketkeys=stream}};
+        _ ->
+            {ok, Keys} = JiakClient:list_keys(Bucket),
+            {Keys, Context#ctx{bucketkeys=Keys}}
+    end;
+maybe_fetch_keylist(_ReqData, Context=#ctx{bucketkeys=Keys}) ->
+    {Keys, Context}.
+
 add_container_link(Bucket,ReqData) ->
     Val = io_lib:format("</~s/~s>; rel=\"up\"",
                     [riak:get_app_env(jiak_name, "jiak"),
 %%          {string(), webmachine:wrq(), context()}
 %% @doc Generate the ETag for a bucket.
 make_bucket_etag(ReqData, Context) ->
-    {ok, {Keys, Context1}} = retrieve_keylist(Context),
-    ETag = mochihex:to_hex(crypto:sha(term_to_binary(Keys))),
+    {Keys, Context1} = maybe_fetch_keylist(ReqData, Context),
+    ETag = mochihex:to_hex(
+             crypto:sha(
+               term_to_binary(
+                 {Keys, catch full_schema(Context#ctx.module)}))),
     {ETag, ReqData, Context1#ctx{etag=ETag}}.
 
-%% @spec retrieve_keylist(context()) -> {ok, {[binary()], context()}}
-%% @doc Get the list of keys in this bucket.  This function
-%%      memoizes the keylist in the context so it can be
-%%      called multiple times without duplicating work.
-retrieve_keylist(Context=#ctx{bucket=Bucket,jiak_client=JiakClient,
-                              bucketkeys=undefined}) ->
-    {ok, Keys} = JiakClient:list_keys(Bucket),
-    {ok, {Keys, Context#ctx{bucketkeys=Keys}}};
-retrieve_keylist(Context=#ctx{bucketkeys=Keys}) ->
-    {ok, {Keys, Context}}.
-
 %% @spec make_object_etag(webmachine:wrq(), context()) ->
 %%          {string(), webmachine:wrq(), context()}
 %% @doc Generate the ETag for an object.
         String    -> list_to_integer(String)
     end.
 
+%% @private
+stream_keys(Context=#ctx{bucket=Bucket,jiak_client=Client}) ->
+    RiakClient = Client:riak_client(),
+    {ok, ReqId} = RiakClient:stream_list_keys(Bucket),
+    fun() -> stream_keys(ReqId, Context) end.
+
+%% @private
+stream_keys(ReqId, Context) ->
+    receive
+        {ReqId, {keys, Keys}} ->
+            {mochijson2:encode({struct, [{<<"keys">>, Keys}]}), fun() -> stream_keys(ReqId, Context) end};                                                                     
+        {ReqId, done} -> {mochijson2:encode({struct, [{<<"keys">>, []}]}), done}
+    end.
+
 %%
 %% Tests
 %%

apps/riak/src/raw_http.hrl

 -define(Q_PROPS, "props").
 -define(Q_KEYS,  "keys").
 -define(Q_FALSE, "false").
+-define(Q_STREAM, "stream").
 -define(Q_VTAG,  "vtag").

apps/riak/src/raw_http_resource.erl

 %%      Includes the bucket props unless the "props=false" query param
 %%      is specified.
 %%      Includes the keys of the documents in the bucket unless the
-%%      "keys=false" query param is specified.
+%%      "keys=false" query param is specified. If "keys=stream" query param
+%%      is specified, keys will be streamed back to the client in JSON chunks
+%%      like so: {"keys":[Key1, Key2,...]}.
 %%      A Link header will also be added to the response by this function
 %%      if the keys are included in the JSON object.  The Link header
 %%      will include links to all keys in the bucket, with the property
     {KeyPart, KeyRD} =
         case wrq:get_qs_value(?Q_KEYS, RD) of
             ?Q_FALSE -> {[], RD};
+            ?Q_STREAM -> {stream, RD};
             _ ->
                 {ok, KeyList} = C:list_keys(B),
                 {[{?Q_KEYS, KeyList}],
                    end,
                    RD, KeyList)}
         end,
-    {mochijson2:encode({struct, SchemaPart++KeyPart}), KeyRD, Ctx}.
+    case KeyPart of
+        stream -> {{stream, {mochijson2:encode({struct, SchemaPart}),
+                             fun() ->
+                                     {ok, ReqId} = C:stream_list_keys(B),
+                                     stream_keys(ReqId)
+                             end}},
+                   KeyRD,
+                   Ctx};
+       _ ->
+            {mochijson2:encode({struct, SchemaPart++KeyPart}), KeyRD, Ctx}
+    end.
+
+stream_keys(ReqId) ->
+    receive
+        {ReqId, {keys, Keys}} ->
+            {mochijson2:encode({struct, [{<<"keys">>, Keys}]}), fun() -> stream_keys(ReqId) end};                                                                     
+        {ReqId, done} -> {mochijson2:encode({struct, [{<<"keys">>, []}]}), done}
+    end.
 
 %% @spec accept_bucket_body(reqdata(), context()) -> {true, reqdata(), context()}
 %% @doc Modify the bucket properties according to the body of the

apps/riak/src/riak_connect.erl

         P <- AllIndices].    
 
 attempt_simple_transfer(Ring, Owners, ExitingNode) ->
+    TargetN = riak:get_app_env(target_n_val, 3),
     attempt_simple_transfer(Ring, Owners,
-                            riak:get_app_env(target_n_val, 3),
-                            ExitingNode, 0, []).
+                            TargetN,
+                            ExitingNode, 0,
+                            [{O,-TargetN} || O <- riak_ring:all_members(Ring),
+                                             O /= ExitingNode]).
 attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
     %% handoff
     case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
                           end,
             case lists:filter(fun(N) -> 
                                  Next = StepsToNext(N),
-                                 (Next >= TargetN) orelse (Next == length(Rest))
+                                 (Next+1 >= TargetN)
+                                          orelse (Next == length(Rest))
                               end,
                               Candidates) of
                 [] ->
 
 
 * Installation
-*** Embedded Erlang Node
+** Embedded Erlang Node
 
-    If you downloaded a pre-built, binary release of Riak, or if
-    you have made it through building the release from source, you
-    should have an Erlang embedded node ready to run Riak in-place.
-    No further installation is needed.
+   If you downloaded a pre-built, binary release of Riak, or if
+   you have made it through building the release from source, you
+   should have an Erlang embedded node ready to run Riak in-place.
+   No further installation is needed.
 
-    To run Riak on other machines, simply copy the entire embedded
-    node directory to those machines.  See the [[Configuration]] section
-    for details about altering configurations for each machine.
+   To run Riak on other machines, simply copy the entire embedded
+   node directory to those machines.  See the [[Configuration]] section
+   for details about altering configurations for each machine.
 
+** Installing to OTP Library Path
 
-*** Installing to OTP Libary Path
+   You also have the option of installing Riak into the Erlang/OTP
+   library path.  This will make Riak modules available to all other
+   Erlang applications on your system without needing to muck about
+   with code paths.
 
-    You also have the option of installing Riak into the Erlang/OTP
-    library path.  This will make Riak modules available to all other
-    Erlang applications on your system without needing to muck about
-    with code paths.
-
-    To install, simply execute =rebar install= in the top-level Riak
-    directory.
+   To install, simply execute =rebar install= in the top-level Riak
+   directory.
 
 #+BEGIN_EXAMPLE
     $ sudo ./rebar install

doc/js-mapreduce.org

     The link fun should return a list of the same form as the =inputs=
     list: 2-item bucket/key lists, or 3-item bucket/key/keydata lists.
 
-* TODO How M/R works on Riak
+* How Map/Reduce Queries Work
+
+** Map/Reduce Intro
+
+   The main goal of Map/Reduce is to spread the processing of a query
+   across many systems to take advantage of parallel processing power.
+   This is generally done by dividing the query into several steps,
+   dividing the dataset into several chunks, and then running those
+   step/chunk pairs in separate physical hosts.
+
+   One step type is called "map".  Map functions take one piece of
+   data as input, and produce zero or more results as output.  If
+   you're familiar with "mapping over a list" in functional
+   programming style, you're already familiar with "map" steps in a
+   map/reduce query.
+
+   Another step type is called "reduce".  The purpose of a "reduce"
+   step is to combine the output of many "map" step evaluations, into
+   one result.
+
+   The common example of a map/reduce query involves a "map" step that
+   takes a body of text as input, and produces a word count for that
+   body of text.  A reduce step then takes the word counts produced
+   from many bodies of text and either sums them to provide a word
+   count for the corpus, or filters them to produce a list of
+   documents containing only certain counts.
+
+** Riak-specific Map/Reduce
+
+*** How Riak Spreads Processing
+
+   Riak's map/reduce has an additional goal: increasing data-locality.
+   When processing a large dataset, it's often much more efficient to
+   take the computation to the data than it is to bring the data to
+   the computation.
+
+   It is Riak's solution to the data-locality problem that determines
+   how Riak spreads the processing across the cluster.  In the same
+   way that any Riak node can coordinate a read or write by sending
+   requests directly to the other nodes responsible for maintaining
+   that data, any Riak node can also coordinate a map/reduce query by
+   sending a map-step evaluation request directly to the node
+   responsible for maintaining the input data. Map-step results are
+   sent back to the coordinating node, where reduce-step processing
+   can produce a unified result.
+
+   Put more simply: Riak runs map-step functions right on the node
+   holding the input data for those functions, and it runs reduce-step
+   functions on the node coordinating the map/reduce query.
+
+*** How a Map Phase Works in Riak
+
+    
+
   I'm thinking of moving some content from basic-mapreduce.txt into
   this document, and then creating a small "Erlang companion".  This
   file (js-mapreduce) would become the Riak Map/Reduce Guide, the

rel/overlay/bin/riak-admin

     exit 1
 fi
 
+# Determine how the node name should be specified (-sname vs -name) for connections from remote nodes
+if [[ "$NAME_ARG" =~ ^-sname ]]; then
+    NAME_PARAM="-sname"
+    NAME_HOST=""
+else
+    NAME_PARAM="-name"
+    if [[ "$NAME_ARG" =~ (@.*) ]]; then
+        NAME_HOST=${BASH_REMATCH[1]}
+    else
+        NAME_HOST=""
+    fi
+fi
+
 # Extract the target cookie
 COOKIE_ARG=`grep -e '-setcookie' $RUNNER_ETC_DIR/vm.args`
 if [ -z "$COOKIE_ARG" ]; then
         COOKIE=$2
         FILENAME=$3
 
-        $ERTS_PATH/erl -noshell -name riak_backup -setcookie $COOKIE \
+        $ERTS_PATH/erl -noshell $NAME_PARAM riak_backup$NAME_HOST -setcookie $COOKIE \
                        -eval "riak_backup:$ACTION('$NODE', \"$FILENAME\")" -s init stop
         ;;
 
         COOKIE=$2
         FILENAME=$3
 
-        $ERTS_PATH/erl -noshell -name riak_logger -setcookie $COOKIE \
+        $ERTS_PATH/erl -noshell $NAME_PARAM riak_logger$NAME_HOST -setcookie $COOKIE \
                        -eval "riak_event_logger:start('$NODE', \"$FILENAME\")."            
         ;;
 
         # Parse out the node name to pass to the client
         NODE_NAME=${NAME_ARG#* }
 
-        $ERTS_PATH/erl -noshell -name riak_test $COOKIE_ARG \
+        $ERTS_PATH/erl -noshell $NAME_PARAM riak_test$NAME_HOST $COOKIE_ARG \
                        -eval "riak:client_test(\"$NODE_NAME\")" -s init stop
 
         ;;
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.