Anonymous avatar Anonymous committed ec5bca5 Merge

merge

Comments (0)

Files changed (9)

client_lib/jiak.js

 //                 alert('note's author is: '+
 //                       authors[0].object.name);
 //               });
+
+// This file is provided to you under the Apache License,
+// Version 2.0 (the "License"); you may not use this file
+// except in compliance with the License.  You may obtain
+// a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.    
+
+
 function JiakClient(BaseUrl, Opts) {
     this.baseurl = BaseUrl;
     if (!(this.baseurl.slice(-1) == '/'))

client_lib/jiak.php

 <?php
 
+  /* 
+    This file is provided to you under the Apache License,
+    Version 2.0 (the "License"); you may not use this file
+    except in compliance with the License.  You may obtain
+    a copy of the License at
+   
+      http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.    
+  */
+
 class JiakClient {
     function JiakClient($jiakIP, $jiakPort) {
         $this->jiakIP = $jiakIP;

client_lib/jiak.py

 #!/usr/bin/env python
 
+# This file is provided to you under the Apache License,
+# Version 2.0 (the "License"); you may not use this file
+# except in compliance with the License.  You may obtain
+# a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
 import httplib
 try:
     import json
 	 successors/2,successors/3,
 	 predecessors/2,predecessors/3,
 	 contains_name/2,key_of/1,
-	 merge_rings/3]).
+	 merge_rings/2]).
 
 -define(RINGTOP, trunc(math:pow(2,160)-1)).  % SHA-1 space
 
     {_NumPartitions, Nodes} = CHash,
     Nodes.
 
-% @doc Return the best possible merge of two rings based on a vclock.
+% @doc Return a randomized merge of two rings based on a vclock.
 %      If multiple nodes are actively claiming nodes in the same
-%      time period, some churn can occur.
-% @spec merge_rings(VClock :: vclock:vclock(),
-%                   CHashA :: chash(), CHashB :: chash()) -> chash()
-merge_rings(VClock,CHashA,CHashB) ->
+%      time period, churn will occur.  Be prepared to live with it.
+% @spec merge_rings(CHashA :: chash(), CHashB :: chash()) -> chash()
+merge_rings(CHashA,CHashB) ->
     {NumPartitions, NodesA} = CHashA,
     {NumPartitions, NodesB} = CHashB,
-    {NumPartitions, [{I,recentnode(VClock,A,B)} || 
+    {NumPartitions, [{I,randomnode(A,B)} || 
 		   {{I,A},{I,B}} <- lists:zip(NodesA,NodesB)]}.
 
-% @spec recentnode(VClock :: vclock:vclock(), NodeA :: node(),
-%                   NodeB :: node()) -> node()
-recentnode(VClock,NodeA,NodeB) ->
-    % note: this can result in node "theft" -- this is okay!
-    % the idea is to gossip often, leading to convergence.
-    CtrA = vclock:get_counter(NodeA,VClock),
-    CtrB = vclock:get_counter(NodeB,VClock),
-    if
-	CtrA > CtrB ->
-	    NodeA;
-	true ->
-	    NodeB
-    end.
+% @spec randomnode(NodeA :: node(), NodeB :: node()) -> node()
+randomnode(NodeA,NodeA) -> NodeA;
+randomnode(NodeA,NodeB) -> lists:nth(crypto:rand_uniform(1,3),[NodeA,NodeB]).
 
 % @doc Return the number of partitions in the ring.
 % @spec size(CHash :: chash()) -> integer()

src/riak_eventer.erl

 %% @private
 init([]) -> {ok, #state{ring=undefined, eventers=[]}}.
 
-notify(Event) -> gen_server2:cast(?MODULE, {event, Event}).
+notify(Event) ->
+    gen_server2:cast(riak_local_logger, {event, Event}),
+    gen_server2:cast(?MODULE, {event, Event}).
 
 notify(Module, EventName, EventDetail) ->
     notify({Module, EventName, node(), EventDetail}).

src/riak_local_logger.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+-module(riak_local_logger).
+-behavior(gen_server2).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+	 terminate/2, code_change/3]).
+
+start_link() -> gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @private
+init([]) ->
+    case riak:get_app_env(riak_local_logfile) of
+        undefined ->
+            {ok, no_logger};
+        FN ->
+            {ok, CWD} = file:get_cwd(),
+            LogFN = filename:join([CWD,FN]),
+            ok = filelib:ensure_dir(LogFN),
+            file:open(LogFN, [raw, append, delayed_write])
+    end.
+
+%% @private
+handle_cast(_EventEntry, no_logger) -> {noreply,no_logger};
+handle_cast({event, EventEntry}, FD) ->
+    file:write(FD, [fmtnow()]),
+    file:write(FD, io_lib:format(": ~p~n",[EventEntry])),
+    {noreply,FD}.
+
+%% @private
+handle_call(_, _From, State) -> {noreply, State}.
+
+%% @private
+handle_info(_, State) -> {ok, State}.
+
+%% @private
+terminate(swap, State)  -> {?MODULE, State};
+terminate(_Reason,_State)  -> ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+month(1) ->  "Jan";
+month(2) ->  "Feb";
+month(3) ->  "Mar";
+month(4) ->  "Apr";
+month(5) ->  "May";
+month(6) ->  "Jun";
+month(7) ->  "Jul";
+month(8) ->  "Aug";
+month(9) ->  "Sep";
+month(10) -> "Oct";
+month(11) -> "Nov";
+month(12) -> "Dec".
+
+zone() ->
+    Time = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(Time),
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) - calendar:datetime_to_gregorian_seconds(Time),
+    zone((DiffSecs/3600)*100).
+
+zone(Val) when Val < 0 ->
+    io_lib:format("-~4..0w", [trunc(abs(Val))]);
+zone(Val) when Val >= 0 ->
+    io_lib:format("+~4..0w", [trunc(abs(Val))]).
+
+fmtnow() ->
+    {{Year, Month, Date}, {Hour, Min, Sec}} = calendar:local_time(),
+    io_lib:format("[~2..0w/~s/~4..0w:~2..0w:~2..0w:~2..0w ~s]",
+                  [Date,month(Month),Year, Hour, Min, Sec, zone()]).
+

src/riak_ring.erl

 -module(riak_ring).
 -include_lib("eunit/include/eunit.hrl").
 
--export([fresh/0,fresh/1,fresh/2,fresh_from_extern/1,fresh_from_extern/2,
+-export([fresh/0,fresh/1,fresh/2,preflist/2,filtered_preflist/3,
 	 owner_node/1,all_members/1,num_partitions/1,all_owners/1,
-	 preflist/2,filtered_preflist/3,
          transfer_node/3,reconcile/2, my_indices/1,
 	 index_owner/2,diff_nodes/2,random_node/1, random_other_index/1,
          get_meta/2, update_meta/3]).	 
                  meta}). % dict of cluster-wide other data
                          % (primarily bucket N-value, etc)
 
-sequence_test() ->
-    test_ensure(crypto),
-    I1 = 365375409332725729550921208179070754913983135744,
-    I2 = 730750818665451459101842416358141509827966271488,
-    A = fresh(4,a),
-    B1 = fresh_from_extern(A,b),
-    B2 = transfer_node(I1, b, B1),
-    {new_ring, A1} = reconcile(B1,A),
-    C1 = fresh_from_extern(A,c),
-    C2 = transfer_node(I1, c, C1),
-    {new_ring, A2} = reconcile(C2,A1),
-    {new_ring, A3} = reconcile(B2,A2),
-    C3 = transfer_node(I2,c,C2),
-    {new_ring, C4} = reconcile(A3,C3),
-    {new_ring, A4} = reconcile(C4,A3),
-    {new_ring, B3} = reconcile(A4,B2),
-    (A4#hstate.ring == B3#hstate.ring) == (
-      B3#hstate.ring == C4#hstate.ring).
-
-% @spec test_ensure(App :: atom()) -> ok
-test_ensure(App) ->
-    case [X || {X,_,_} <- application:which_applications(), X == App] of
-	[] ->
-	    application:start(App);
-	_ ->
-	    ok
-    end.
+% need to re-create this test now that we no longer have fresh_from_extern
+%% sequence_test() ->
+%%     test_ensure(crypto),
+%%     I1 = 365375409332725729550921208179070754913983135744,
+%%     I2 = 730750818665451459101842416358141509827966271488,
+%%     A = fresh(4,a),
+%%     B1 = fresh_from_extern(A,b),
+%%     B2 = transfer_node(I1, b, B1),
+%%     {new_ring, A1} = reconcile(B1,A),
+%%     C1 = fresh_from_extern(A,c),
+%%     C2 = transfer_node(I1, c, C1),
+%%     {new_ring, A2} = reconcile(C2,A1),
+%%     {new_ring, A3} = reconcile(B2,A2),
+%%     C3 = transfer_node(I2,c,C2),
+%%     {new_ring, C4} = reconcile(A3,C3),
+%%     {new_ring, A4} = reconcile(C4,A3),
+%%     {new_ring, B3} = reconcile(A4,B2),
+%%     (A4#hstate.ring == B3#hstate.ring) == (
+%%       B3#hstate.ring == C4#hstate.ring).
+%% % @spec test_ensure(App :: atom()) -> ok
+%% test_ensure(App) ->
+%%     case [X || {X,_,_} <- application:which_applications(), X == App] of
+%% 	[] ->
+%% 	    application:start(App);
+%% 	_ ->
+%% 	    ok
+%%     end.
 
 % @doc This is used only when this node is creating a brand new cluster.
 % @spec fresh() -> hstate()
 % @spec fresh(RingSize :: integer(), NodeName :: term()) -> hstate()
 fresh(RingSize, NodeName) ->
     #hstate{nodename=NodeName,
-	    vclock=vclock:increment(NodeName,vclock:fresh()),
+	    vclock=vclock:fresh(),
 	    ring=chash:fresh(RingSize, NodeName),
             meta=dict:new()}.
 
-% @doc This is used when a node is joining an existing cluster, either
-%      as a new member or upon restarting.
-% @spec fresh_from_extern(ExtState :: hstate()) -> hstate()
-fresh_from_extern(ExtState) ->
-    fresh_from_extern(ExtState,node()).
-% @doc Equivalent to fresh_from_extern/1 but allows specification of the
-%      local node name.  Called by fresh_from_extern/1, and otherwise
-%      only intended for testing purposes.
-% @spec fresh_from_extern(ExtState :: hstate(), NodeName :: term()) -> hstate()
-fresh_from_extern(ExtState,NodeName) ->
-    #hstate{nodename=NodeName,
-	    vclock=ExtState#hstate.vclock,
-	    ring=ExtState#hstate.ring,
-            meta=ExtState#hstate.meta}.
-
 % @doc Return all partition indices owned by the node executing this function.
 % @spec my_indices(State :: hstate()) -> [integer()]
 my_indices(State) ->
     case ancestors([ExternState, MyState]) of
         [OlderState] ->
           case vclock:equal(OlderState#hstate.vclock,MyState#hstate.vclock) of
-              true -> 
-                  {new_ring, #hstate{nodename=MyState#hstate.nodename,
-                                     vclock=ExternState#hstate.vclock,
-                                     ring=ExternState#hstate.ring,
-                                     meta=ExternState#hstate.meta}};
-              false ->
-                  {no_change, MyState}
+              true -> {new_ring, #hstate{nodename=MyState#hstate.nodename,
+                                         vclock=ExternState#hstate.vclock,
+                                         ring=ExternState#hstate.ring,
+                                         meta=ExternState#hstate.meta}};
+              false -> {no_change, MyState}
           end;
 	[] -> 
             case equal_rings(ExternState,MyState) of
     VClock = vclock:increment(MyNodeName,
 				 vclock:merge([StateA#hstate.vclock,
 					       StateB#hstate.vclock])),
-    Ring = chash:merge_rings(VClock,StateA#hstate.ring,StateB#hstate.ring),
+    Ring = chash:merge_rings(StateA#hstate.ring,StateB#hstate.ring),
     Meta = merge_meta(StateA#hstate.meta, StateB#hstate.meta),
     #hstate{nodename=MyNodeName,
 	    vclock=VClock,

src/riak_ring_gossiper.erl

                     riak_ring_manager:set_my_ring(NewRing),
                     {ok, MyNewRing} = maybe_claim(),
                     riak_ring_manager:set_my_ring(MyNewRing),
-                    Changed = riak_ring:diff_nodes(MyNewRing, NewRing),
-                    [gossip_to(X) ||
-                        X <- [riak_ring:random_node(MyNewRing)|Changed],
-                        X =/= node()],
+                    Me = node(),
+                    case riak_ring:random_node(MyNewRing) of
+                        Me -> nop;
+                        RandNode -> gossip_to(RandNode)
+                    end,
                     riak_eventer:notify(riak_ring_gossiper, changed_ring, 
-                                        {gossip_changed, length(Changed)}),
+                                        gossip_changed),
                     loop(write)
             end;
-        {set_ring, ExternRing} ->
-            riak_eventer:notify(riak_ring_gossiper,
-                                fresh_ring, fresh_ring),
-            riak_ring_manager:set_my_ring(
-              riak_ring:fresh_from_extern(ExternRing, node())),
-            {ok, MyNewRing} = maybe_claim(),
-            riak_ring_manager:set_my_ring(MyNewRing),
-            Changed = riak_ring:diff_nodes(MyNewRing, ExternRing),
-            [gossip_to(X) ||
-                X <- [riak_ring:random_node(MyNewRing)|Changed],
-                X =/= node()],
-            riak_eventer:notify(riak_ring_gossiper, changed_ring, 
-                                {set_changed, length(Changed)}),
-            loop(write);
         {get_ring, RemoteNode} ->
-            set_remote_ring(RemoteNode),
+            gossip_to(RemoteNode),
             loop(Write)
     after Interval ->
             riak_eventer:notify(riak_ring_gossiper, interval, interval),
     {ok, MyRing} = riak_ring_manager:get_my_ring(),
     riak_connect:cast(RemoteNode, {gossip_ring, MyRing}).
 
-set_remote_ring(RemoteNode) ->
-    riak_eventer:notify(riak_ring_gossiper, set_remote_ring, RemoteNode),
-    {ok, MyRing} = riak_ring_manager:get_my_ring(),
-    riak_connect:cast(RemoteNode, {set_ring, MyRing}).
-
 get_ring_from(RemoteNode) ->
     riak_eventer:notify(riak_ring_gossiper, get_remote_ring, RemoteNode),
     riak_connect:cast(RemoteNode, {get_ring, node()}).
     BucketKeys = {riak_bucketkeys,
                  {riak_bucketkeys, start_link, []},
                   permanent, 5000, worker, dynamic},
+    LocalLogger = {riak_local_logger,
+                   {riak_local_logger, start_link, []},
+                   permanent, 5000, worker, dynamic},
     RiakWeb = {webmachine_mochiweb,
                  {webmachine_mochiweb, start, [riak_web:config()]},
                   permanent, 5000, worker, dynamic},
     Processes0 = 
     case riak:get_app_env(riak_web_ip) of
         "undefined" ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,BucketKeys];
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys];
         undefined ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,BucketKeys];
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys];
         _ ->
-            [RingMgr,RingGossip,Connect,API,EventGuard,BucketKeys,RiakWeb]
+            [RingMgr,RingGossip,Connect,API,EventGuard,LocalLogger,BucketKeys,
+             RiakWeb]
     end,
     Processes1 = 
     case riak:get_app_env(doorbell_port) of
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.