Commits

Anonymous committed 99ebf3e

the start of riak_stat - aggregate the total number of puts all vnodes on a node do

Comments (0)

Files changed (5)

src/riak_client.erl

 -export([send_event/2]).
 -export ([add_event_handler/2, add_event_handler/3, add_event_handler/4]).
 -export ([remove_event_handler/3]).
+-export([get_stats/1]).
 %% @type default_timeout() = 15000
 -define(DEFAULT_TIMEOUT, 15000).
 
 remove_event_handler(Pid, MatchHead, MatchGuard) ->
     %% Forwarded to riak_eventer:remove_handler/3 on Node through rpc.
     HandlerArgs = [Pid, MatchHead, MatchGuard],
     rpc:call(Node, riak_eventer, remove_handler, HandlerArgs).
 
+%% @doc Fetch riak_stat statistics as a list of {NodeName, Stats} pairs.
+%% `local' asks the riak_stat server registered on the calling node;
+%% `global' asks every member of the ring obtained from Node, one rpc
+%% call per member (a failed rpc shows up as that member's value).
+get_stats(local) ->
+    LocalStats = gen_server:call(riak_stat, get_stats),
+    [{node(), LocalStats}];
+get_stats(global) ->
+    {ok, Ring} = rpc:call(Node, riak_ring_manager, get_my_ring, []),
+    Members = riak_ring:all_members(Ring),
+    Ask = fun(Member) ->
+              rpc:call(Member, gen_server, call, [riak_stat, get_stats])
+          end,
+    [{Member, Ask(Member)} || Member <- Members].
+
+
 %% @private
 %% Builds a request id by hashing the current erlang:now() timestamp.
 mk_reqid() ->
     Timestamp = erlang:now(),
     erlang:phash2(Timestamp). % only has to be unique per-pid
 

src/riak_stat.erl

+%%%-------------------------------------------------------------------
+%%% File    : riak_stat.erl
+%%% Author  : Bryan Fink <bryan@basho.com>
+%%% Description : Stats aggregator for Riak.
+%%%
+%%% Created : 10 Nov 2009 by Bryan Fink <bryan@basho.com>
+%%%-------------------------------------------------------------------
+-module(riak_stat).
+
+-behaviour(gen_server2).
+
+%% API
+-export([start_link/0, get_stats/0, update/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state,{vnode_gets,vnode_puts}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Makes sure the os_mon application is running, then
+%% starts the stats server registered locally under the module name.
+%%--------------------------------------------------------------------
+start_link() ->
+    OsMonStart = application:start(os_mon),
+    %% Only "started" and "already started" are matched; any other
+    %% result has no clause here, so this process dies.
+    case OsMonStart of
+        {error, {already_started, os_mon}} -> ok;
+        ok -> ok
+    end,
+    gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Synchronously ask the locally registered stats server for its stats.
+get_stats() ->
+    Request = get_stats,
+    gen_server2:call(?MODULE, Request).
+
+%% Asynchronously report one occurrence of Stat, stamped with the
+%% moment taken in the calling process.
+update(Stat) ->
+    Stamp = riak_util:moment(),
+    gen_server2:cast(?MODULE, {update, Stat, Stamp}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+    %% Both counters begin as freshly created spirals.
+    Gets = spiraltime:fresh(),
+    Puts = spiraltime:fresh(),
+    {ok, #state{vnode_gets=Gets, vnode_puts=Puts}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(get_stats, _From, State) ->
+    %% Stats are computed on demand; the server state is not modified.
+    Stats = produce_stats(State),
+    {reply, Stats, State};
+handle_call(_Other, _From, State) ->
+    %% Any other call is answered with a plain ok.
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({update, Stat, Moment}, State) ->
+    NewState = update(Stat, Moment, State),
+    {noreply, NewState};
+handle_cast(_Other, State) ->
+    %% Any other cast leaves the state as it is.
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(_Message, State) ->
+    %% Out-of-band messages are discarded; the state is kept unchanged.
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Why, _FinalState) ->
+    %% No cleanup is performed.
+    ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_FromVsn, State, _Extra) ->
+    %% The state is carried over without conversion.
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Record one occurrence of Stat at Moment in the matching spiral of
+%% State. Stats other than vnode_get / vnode_put leave State untouched.
+update(Stat, Moment, State) ->
+    case Stat of
+        vnode_get -> spiral_incr(#state.vnode_gets, Moment, State);
+        vnode_put -> spiral_incr(#state.vnode_puts, Moment, State);
+        _Unknown  -> State
+    end.
+
+%% Add one event at Moment to the spiral stored at tuple position Elt
+%% of State, returning the updated State.
+spiral_incr(Elt, Moment, State) ->
+    Spiral = element(Elt, State),
+    Bumped = spiraltime:incr(1, Moment, Spiral),
+    setelement(Elt, State, Bumped).
+
+%% Build the full stats list: vnode counters followed by the cpu,
+%% memory and disk figures, concatenated in that order.
+produce_stats(State) ->
+    Now = spiraltime:n(),
+    VNode = vnode_stats(Now, State),
+    Cpu = cpu_stats(),
+    Mem = mem_stats(),
+    Disk = disk_stats(),
+    VNode ++ Cpu ++ Mem ++ Disk.
+
+%% Count reported by spiraltime:rep_minute/1 for the spiral at tuple
+%% position Elt of State, as seen at Moment.
+spiral_minute(Moment, Elt, State) ->
+    Spiral = element(Elt, State),
+    %% Incrementing by zero brings the spiral up to Moment without
+    %% adding events; the advanced spiral is only used for this report
+    %% and is not written back into State.
+    Current = spiraltime:incr(0, Moment, Spiral),
+    {_Stamp, Total} = spiraltime:rep_minute(Current),
+    Total.
+
+%% Per-minute get and put counts, as a two-element tagged list.
+vnode_stats(Moment, State) ->
+    Gets = spiral_minute(Moment, #state.vnode_gets, State),
+    Puts = spiral_minute(Moment, #state.vnode_puts, State),
+    [{vnode_gets, Gets},
+     {vnode_puts, Puts}].
+
+%% Tagged list of the values reported by cpu_sup: nprocs and the
+%% avg1 / avg5 / avg15 figures.
+cpu_stats() ->
+    NProcs = cpu_sup:nprocs(),
+    Avg1 = cpu_sup:avg1(),
+    Avg5 = cpu_sup:avg5(),
+    Avg15 = cpu_sup:avg15(),
+    [{cpu_nprocs, NProcs},
+     {cpu_avg1, Avg1},
+     {cpu_avg5, Avg5},
+     {cpu_avg15, Avg15}].
+
+%% First two elements of memsup:get_memory_data/0, tagged as total and
+%% allocated memory; the third element is not reported.
+mem_stats() ->
+    {Total, Allocated, _Ignored} = memsup:get_memory_data(),
+    [{mem_total, Total},
+     {mem_allocated, Allocated}].
+
+%% Whatever disksup:get_disk_data/0 returns, wrapped under the disk tag.
+disk_stats() ->
+    DiskData = disksup:get_disk_data(),
+    [{disk, DiskData}].
     RiakWeb = {webmachine_mochiweb,
                  {webmachine_mochiweb, start, [riak_web:config()]},
                   permanent, 5000, worker, dynamic},
+    RiakStat = {riak_stat,
+                {riak_stat, start_link, []},
+                permanent, 5000, worker, [riak_stat]},
     
     % Figure out which processes we should run...
     IsWebConfigured = (riak:get_app_env(riak_web_ip) /= undefined) andalso (riak:get_app_env(riak_web_ip) /= "undefined"),
     HasStorageBackend = (riak:get_app_env(storage_backend) /= undefined) andalso (riak:get_app_env(storage_backend) /= "undefined"),
+    IsStatEnabled = (riak:get_app_env(riak_stat) == true),
     
     % Build the process list...
     Processes = lists:flatten([
         RingMgr, 
         Connect, 
         LocalLogger,
-        ?IF(IsWebConfigured, RiakWeb, [])
+        ?IF(IsWebConfigured, RiakWeb, []),
+        ?IF(IsStatEnabled, RiakStat, [])
     ]),
     
     % Run the processes...

src/riak_vnode.erl

         {ok, Binary} -> {ok, binary_to_term(Binary)};
         X -> X
     end,
+    riak_stat:update(vnode_get),
     gen_fsm:send_event(FSM_pid, {r, RetVal, Idx, ReqID}).
 
 %% @private
     case syntactic_put_merge(Mod, ModState, BKey, DiffObj, ReqID) of
         {newobj, NewObj} ->
             Val = term_to_binary(NewObj),
-            Mod:put(ModState, BKey, Val);
+            Mod:put(ModState, BKey, Val),
+            riak_stat:update(vnode_put);
         _ -> nop
     end.
 
                     gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
                 {error, _Reason} ->
                     gen_fsm:send_event(FSM_pid, {fail, Idx, ReqID})
-            end
+            end,
+            riak_stat:update(vnode_put)
     end.
 
 %% @private

src/spiraltime.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @author Justin Sheehy <justin@basho.com>
+
+%% @doc A set of sliding windows for recording N-per-second running stats.
+%%
+%% This keeps stats per second for the last minute, per minute for an hour,
+%% per hour for a day, and per day for a week.
+%%
+%% The goal is not to have "perfect" stats; post-fact log analysis is
+%% better for that.  The goal here is to have approximate running
+%% data useful for quick understanding of performance trends.
+
+-module(spiraltime).
+-author('Justin Sheehy <justin@basho.com>').
+-export([fresh/0,fresh/1,n/0,incr/2,incr/3,
+         rep_second/1,rep_minute/1,rep_hour/1,rep_day/1,rep_week/1,
+         test_spiraltime/0]).
+
+%% @type moment() = integer().
+%% This is a number of seconds, as produced by
+%% calendar:datetime_to_gregorian_seconds(calendar:universal_time())
+
+%% @type count() = integer().
+%% The number of entries recorded in some time period.
+
+-record(spiral, {moment :: integer(),
+                 seconds :: [integer()],
+                 minutes :: [integer()],
+                 hours :: [integer()],
+                 days :: [integer()]}).
+
+%% @doc Produce the current moment: universal time expressed in
+%% gregorian seconds.
+%% @spec n() -> moment()
+n() ->
+    UtcNow = calendar:universal_time(),
+    calendar:datetime_to_gregorian_seconds(UtcNow).
+
+%% @doc Create an empty spiral whose clock starts at the current moment.
+%% @spec fresh() -> spiral()
+fresh() ->
+    Now = n(),
+    fresh(Now).
+
+%% @doc Create an empty spiral whose clock starts at Moment: 60 second
+%% slots, 60 minute slots, 24 hour slots and 7 day slots, all zero.
+%% @spec fresh(moment()) -> spiral()
+fresh(Moment) ->
+    #spiral{moment=Moment,
+            seconds=lists:duplicate(60, 0),
+            minutes=lists:duplicate(60, 0),
+            hours=lists:duplicate(24, 0),
+            days=lists:duplicate(7, 0)}.
+
+%% Number of slots that make up one full window of the given field
+%% (the field is identified by its record index).
+fieldlen(Field) when Field =:= #spiral.seconds;
+                     Field =:= #spiral.minutes -> 60;
+fieldlen(Field) when Field =:= #spiral.hours   -> 24;
+fieldlen(Field) when Field =:= #spiral.days    -> 7.
+
+%% The coarser field a full window rolls up into; `done' after days.
+nextfield(Field) when Field =:= #spiral.seconds -> #spiral.minutes;
+nextfield(Field) when Field =:= #spiral.minutes -> #spiral.hours;
+nextfield(Field) when Field =:= #spiral.hours   -> #spiral.days;
+nextfield(Field) when Field =:= #spiral.days    -> done.
+
+%% @doc Produce the count held in the newest (head) seconds slot,
+%% paired with the spiral's current moment.
+%% @spec rep_second(spiral()) -> {moment(), count()}
+rep_second(Spiral) ->
+    Moment = Spiral#spiral.moment,
+    Newest = hd(Spiral#spiral.seconds),
+    {Moment, Newest}.
+
+%% @doc Produce the sum of the 60 newest seconds slots, paired with the
+%% spiral's current moment.
+%% @spec rep_minute(spiral()) -> {moment(), count()}
+rep_minute(Spiral) ->
+    {LastMinute, _Older} = lists:split(60, Spiral#spiral.seconds),
+    Total = lists:sum(LastMinute),
+    {Spiral#spiral.moment, Total}.
+
+%% @doc Produce the approximate hourly count: the sum of the 60 newest
+%% minutes slots, paired with the spiral's current moment.
+%% @spec rep_hour(spiral()) -> {moment(), count()}
+rep_hour(Spiral) ->
+    {LastHour, _Older} = lists:split(60, Spiral#spiral.minutes),
+    Total = lists:sum(LastHour),
+    {Spiral#spiral.moment, Total}.
+
+%% @doc Produce the approximate daily count: the sum of the 24 newest
+%% hours slots, paired with the spiral's current moment.
+%% @spec rep_day(spiral()) -> {moment(), count()}
+rep_day(Spiral) ->
+    {LastDay, _Older} = lists:split(24, Spiral#spiral.hours),
+    Total = lists:sum(LastDay),
+    {Spiral#spiral.moment, Total}.
+
+%% @doc Produce the approximate weekly count: the sum of the 7 newest
+%% days slots, paired with the spiral's current moment.
+%% @spec rep_week(spiral()) -> {moment(), count()}
+rep_week(Spiral) ->
+    {LastWeek, _Older} = lists:split(7, Spiral#spiral.days),
+    Total = lists:sum(LastWeek),
+    {Spiral#spiral.moment, Total}.
+
+%% @doc Add N to the counter of events, stamped with the current moment.
+%% @spec incr(count(), spiral()) -> spiral()
+incr(N, Spiral) ->
+    Now = n(),
+    incr(N, Now, Spiral).
+
+%% @doc Add N to the counter of events occurring at Moment.
+%%
+%% Three cases, tried in order:
+%%   - Moment equals the spiral's clock: N is added to the head slot.
+%%   - Moment is 60 or more seconds behind the clock: the update is
+%%     dropped and the spiral is returned unchanged.
+%%   - otherwise the clock is first advanced to Moment if Moment is
+%%     ahead of it, and N is added to the slot whose offset from the
+%%     head equals the age of Moment in seconds.
+%%
+%% The drop threshold is `>= 60' because slots 0..59 are the only ones a
+%% freshly created spiral has and the only ones rep_minute/1 reads; an
+%% offset of exactly 60 would split off an empty tail (hd([]) fails) or
+%% land in a slot that is never reported.
+%% @spec incr(count(), moment(), spiral()) -> spiral()
+incr(N, Moment, Spiral) when Spiral#spiral.moment =:= Moment ->
+    % common case -- updates for "now"
+    Spiral#spiral{seconds=[hd(Spiral#spiral.seconds)+N|
+                           tl(Spiral#spiral.seconds)]};
+incr(_N, Moment, Spiral) when Spiral#spiral.moment - Moment >= 60 ->
+    Spiral; % updates a minute old or older are dropped! whee!
+incr(N, Moment, Spiral) ->
+    S1 = update_moment(Moment, Spiral),
+    % Age is 0 when the clock was just advanced, 1..59 for late updates.
+    Age = S1#spiral.moment - Moment,
+    {Front,Back} = lists:split(Age, S1#spiral.seconds),
+    S1#spiral{seconds=Front ++ [hd(Back)+N|tl(Back)]}.
+
+%% Advance the spiral's clock to Moment.
+%%   - Moment at or before the clock: nothing to do.
+%%   - Moment more than one week (7 * 24 * 60 * 60 seconds) past the
+%%     clock: the widest window kept is a week of days, so the spiral is
+%%     replaced by an empty one at Moment instead of being stepped
+%%     forward one second at a time.
+%%   - otherwise: step the clock one second at a time, pushing an empty
+%%     slot onto the seconds list for each step (push/3 rolls full
+%%     windows up into the coarser fields).
+update_moment(Moment, Spiral) when Moment =< Spiral#spiral.moment ->
+    Spiral;
+update_moment(Moment, Spiral)
+  when Moment - Spiral#spiral.moment > 7 * 24 * 60 * 60 ->
+    fresh(Moment);
+update_moment(Moment, Spiral) ->
+    Ticked = Spiral#spiral{moment=Spiral#spiral.moment+1},
+    update_moment(Moment, push(0, Ticked, #spiral.seconds)).
+
+%% Read the value at tuple position Field of Spiral.
+getfield(Spiral, Field) ->
+    element(Field, Spiral).
+%% Return Spiral with the value at tuple position Field replaced by X.
+setfield(Spiral, X, Field) ->
+    setelement(Field, Spiral, X).
+
+%% Prepend N to the list stored in Field. When that makes the list
+%% exactly twice the field's window length, only the newest window is
+%% kept and its sum is pushed onto the next coarser field; `done' as
+%% the field ends the cascade.
+push(_N, Spiral, done) ->
+    Spiral;
+push(N, Spiral, Field) ->
+    Extended = [N|getfield(Spiral,Field)],
+    Window = fieldlen(Field),
+    case length(Extended) =:= 2 * Window of
+        true ->
+            {Recent, _Expired} = lists:split(Window, Extended),
+            Sum = lists:sum(Recent),
+            Trimmed = setfield(Spiral,Recent,Field),
+            push(Sum, Trimmed, nextfield(Field));
+        false ->
+            setfield(Spiral,Extended,Field)
+    end.
+
+%% Smoke test: 17 events at the start moment plus 3 one second later
+%% must report 3 for the last second and 20 for the last minute.
+%% Returns true, or fails with badmatch.
+test_spiraltime() ->
+    T0 = n(),
+    Empty = fresh(T0),
+    AtStart = incr(17, T0, Empty),
+    T1 = T0+1,
+    Advanced = incr(3, T1, AtStart),
+    {T1, 3} = rep_second(Advanced),
+    {T1, 20} = rep_minute(Advanced),
+    true.