dizzyd avatar dizzyd committed 9a3b5a9 Merge

Merging riak-0.7

Comments (0)

Files changed (10)

 Jay Doane
 Martin Scholl
 Jayson Baird
+Kirill A. Korinskiy
 
 

apps/riak/src/jiak_util.erl

 bucket_from_uri_test() ->
     PI = dict:store(bucket, "foo", dict:new()),
     RD0 = wrq:create('PUT', "1.1", "/jiak/foo", mochiweb_headers:empty()),
-    RD = wrq:load_dispatch_data(PI, none, none, none, none, RD0),
+    RD = wrq:load_dispatch_data(PI, none, none, none, none, none, RD0),
     ?assertEqual(<<"foo">>, bucket_from_reqdata(RD)).

apps/riak/src/riak_client.erl

     rpc:call(Node, riak_eventer, remove_handler, [Pid, MatchHead, MatchGuard]). 
 
 get_stats(local) ->
-    [{node(), gen_server:call(riak_stat, get_stats)}];
+    [{Node, rpc:call(Node, gen_server, call, [riak_stat, get_stats])}];
 get_stats(global) ->
     {ok, Ring} = rpc:call(Node, riak_ring_manager, get_my_ring, []),
     Nodes = riak_ring:all_members(Ring),

apps/riak/src/riak_fs_backend.erl

 
 %% @spec start(Partition :: integer()) ->
 %%          {ok, state()} | {{error, Reason :: term()}, state()}
-%% @doc Start this backend.  'riak_fs_backend_root' must be
-%%      set in Riak's application environment.  It must be set to
-%%      a string representing the base directory where this backend
-%%      should store its files.
+%% @doc Start this backend.  'riak_fs_backend_root' must be set in
+%%      Riak's application environment.  It must be set to a string
+%%      representing the base directory where this backend should
+%%      store its files.
 start(Partition) ->
     PartitionName = integer_to_list(Partition),
     ConfigRoot = riak:get_app_env(riak_fs_backend_root),
         true -> file:read_file(File)
     end.
 
+%% @spec atomic_write(state(), File :: string(), Val :: binary()) ->
+%%       ok | {error, Reason :: term()}
+%% @doc store a atomic value to disk. Write to temp file and rename to
+%%       normal path.
+atomic_write(_State, File, Val) ->
+    FakeFile = File ++ ".tmpwrite",
+    case file:write_file(FakeFile, Val) of
+        ok ->
+            file:rename(FakeFile, File);
+        X -> X
+    end.
+
 %% @spec put(state(), BKey :: riak_object:bkey(), Val :: binary()) ->
 %%         ok | {error, Reason :: term()}
 %% @doc Store Val under Bkey
 put(State,BKey,Val) ->       
     File = location(State,BKey),
     case filelib:ensure_dir(File) of
-        ok -> file:write_file(File,Val);
+        ok -> atomic_write(State, File, Val);
         X -> X
     end.
 

apps/riak/src/riak_get_fsm.erl

                 timeout :: pos_integer(),
                 tref    :: reference(),
                 bkey :: {riak_object:bucket(), riak_object:key()},
-                ring :: riak_ring:riak_ring()
+                ring :: riak_ring:riak_ring(),
+                startnow :: {pos_integer(), pos_integer(), pos_integer()}
                }).
 
 start(ReqId,Bucket,Key,R,Timeout,From) ->
 %% @private
 initialize(timeout, StateData0=#state{timeout=Timeout, req_id=ReqId,
                                       bkey={Bucket,Key}, ring=Ring}) ->
+    StartNow = now(),
     TRef = erlang:send_after(Timeout, self(), timeout),
     RealStartTime = riak_util:moment(),
     DocIdx = riak_util:chash_key({Bucket, Key}),
                        preflist=Preflist,final_obj=undefined,
                        replied_r=[],replied_fail=[],
                        replied_notfound=[],starttime=riak_util:moment(),
-                       waiting_for=Sent,tref=TRef},
+                       waiting_for=Sent,tref=TRef,startnow=StartNow},
     {next_state,waiting_vnode_r,StateData}.
 
 waiting_vnode_r({r, {ok, RObj}, Idx, ReqId},
     case length(Replied) >= R of
         true ->
             Final = respond(Client,Replied,AllowMult,ReqId),
-            riak_stat:update(node_get),
+            update_stats(StateData),
             case Final of
                 {error, notfound} ->
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
         true ->
             {next_state,waiting_vnode_r,NewStateData};
         false ->
-            riak_stat:update(node_get),
+            update_stats(StateData),
             riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                 {ReqId, notfound}),
             Client ! {ReqId, {error,notfound}},
             case length(NotFound) of
                 0 ->
                     FullErr = [E || {E,_I} <- Replied],
-                    riak_stat:update(node_get),
+                    update_stats(StateData),
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                         {ReqId, {error,FullErr}}),
                     Client ! {ReqId, {error,FullErr}},
                     {stop,normal,NewStateData};
                 _ ->
-                    riak_stat:update(node_get),
+                    update_stats(StateData),
                     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                                         {ReqId, notfound}),
                     Client ! {ReqId, {error,notfound}},
             end
     end;
 waiting_vnode_r(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
-    riak_stat:update(node_get),
+    update_stats(StateData),
     riak_eventer:notify(riak_get_fsm, get_fsm_reply,
                         {ReqId, timeout}),
     Client ! {ReqId, {error,timeout}},
      (vclock:descends(riak_object:vclock(O2),riak_object:vclock(O1)) == false)]
 		|| {O1,_} <- AnnoObjects],
     lists:flatten(ToRemove).
+
+update_stats(#state{startnow=StartNow}) ->
+    EndNow = now(),
+    riak_stat:update({get_fsm_time, timer:now_diff(EndNow, StartNow)}).

apps/riak/src/riak_put_fsm.erl

                 replied_fail :: list(),
                 timeout :: pos_integer(), 
                 tref    :: reference(),
-                ring :: riak_ring:riak_ring()
+                ring :: riak_ring:riak_ring(),
+                startnow :: {pos_integer(), pos_integer(), pos_integer()}
                }).
 
 start(ReqId,RObj,W,DW,Timeout,From) ->
 %% @private
 initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
                                       timeout=Timeout, ring=Ring}) ->
+    StartNow = now(),
     TRef = erlang:send_after(Timeout, self(), timeout),
     RObj = update_metadata(RObj0),
     RealStartTime = riak_util:moment(),
                   robj=RObj, n=N, preflist=Preflist, bkey={Bucket,Key},
                   waiting_for=Sent, starttime=riak_util:moment(),
                   replied_w=[], replied_dw=[], replied_fail=[],
-                  tref=TRef},
+                  tref=TRef,startnow=StartNow},
     {next_state,waiting_vnode_w,StateData}.
 
 waiting_vnode_w({w, Idx, ReqId},
             case DW of
                 0 ->
                     Client ! {ReqId, ok},
-                    riak_stat:update(node_put),
+                    update_stats(StateData),
                     riak_eventer:notify(riak_put_fsm, put_fsm_reply_ok,
                                         {ReqId, ok, {Bucket, Key}}),
                     {stop,normal,StateData};
         true ->
             {next_state,waiting_vnode_w,NewStateData};
         false ->
-            riak_stat:update(node_put),
+            update_stats(StateData),
             riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                                 {ReqId, {error,too_many_fails,Replied}}),
             Client ! {ReqId, {error,too_many_fails}},
             {stop,normal,NewStateData}
     end;
 waiting_vnode_w(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
-    riak_stat:update(node_put),
+    update_stats(StateData),
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                         {ReqId, {error,timeout}}),
     Client ! {ReqId, {error,timeout}},
     Replied = [Idx|Replied0],
     case length(Replied) >= DW of
         true ->
-            riak_stat:update(node_put),
+            update_stats(StateData),
             riak_eventer:notify(riak_put_fsm, put_fsm_reply_ok,
                                 {ReqId, ok, {Bucket, Key}}),
             Client ! {ReqId, ok},
             {stop,normal,NewStateData}
     end;
 waiting_vnode_dw(timeout, StateData=#state{client=Client,req_id=ReqId}) ->
-    riak_stat:update(node_put),
+    update_stats(StateData),
     riak_eventer:notify(riak_put_fsm, put_fsm_reply,
                         {ReqId, {error,timeout}}),
     Client ! {ReqId, {error,timeout}},
     Obj = riak_object:new(<<"b">>,<<"k">>,<<"v1">>),
     ?assertNot(make_vtag(Obj) =:= 
                make_vtag(riak_object:increment_vclock(Obj,<<"client_id">>))).
+
+update_stats(#state{startnow=StartNow}) ->
+    EndNow = now(),
+    riak_stat:update({put_fsm_time, timer:now_diff(EndNow, StartNow)}).

apps/riak/src/riak_stat.erl

-%%%-------------------------------------------------------------------
-%%% File    : riak_stat.erl
-%%% Author  : Bryan Fink <bryan@basho.com>
-%%% Description : Stats aggregator for Riak.
-%%%
-%%% Created : 10 Nov 2009 by Bryan Fink <bryan@basho.com>
-%%%-------------------------------------------------------------------
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc riak_stat is a long-lived gen_server process for aggregating
+%%      stats about the Riak node on which it is runing.
+%%
+%%      Update each stat with the exported function update/1.  Modify
+%%      the internal function update/3 to add storage for new stats.
+%%
+%%      Get the latest aggregation of stats with the exported function
+%%      get_stats/0.  Modify the internal function produce_stats/1 to
+%%      change how stats are represented.
+%%
+%%      Riak will start riak_stat for you, if you have specified
+%%      {riak_stat, true} in your config .erlenv file.
+%%
+%%      Current stats:
+%%<dl><dt>  vnode_gets
+%%</dt><dd> Total number of gets handled by all vnodes on this node
+%%          in the last minute.
+%%</dd><dd> update(vnode_get)
+%%
+%%</dd><dt> vnode_puts
+%%</dt><dd> Total number of puts handled by all vnodes on this node
+%%          in the last minute.
+%%</dd><dd> update(vnode_put)
+%%
+%%</dd><dt> node_gets
+%%</dt><dd> Number of gets coordinated by this node in the last
+%%          minute.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_get_fsm_time_mean
+%%</dt><dd> Mean time, in microseconds, between when a riak_get_fsm is
+%%          started and when it sends a reply to the client, for the
+%%          last minute.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_get_fsm_time_median
+%%</dt><dd> Median time, in microseconds, between when a riak_get_fsm
+%%          is started and when it sends a reply to the client, for
+%%          the last minute.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_get_fsm_time_95
+%%</dt><dd> Response time, in microseconds, met or beaten by 95% of
+%%          riak_get_fsm executions.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_get_fsm_time_99
+%%</dt><dd> Response time, in microseconds, met or beaten by 99% of
+%%          riak_get_fsm executions.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_get_fsm_time_100
+%%</dt><dd> Response time, in microseconds, met or beaten by 100% of
+%%          riak_get_fsm executions.
+%%</dd><dd> update({get_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_puts
+%%</dt><dd> Number of puts coordinated by this node in the last
+%%          minute.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_put_fsm_time_mean
+%%</dt><dd> Mean time, in microseconds, between when a riak_put_fsm is
+%%          started and when it sends a reply to the client, for the
+%%          last minute.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_put_fsm_time_median
+%%</dt><dd> Median time, in microseconds, between when a riak_put_fsm
+%%          is started and when it sends a reply to the client, for
+%%          the last minute.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_put_fsm_time_95
+%%</dt><dd> Response time, in microseconds, met or beaten by 95% of
+%%          riak_put_fsm executions.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_put_fsm_time_99
+%%</dt><dd> Response time, in microseconds, met or beaten by 99% of
+%%          riak_put_fsm executions.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> node_put_fsm_time_100
+%%</dt><dd> Response time, in microseconds, met or beaten by 100% of
+%%          riak_put_fsm executions.
+%%</dd><dd> update({put_fsm_time, Microseconds})
+%%
+%%</dd><dt> cpu_nprocs
+%%</dt><dd> Value returned by {@link cpu_sup:nprocs/0}.
+%%
+%%</dd><dt> cpu_avg1
+%%</dt><dd> Value returned by {@link cpu_sup:avg1/0}.
+%%
+%%</dd><dt> cpu_avg5
+%%</dt><dd> Value returned by {@link cpu_sup:avg5/0}.
+%%
+%%</dd><dt> cpu_avg15
+%%</dt><dd> Value returned by {@link cpu_sup:avg15/0}.
+%%
+%%</dd><dt> mem_total
+%%</dt><dd> The first element of the tuple returned by
+%%          {@link memsup:get_memory_data/0}.
+%%
+%%</dd><dt> mem_allocated
+%%</dt><dd> The second element of the tuple returned by
+%%          {@link memsup:get_memory_data/0}.
+%%
+%%</dd><dt> disk
+%%</dt><dd> Value returned by {@link disksup:get_disk_data/0}.
+%%</dd></dl>
 -module(riak_stat).
 
 -behaviour(gen_server2).
          terminate/2, code_change/3]).
 
 -record(state,{vnode_gets,vnode_puts,
-               node_gets,node_puts}).
+               get_fsm_time,put_fsm_time}).
 
-%%====================================================================
-%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-%%--------------------------------------------------------------------
+%% @spec start_link() -> {ok,Pid} | ignore | {error,Error}
+%% @doc Start the server.  Also start the os_mon application, if it's
+%%      not already running.
 start_link() ->
     case application:start(os_mon) of
         ok -> ok;
     end,
     gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
 
+%% @spec get_stats() -> proplist()
+%% @doc Get the current aggregation of stats.
 get_stats() ->
     gen_server2:call(?MODULE, get_stats).
 
+%% @spec update(term()) -> ok
+%% @doc Update the given stat.
 update(Stat) ->
     gen_server2:cast(?MODULE, {update, Stat, riak_util:moment()}).
 
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%%                         {ok, State, Timeout} |
-%%                         ignore               |
-%%                         {stop, Reason}
-%% Description: Initiates the server
-%%--------------------------------------------------------------------
+%% @private
 init([]) ->
     {ok, #state{vnode_gets=spiraltime:fresh(),
                 vnode_puts=spiraltime:fresh(),
-                node_gets=spiraltime:fresh(),
-                node_puts=spiraltime:fresh()}}.
+                get_fsm_time=slide:fresh(),
+                put_fsm_time=slide:fresh()}}.
 
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%%                                      {reply, Reply, State, Timeout} |
-%%                                      {noreply, State} |
-%%                                      {noreply, State, Timeout} |
-%%                                      {stop, Reason, Reply, State} |
-%%                                      {stop, Reason, State}
-%% Description: Handling call messages
-%%--------------------------------------------------------------------
+%% @private
 handle_call(get_stats, _From, State) ->
     {reply, produce_stats(State), State};
 handle_call(_Request, _From, State) ->
     Reply = ok,
     {reply, Reply, State}.
 
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%%                                      {noreply, State, Timeout} |
-%%                                      {stop, Reason, State}
-%% Description: Handling cast messages
-%%--------------------------------------------------------------------
+%% @private
 handle_cast({update, Stat, Moment}, State) ->
     {noreply, update(Stat, Moment, State)};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%%                                       {noreply, State, Timeout} |
-%%                                       {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%%--------------------------------------------------------------------
+%% @private
 handle_info(_Info, State) ->
     {noreply, State}.
 
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%%--------------------------------------------------------------------
+%% @private
 terminate(_Reason, _State) ->
     ok.
 
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%%--------------------------------------------------------------------
+%% @private
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 %%% Internal functions
 %%--------------------------------------------------------------------
 
+%% @spec update(Stat::term(), integer(), state()) -> state()
+%% @doc Update the given stat in State, returning a new State.
 update(vnode_get, Moment, State) ->
     spiral_incr(#state.vnode_gets, Moment, State);
 update(vnode_put, Moment, State) ->
     spiral_incr(#state.vnode_puts, Moment, State);
-update(node_get, Moment, State) ->
-    spiral_incr(#state.node_gets, Moment, State);
-update(node_put, Moment, State) ->
-    spiral_incr(#state.node_puts, Moment, State);
+update({get_fsm_time, Microsecs}, Moment, State) ->
+    slide_incr(#state.get_fsm_time, Microsecs, Moment, State);
+update({put_fsm_time, Microsecs}, Moment, State) ->
+    slide_incr(#state.put_fsm_time, Microsecs, Moment, State);
 update(_, _, State) ->
     State.
 
+%% @spec spiral_incr(integer(), integer(), state()) -> state()
+%% @doc Increment the value of a spiraltime structure at a given
+%%      position of the State tuple.
 spiral_incr(Elt, Moment, State) ->
     setelement(Elt, State,
                spiraltime:incr(1, Moment, element(Elt, State))).
 
+%% @spec slide_incr(integer(), term(), integer(), state()) -> state()
+%% @doc Update a slide structure at a given position in the
+%%      STate tuple.
+slide_incr(Elt, Reading, Moment, State) ->
+    setelement(Elt, State,
+               slide:update(element(Elt, State), Reading, Moment)).
+
+%% @spec produce_stats(state()) -> proplist()
+%% @doc Produce a proplist-formatted view of the current aggregation
+%%      of stats.
 produce_stats(State) ->
     Moment = spiraltime:n(),
     lists:append(
        mem_stats(),
        disk_stats()]).
 
+%% @spec spiral_minute(integer(), integer(), state()) -> integer()
+%% @doc Get the count of events in the last minute from the spiraltime
+%%      structure at the given element of the state tuple.
 spiral_minute(Moment, Elt, State) ->
     Up = spiraltime:incr(0, Moment, element(Elt, State)),
     {_,Count} = spiraltime:rep_minute(Up),
     Count.
 
+%% @spec slide_minute(integer(), integer(), state()) ->
+%%         {Count::integer(), Mean::ustat(),
+%%          {Median::ustat(), NinetyFive::ustat(),
+%%           NinetyNine::ustat(), Max::ustat()}}
+%% @type ustat() = undefined | number()
+%% @doc Get the Count of readings, the Mean of those readings, and the
+%%      Median, 95th percentile, 99th percentile, and Maximum readings
+%%      for the last minute from the slide structure at the given
+%%      element of the state tuple.
+%%      If Count is 0, then all other elements will be the atom
+%%      'undefined'.
+slide_minute(Moment, Elt, State) ->
+    {Count, Mean} = slide:mean(element(Elt, State), Moment),
+    {_, Nines} = slide:nines(element(Elt, State), Moment),
+    {Count, Mean, Nines}.
+
+%% @spec vnode_stats(integer(), state()) -> proplist()
+%% @doc Get the vnode-sum stats proplist.
 vnode_stats(Moment, State) ->
     [{F, spiral_minute(Moment, Elt, State)}
      || {F, Elt} <- [{vnode_gets, #state.vnode_gets},
                      {vnode_puts, #state.vnode_puts}]].
 
+%% @spec node_stats(integer(), state()) -> proplist()
+%% @doc Get the node stats proplist.
 node_stats(Moment, State) ->
-    [{F, spiral_minute(Moment, Elt, State)}
-     || {F, Elt} <- [{node_gets, #state.node_gets},
-                     {node_puts, #state.node_puts}]].
+    {Gets, GetMean, {GetMedian, GetNF, GetNN, GetH}} =
+        slide_minute(Moment, #state.get_fsm_time, State),
+    {Puts, PutMean, {PutMedian, PutNF, PutNN, PutH}} =
+        slide_minute(Moment, #state.put_fsm_time, State),
+    [{node_gets, Gets},
+     {node_get_fsm_time_mean, GetMean},
+     {node_get_fsm_time_median, GetMedian},
+     {node_get_fsm_time_95, GetNF},
+     {node_get_fsm_time_99, GetNN},
+     {node_get_fsm_time_100, GetH},
+     {node_puts, Puts},
+     {node_put_fsm_time_mean, PutMean},
+     {node_put_fsm_time_median, PutMedian},
+     {node_put_fsm_time_95, PutNF},
+     {node_put_fsm_time_99, PutNN},
+     {node_put_fsm_time_100, PutH}].
 
+%% @spec cpu_stats() -> proplist()
+%% @doc Get stats on the cpu, as given by the cpu_sup module
+%%      of the os_mon application.
 cpu_stats() ->
     [{cpu_nprocs, cpu_sup:nprocs()},
      {cpu_avg1, cpu_sup:avg1()},
      {cpu_avg5, cpu_sup:avg5()},
      {cpu_avg15, cpu_sup:avg15()}].
 
+%% @spec mem_stats() -> proplist()
+%% @doc Get stats on the memory, as given by the memsup module
+%%      of the os_mon application.
 mem_stats() ->
     {Total, Alloc, _} = memsup:get_memory_data(),
     [{mem_total, Total},
      {mem_allocated, Alloc}].
 
+%% @spec disk_stats() -> proplist()
+%% @doc Get stats on the disk, as given by the disksup module
+%%      of the os_mon application.
 disk_stats() ->
     [{disk, disksup:get_disk_data()}].

apps/riak/src/riak_util.erl

      rpc:call(Node, code, load_file, [Module])} || 
         Node <- riak_ring:all_members(Ring)].
 
-%% @spec try_cast(term(), term(), [{Index :: term(), Node :: node()}]) ->
+%% @spec try_cast(term(), term(), [node()], [{Index :: term(), Node :: node()}]) ->
 %%          {[{Index :: term(), Node :: node(), Node :: node()}],
 %%           [{Index :: term(), Node :: node()}]}
 %% @doc Cast {Cmd, {Index,Node}, Msg} at riak_vnode_master on Node
-%%      if Node responds 'pong' to a net_adm:ping.  The list of
-%%      successful casts is the first element of the return tuple, and
-%%      the list of pang-responding nodes is the second element.
-%%      Used in riak_put_fsm and riak_get_fsm.
+%%      if Node is in UpNodes.  The list of successful casts is the
+%%      first element of the return tuple, and the list of unavailable
+%%      nodes is the second element.  Used in riak_put_fsm and riak_get_fsm.
 try_cast(Cmd, Msg, UpNodes, Targets) ->
     try_cast(Cmd, Msg, UpNodes, Targets, [], []).
 try_cast(_Cmd, _Msg, _UpNodes, [], Sent, Pangs) -> {Sent, Pangs};
 try_cast(Cmd, Msg, UpNodes, [{Index,Node}|Targets], Sent, Pangs) ->
-    case lists:member(Node, [node()|UpNodes])
-          orelse net_adm:ping(Node) == pong of
+    case lists:member(Node, [node()|UpNodes]) of
         false ->
             try_cast(Cmd, Msg, UpNodes, Targets, Sent, [{Index,Node}|Pangs]);
         true ->
 fallback(_Cmd, _Msg, [], _Fallbacks, Sent) -> Sent;
 fallback(_Cmd, _Msg, _Pangs, [], Sent) -> Sent;
 fallback(Cmd, Msg, [{Index,Node}|Pangs], [{_,FN}|Fallbacks], Sent) ->
-    case lists:member(FN, [node()|nodes()]) orelse net_adm:ping(FN) == pong of
+    case lists:member(FN, [node()|nodes()]) of
         false -> fallback(Cmd, Msg, [{Index,Node}|Pangs], Fallbacks, Sent);
         true ->
             gen_server:cast({riak_vnode_master, FN},

apps/riak/src/slide.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc Keep track of thing in a sliding time window.  The idea here
+%%      is that you have some reading to take several times.
+%%      Occasionally, you want to compute some aggregation of those
+%%      readings for the last N seconds.
+%%
+%%      For example, you might read the weight of cars passing a point
+%%      in the road.  You want to compute some statistics every hour.
+%%      You could:
+%%
+%%      %% create a new slide, with an hour window
+%%      T0 = slide:fresh(60*60)
+%%
+%%      %% update it every time a car passes
+%%      T1 = slide:update(T0, Weight, slide:moment())
+%%
+%%      %% eventually ask for stats
+%%      {NumberOfCars, TotalWeight} = slide:sum(TN, slide:moment())
+%%      {NumberOfCars, AverageWeight} = slide:mean(TN, slide:moment())
+%%      {NumberOfCars, {MedianWeight,
+%%                      NinetyFivePercentWeight,
+%%                      NinetyNinePercentWeight,
+%%                      HeaviestWeight} = slide:nines(TN, slide:moment())
+%%
+%%      The slide module attempts to be tunably efficient by exposing
+%%      the ability to determine how often to "prune" readings.  By default,
+%%      readings are pruned whenever a reading is made with a timestamp
+%%      that is two window-lengths newer than the oldest timestamp in the
+%%      slide.  At that point, all readings older than one window-length away
+%%      from the latest reading will be removed.  Use a different value
+%%      for the Trigger parameter of fresh/2 to change when pruning is
+%%      triggered.
+-module(slide).
+
+-export([fresh/0, fresh/1, fresh/2]).
+-export([update/2, update/3, moment/0]).
+-export([sum/1, sum/2, sum/3]).
+-export([mean/1, mean/2, mean/3]).
+-export([nines/1, nines/2, nines/3]).
+-include_lib("eunit/include/eunit.hrl").
+
+-record(slide, {
+          oldest,   %% oldest timestamp here
+          window,   %% window to which to trim
+          trigger,  %% age at which to trigger pruning
+          readings  %% {timestamp, reading}
+         }).
+
+%% @spec fresh() -> slide()
+%% @equiv fresh(60)
+fresh() -> fresh(60).
+
+%% @spec fresh(integer()) -> slide()
+%% @equiv fresh(Window, Window*2)
+fresh(Window) -> fresh(Window, Window*2).
+
+%% @spec fresh(integer(), integer()) -> slide()
+%% @doc Create an empty slide for tracking Window-seconds worth of
+%%      readings, and pruning those readings after Trigger seconds.
+fresh(Window, Trigger) when Trigger >= Window ->
+    #slide{window=Window, trigger=Trigger, readings=[]}.
+
+%% @spec moment() -> integer()
+%% @doc Get the current time in seconds.
+moment() ->
+    calendar:datetime_to_gregorian_seconds(calendar:universal_time()).
+
+%% @spec update(slide(), term()) -> slide()
+%% @equiv update(S, Reading, moment())
+update(S, Reading) -> update(S, Reading, moment()).
+
+%% @spec update(slide(), term(), integer()) -> slide()
+%% @doc Store a new reading.  The current list of readings will be
+%%      pruned if Moment is as new as or newer than the most recent
+%%      reading stored, and more than Trigger seconds newer than the
+%%      oldest reading stored.
+update(S=#slide{oldest=O, trigger=P, readings=R=[{Y,_}|_]},
+       Reading, Moment) ->
+    if Moment >= Y ->
+            %% Reading is newest
+            Pruned = maybe_prune(S, Moment),
+            Pruned#slide{oldest=case Pruned#slide.oldest of
+                                     undefined -> Moment;
+                                     PrunedOldest -> PrunedOldest
+                                 end,
+                          readings=[{Moment, Reading}|Pruned#slide.readings]};
+       Moment > Y-P ->
+            %% Reading is after our trigger time
+            %% assume normal use case adds a 'newest' regularly,
+            %% and don't bother pruning here
+            {Younger, Older} =
+                lists:splitwith(fun({T,_}) -> T > Moment end, R),
+            S#slide{
+              oldest=if Moment < O -> Moment;
+                        true -> O
+                     end,
+              readings=Younger++[{Moment, Reading}]++Older};
+       true ->
+            %% Reading is before the trigger time
+            S
+    end;
+update(S=#slide{readings=[]}, Reading, Moment) ->
+    S#slide{oldest=Moment, readings=[{Moment, Reading}]}.
+
+%% @spec maybe_prune(slide(), moment()) -> slide()
+%% @doc Prune if the trigger has been ... er, triggered.
+maybe_prune(S=#slide{oldest=O, trigger=P, window=W}, Moment) ->
+    if Moment-P > O -> prune(S, Moment-W);
+       true -> S
+    end.
+
+%% @spec prune(slide(), integer()) -> slide()
+%% @doc Remove all readings taken before MaxAge.
+prune(S=#slide{readings=R}, MaxAge) ->
+    Prune = fun(Reading={T, _}, Acc) ->
+                    if T > MaxAge -> [Reading|Acc];
+                       true       -> Acc
+                    end
+            end,
+    case lists:foldl(Prune, [], R) of
+        [] ->
+            S#slide{oldest=undefined, readings=[]};
+        RevFiltered=[{NewOldest,_}|_] ->
+            S#slide{oldest=NewOldest,
+                     readings=lists:reverse(RevFiltered)}
+    end.
+
+%% @spec sum(slide()) -> {Count::integer(), Sum::integer()}
+%% @doc Sum of readings from now through Window seconds ago.  Return is
+%%      number of readings in the range and the sum of those readings.
+sum(Slide) -> sum(Slide, moment()).
+
+%% @spec sum(slide(), integer()) -> {Count::integer(), Sum::integer()}
+%% @doc Sum of readings from Moment through Window seconds before Moment.
+%%      Return is number of readings in the range and the sum of those
+%%      readings.
+sum(Slide, Moment) -> sum(Slide, Moment, Slide#slide.window).
+
+%% @spec sum(slide(), integer(), integer()) ->
+%%          {Count::integer(), Sum::integer()}
+%% @doc Sum of readings from Moment through Seconds seconds before
+%%      Moment.  Return is number of readings in the range and the sum
+%%      of those readings.
+sum(#slide{readings=R}, Moment, Seconds) ->
+    Cutoff = Moment-Seconds,
+    Sum = fun({T, Reading}, {Count, Sum}) when T =< Moment, T > Cutoff ->
+                  {Count+1, Sum+Reading};
+             (_, Acc) -> Acc
+          end,
+    lists:foldl(Sum, {0, 0}, R).
+
+%% @spec mean(slide()) -> {Count::integer(), Mean::number()}
+%% @doc Mean of readings from now through Window seconds ago.  Return is
+%%      number of readings in the range and the mean of those readings.
+mean(Slide) -> mean(Slide, moment()).
+
+%% @spec mean(slide(), integer()) -> {Count::integer(), Mean::number()}
+%% @doc Mean of readings from Moment through Window seconds before Moment.
+%%      Return is number of readings in the range and the mean of those
+%%      readings.
+mean(Slide, Moment) -> mean(Slide, Moment, Slide#slide.window).
+
+%% @spec mean(slide(), integer(), integer()) ->
+%%          {Count::integer(), Mean::number()}
+%% @doc Mean of readings from Moment through Seconds seconds before
+%%      Moment.  Return is number of readings in the range and the mean
+%%      of those readings.
+mean(S, Moment, Seconds) ->
+    case sum(S, Moment, Seconds) of
+        {0, _}       -> {0, undefined};
+        {Count, Sum} -> {Count, Sum/Count}
+    end.
+
+%% @spec nines(slide()) ->
+%%         {Count::integer(), {Median::number(), NinetyFive::number(),
+%%                             NinetyNine::number(), Hundred::number()}}
+%% @doc Median, 95%, 99%, and 100% readings from now through Window
+%%  seconds ago.  Return is number of readings in the range and the
+%%  nines of those readings.
+nines(Slide) -> nines(Slide, moment()).
+
+%% @spec nines(slide(), integer()) ->
+%%         {Count::integer(), {Median::number(), NinetyFive::number(),
+%%                             NinetyNine::number(), Hundred::number()}}
+%% @doc Median, 95%, 99%, and 100% readings from Moment through Window
+%%      seconds before Moment.  Return is number of readings in the
+%%      range and the nines of those readings.
+nines(Slide, Moment) -> nines(Slide, Moment, Slide#slide.window).
+
+%% @spec nines(slide(), integer(), integer()) ->
+%%         {Count::integer(), {Median::number(), NinetyFive::number(),
+%%                             NinetyNine::number(), Hundred::number()}}
+%% @doc Median, 95%, 99%, and 100% readings from Moment through
+%%      Seconds seconds before Moment.  Return is number of readings
+%%      in the range and the nines of those readings.
+nines(#slide{readings=R}, Moment, Seconds) ->
+    Cutoff = Moment-Seconds,
+    case lists:sort([ Reading || {T, Reading} <- R,
+                                 T < Moment, T > Cutoff]) of
+        [] -> {0, {undefined, undefined, undefined, undefined}};
+        Window ->
+            Count = length(Window),
+            {Count,
+             {lists:nth(mochinum:int_ceil(Count*0.5), Window),
+              lists:nth(mochinum:int_ceil(Count*0.95), Window),
+              lists:nth(mochinum:int_ceil(Count*0.99), Window),
+              lists:last(Window)}}
+    end.
+
+%%
+%% Test
+%%
+
+direct_prune_test() ->
+    S0 = slide:fresh(10),
+    S1 = slide:update(S0, 5, 3),
+    ?assertEqual(S1, prune(S1, 2)),
+    ?assertEqual(S0, prune(S1, 4)).
+
+maybe_prune_test() ->
+    S0 = slide:fresh(10, 20),
+    S1 = slide:update(S0, 3, 1),
+    S2 = slide:update(S1, 5, 15),
+    ?assertEqual(S2, maybe_prune(S2, 12)),
+    ?assertEqual({1,5}, slide:sum(maybe_prune(S2, 22), 22)).
+
+auto_prune_test() ->
+    S0 = slide:fresh(10),
+    S1 = slide:update(S0, 5, 3),
+    S2 = slide:update(S1, 6, 14),
+    ?assertEqual({1, 5}, slide:sum(S1, 4, 10)),
+    ?assertEqual({1, 6}, slide:sum(S2, 15, 10)).
+
+sum_test() ->
+    S0 = slide:fresh(10),
+    ?assertEqual({0, 0}, % no points, sum = 0
+                 slide:sum(S0, 9, 10)),
+    S1 = slide:update(S0, 3, 1),
+    ?assertEqual({1, 3}, % one point, sum = 3
+                 slide:sum(S1, 9, 10)),
+    S2 = slide:update(S1, 5, 5),
+    ?assertEqual({2, 8}, % two points, sum = 8
+                 slide:sum(S2, 9, 10)),
+    S3 = slide:update(S2, 7, 5),
+    ?assertEqual({3, 15}, % three points (two concurrent), sum = 15
+                 slide:sum(S3, 9, 10)),
+    S4 = slide:update(S3, 11, 14),
+    ?assertEqual({3, 23}, % ignoring first reading, sum = 23
+                 slide:sum(S4, 14, 10)),
+    ?assertEqual({1, 11}, % shifted window
+                 slide:sum(S4, 18, 10)),
+    S5 = slide:update(S4, 13, 22),
+    ?assertEqual({1, 11}, % pruned early readings
+                 slide:sum(S5, 14, 10)),
+    ?assertEqual({2, 24}, % shifted window
+                 slide:sum(S5, 22, 10)).
+
+mean_test() ->
+    S0 = slide:fresh(10),
+    ?assertEqual({0, undefined}, % no points, no average
+                 slide:mean(S0)),
+    S1 = slide:update(S0, 3, 1),
+    ?assertEqual({1, 3.0}, % one point, avg = 3
+                 slide:mean(S1, 9, 10)),
+    S2 = slide:update(S1, 5, 5),
+    ?assertEqual({2, 4.0}, % two points, avg = 4
+                  slide:mean(S2, 9, 10)),
+    S3 = slide:update(S2, 7, 5),
+    ?assertEqual({3, 5.0}, % three points (two concurrent), avg = 5
+                  slide:mean(S3, 9, 10)),
+    S4 = slide:update(S3, 11, 14),
+    ?assertEqual({3, 23/3}, % ignoring first reading, avg = 
+                 slide:mean(S4, 14, 10)),
+    ?assertEqual({1, 11.0}, % shifted window
+                 slide:mean(S4, 18, 10)),
+    S5 = slide:update(S4, 13, 22),
+    ?assertEqual({1, 11.0}, % pruned early readings
+                 slide:mean(S5, 14, 10)),
+    ?assertEqual({2, 12.0}, % shifted window
+                 slide:mean(S5, 22, 10)).
+    
+nines_test() ->
+    PushReadings = fun(S, Readings) ->
+                           lists:foldl(
+                             fun({R,T}, A) ->
+                                     slide:update(A, R, T)
+                             end,
+                             S, Readings)
+                   end,
+    S0 = slide:fresh(10),
+    ?assertEqual({0, {undefined, undefined, undefined, undefined}},
+                 slide:nines(S0)),
+    S1 = PushReadings(S0, [ {R, 1} || R <- lists:seq(1, 10) ]),
+    ?assertEqual({10, {5, 10, 10, 10}}, slide:nines(S1, 9, 10)),
+    S2 = PushReadings(S1, [ {R, 2} || R <- lists:seq(11, 20) ]),
+    ?assertEqual({20, {10, 19, 20, 20}}, slide:nines(S2, 9, 10)),
+    S3 = PushReadings(S2, [ {R, 3} || R <- lists:seq(21, 100) ]),
+    ?assertEqual({100, {50, 95, 99, 100}}, slide:nines(S3, 9, 10)),
+    ?assertEqual({90, {55, 96, 100, 100}}, slide:nines(S3, 11, 10)).
+
+not_most_recent_test() ->
+    S0 = slide:fresh(10, 20),
+    S1 = slide:update(S0, 3, 13),
+    S2 = slide:update(S1, 5, 1),
+    S3 = slide:update(S2, 7, 22),
+    ?assertEqual({2, 8}, slide:sum(S2, 13, 13)),
+    ?assertEqual({1, 3}, slide:sum(S3, 13, 13)),
+    ?assertEqual({2, 10}, slide:sum(S3, 22, 22)).
+
+already_pruned_test() ->
+    S0 = slide:fresh(10, 20),
+    S1 = slide:update(S0, 3, 30),
+    S2 = slide:update(S1, 5, 1),
+    ?assertEqual(slide:sum(S1, 30, 30),
+                 slide:sum(S2, 30, 30)).

apps/webmachine/src/webmachine_resource.erl

     end.
 
 do(Fun, ReqProps) when is_atom(Fun) andalso is_list(ReqProps) ->
-    Self = proplists:get_value(resource, ReqProps),
     RState0 = proplists:get_value(reqstate, ReqProps),
     put(tmp_reqstate, empty),
     {Reply, ReqData, NewModState} = handle_wm_call(Fun, 
                     (RState0#reqstate.reqdata)#wm_reqdata{wm_state=RState0}),
-    case Reply of
-        {error, Err} -> {Err, Self};
-        _ ->
-            ReqState = case get(tmp_reqstate) of
-                empty -> RState0;
-                X -> X
-            end,
-            {Reply,
-            webmachine_resource:new(R_Mod, NewModState, R_ModExports, R_Trace),
-            ReqState#reqstate{reqdata=ReqData}}
-    end.
+    ReqState = case get(tmp_reqstate) of
+                   empty -> RState0;
+                   X -> X
+               end,
+    {Reply,
+     webmachine_resource:new(R_Mod, NewModState, R_ModExports, R_Trace),
+     ReqState#reqstate{reqdata=ReqData}}.
 
 handle_wm_call(Fun, ReqData) ->
     case default(Fun) of
       erlang:fun_info(Fun, type)]};
 escape_trace_data(Pid) when is_pid(Pid) ->
     {'WMTRACE_ESCAPED_PID', pid_to_list(Pid)};
+escape_trace_data(Port) when is_port(Port) ->
+    {'WMTRACE_ESCAPED_PORT', erlang:port_to_list(Port)};
 escape_trace_data(List) when is_list(List) ->
     escape_trace_list(List, []);
 escape_trace_data(Tuple) when is_tuple(Tuple) ->
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.