Commits

dizzyd  committed ec04d15 Merge

Merging with mainline

  • Participants
  • Parent commits aa9e5c1, fe43521

Comments (0)

Files changed (8)

File ebin/basho_bench.app

              basho_bench_driver_riakclient,
              basho_bench_driver_cassandra,
              basho_bench_driver_bitcask,
+             basho_bench_driver_hibari,
              basho_bench_driver_null,
              basho_bench_log,
              basho_bench_keygen,

File examples/hibari.config

+{mode, max}.
+%% Remember: actual rate = mode rate (below) * concurrent (below)
+%%
+%% SLF: Hrm, this doesn't seem to play very well with Hibari client
+%% protocol timeouts??  Or I just need to experiment more with
+%% changing both the per-generator rate and the # of concurrent
+%% generators?
+%{mode, {rate, 8}}.
+
+{duration, 15}.                                  % 1 minute is very short
+
+{concurrent, 100}.                                % 5 is low for Hibari
+
+{driver, basho_bench_driver_hibari}.
+
+{code_paths, [
+              %% If you have the Hibari source distribution pulled
+              %% from SourceForge and compiled, then change the prefix
+              %% for these paths from "/User/fritchie/src/hibari" to
+              %% the top of your Hibari source.
+
+              "/Users/fritchie/src/hibari/src/erl-apps/cluster-info__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-apps/gdss__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-apps/gmt-util__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-apps/gdss-ubf-proto__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-third-party/mochiweb__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-tools/ubf-jsonrpc__HEAD/ebin",
+              "/Users/fritchie/src/hibari/src/erl-tools/ubf__HEAD/ebin"
+             ]}.
+
+%{key_generator, {uniform_int_bin, 35000}}.
+{key_generator, {uniform_int_bin, 99000}}.
+
+{value_generator, {fixed_bin, 1000}}.
+%{value_generator, {fixed_bin, 10000}}.
+
+%% Default is [{get,4}, {put,4}, {delete, 1}] which means that out
+%% of every 9 operations, 'get' will be called four times, 'put' will
+%% called four times, and 'delete' will be called once, on average.
+
+{operations, [{get,4}, {put,4}, {delete, 1}]}.
+
+%% hibari_table
+%%
+%% Name of the Hibari table to operate on.  Must be an atom.
+
+{hibari_table, tab1}.
+%{hibari_table, tab1_nosync}.
+
+%% hibari_client_type
+%%
+%% Type of Hibari client: native, ubf, ebf, jsf, json_rpc
+%% Uncomment only one of the pairs below.
+%%
+%% hibari_server_tcp_port
+%%
+%% For use with all Hibari clients other than the native Erlang client.
+%% (This attribute is ignored by the native Erlang client.)
+%%
+%% TCP port number for the target server.  Default port numbers:
+%%
+%%    EBF        7580
+%%    UBF        7581
+%%    JSF        7582
+
+%% TODO: Native type is broken due to gmt app can't start because of
+%%       reliance on init:get_argument(), and bash_bench isn't starting
+%%       with the extra cmd line args necessary.  Sending patch to Joe.
+%{hibari_client_type, native}.
+%{hibari_server_tcp_port, -1}.           % Value is unused but must be present
+
+{hibari_client_type, ebf}.
+{hibari_server_tcp_port, 7580}.
+
+%{hibari_client_type, ubf}.
+%{hibari_server_tcp_port, 7581}.
+
+%% TODO: Hibari's JSF support is broken right now, but I'm not sure?
+%{hibari_client_type, jsf}.
+%{hibari_server_tcp_port, 7582}.
+
+%% TODO: SLF: Untested!  Try testing against bb2e?
+%{hibari_client_type, json_rpc}.
+%{hibari_server_tcp_port, 22982377582}.
+
+%% hibari_servers
+%% 
+%% For use with all Hibari clients other than the native Erlang client.
+%% (This attribute is ignored by the native Erlang client.)
+%%
+%% Each concurrent load generator will use one server from this list.
+%% The assignment is made at plugin initialization time and will not change
+%% for the life of the load generator process.
+%%
+%% This list should include all Hibari servers in the cluster.  The
+%% list may contain valid DNS hostnames and/or IP addresses, both as
+%% Erlang strings.  (Don't forget commas between each machine, and
+%% don't put a comma after the last item in the list.)
+
+{hibari_servers, [
+                  %%"localhost"
+                  "bb2e"
+                 ]}.
+
+%% hibari_native_1node
+%%
+%% For the Erlang native client, we need the name of a single Erlang
+%% node in the Hibari cluster.  After we connect to that node, we'll
+%% automatically become aware of all other nodes in the cluster.
+
+{hibari_native_1node, 'gdss_dev2@sbb'}.
+
+%% hibari_native_my_sname
+%%
+%% For the Erlang native client, this is the name of the basho_bench
+%% node.
+
+{hibari_native_my_sname, 'basho_bench'}.
+
+%% hibari_native_ticktime
+%%
+%% For the Erlang native client, this value must match the value of
+%% 'cluster_timeout' in Hibari's "etc/central.conf" file.  The default
+%% is 20.
+
+{hibari_native_ticktime, 20}.
+
+%% hibari_native_cookie
+%%
+%% For the Erlang native client, this value specifies the "cookie"
+%% used by the Hibari server cluster.  See ~hibariuser/.erlang.cookie
+%% on one of the Hibari server nodes.  The cookie must be an atom, so
+%% be sure to use single quotes.
+
+{hibari_native_cookie, 'SKHZSZEOIVIYDXBZQHOB'}.
+

File rebar.config

 {deps, [
         {stats, "3", {hg, "http://bitbucket.org/dizzyd/stats", "tip"}},
-        {ibrowse, "1.*", {git, "http://github.com/dizzyd/ibrowse.git", "HEAD"}},
+        {ibrowse, ".*", {git, "http://github.com/dizzyd/ibrowse.git", "HEAD"}},
         {casbench, "0.1", {hg, "http://bitbucket.org/basho/casbench", "tip"}},
         {riakc, ".*", {hg, "http://bitbucket.org/basho/riak-erlang-client", "tip"}},
         {protobuffs, ".*", {hg, "http://bitbucket.org/basho/protobuffs", "tip"}}

File src/basho_bench_driver_hibari.erl

+%% -------------------------------------------------------------------
+%%
+%% basho_bench: Benchmarking Suite
+%%
+%% Copyright (c) 2010 Scott Lystig Fritchie, <slfritchie@snookles.com>
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(basho_bench_driver_hibari).
+
+-export([new/1,
+         run/4]).
+
+-include("basho_bench.hrl").
+
+%% UBF string helper
+-define(S(X), #'#S'{value=X}).
+
+%% UBF string record
+-record('#S', {value=""}).
+
+-define(BASIC_TIMEOUT, 10*1000).
+
+-record(state, {
+          hibari_type,
+          id,
+          hibari_servers,
+          hibari_port,
+          clnt,
+          table
+         }).
+
+%% ====================================================================
+%% API
+%% ====================================================================
+
+new(Id) ->
+    %% Try to tell the user politely if there's a missing movule and where
+    %% that module might be found.
+    DepMods = [
+               {brick_simple,
+                "/path/to/your-hibari-distro/src/erl-apps/gdss__HEAD/ebin"},
+               {gmt_util,
+                "/path/to/your-hibari-distro/src/erl-apps/gmt-util__HEAD/ebin"},
+               {ubf_gdss_plugin,
+                "/path/to/your-hibari-distro/src/erl-apps/gdss-ubf-proto__HEAD/ebin"},
+               {cluster_info,
+                "/path/to/your-hibari-distro/src/erl-apps/cluster-info__HEAD/ebin"},
+               {jsf,
+                "/path/to/your-hibari-distro/src/erl-tools/ubf-jsonrpc__HEAD/ebin"},
+               {mochijson2,
+                "/path/to/your-hibari-distro/src/erl-third-party/mochiweb__HEAD/ebin"},
+               {ubf_client,
+                "/path/to/your-hibari-distro/src/erl-tools/ubf__HEAD/ebin"}
+              ] ++
+        [{Mod, x} || Mod <- [
+                             %% gdss app
+                             brick_simple, brick_server, brick_hash, 
+                             %% gmt app
+                             gmt_util,
+                             %% ubf app
+                             contract_driver, ubf, ubf_client, ubf_driver
+                            ]],
+    if Id == 1 ->
+            F = fun({Mod, Dir}) ->
+                        case code:load_file(Mod) of
+                            {module, Mod} ->
+                                ok;
+                            {error, not_purged} ->
+                                %% This is OK: generator #1 crashed & restarted.
+                                ok;
+                            _Load ->
+                                ?ERROR("~p: error loading '~p' module: ~p.\n\n",
+                                       [?MODULE, Mod, _Load]),
+                                if is_list(Dir) ->
+                                        ?FAIL_MSG(
+                                           "Please double-check the path for "
+                                           "this module in\nthe basho_bench "
+                                           "config file, e.g.\n    ~s\n", 
+                                           [Dir]);
+                                   true ->
+                                        ok
+                                end
+                        end
+                end,
+            lists:map(F, DepMods),
+            ?INFO("All required modules are available.\n", []);
+       true ->
+            ok
+    end,
+
+    Table = basho_bench_config:get(hibari_table),
+    HibariServers = basho_bench_config:get(hibari_servers),
+    HibariType = basho_bench_config:get(hibari_client_type),
+    HibariTcpPort = basho_bench_config:get(hibari_server_tcp_port),
+    Native1Node = basho_bench_config:get(hibari_native_1node),
+    NativeSname = basho_bench_config:get(hibari_native_my_sname),
+    NativeTickTime = basho_bench_config:get(hibari_native_ticktime),
+    NativeCookie = basho_bench_config:get(hibari_native_cookie),
+
+    Clnt = make_clnt(HibariType, Id, NativeSname, NativeTickTime, NativeCookie,
+                     Native1Node, HibariServers, HibariTcpPort, false),
+
+    {ok, #state{hibari_type = HibariType,
+                id = Id,
+                hibari_servers = HibariServers,
+                hibari_port = HibariTcpPort,
+                clnt = Clnt,
+                table = Table
+               }}.
+
+run(get, KeyGen, _ValueGen, #state{hibari_type = ClientType, clnt = Clnt,
+                                   table = Table} = S) ->
+    Key = KeyGen(),
+    case do(ClientType, Clnt, Table, [brick_server:make_get(Key)]) of
+        [{ok, _TS, _Val}] ->
+            {ok, S};
+        [key_not_exist] ->
+            %%io:format("g!"),
+            {ok, S};
+        timeout ->
+            {error, timeout, close_and_reopen(S)};
+        _Else ->
+            {'EXIT', _Else}
+    end;
+run(put, KeyGen, ValueGen, #state{hibari_type = ClientType, clnt = Clnt,
+                                  table = Table} = S) ->
+    Key = KeyGen(),
+    Val = ValueGen(),
+    case do(ClientType, Clnt, Table, [brick_server:make_set(Key, Val)]) of
+        [ok] ->
+            {ok, S};
+        [{ts_error, _TS}] ->
+            %% Honest race with another put.
+            {ok, S};
+        timeout ->
+            {error, timeout, close_and_reopen(S)};
+        _Else ->
+            {'EXIT', _Else}
+    end;
+run(delete, KeyGen, _ValueGen, #state{hibari_type = ClientType, clnt = Clnt,
+                                      table = Table} = S) ->
+    Key = KeyGen(),
+    case do(ClientType, Clnt, Table, [brick_server:make_delete(Key)]) of
+        [ok] ->
+            {ok, S};
+        [key_not_exist] ->
+            %%io:format("d!"),
+            {ok, S};
+        timeout ->
+            {error, timeout, close_and_reopen(S)};
+        _Else ->
+            {'EXIT', _Else}
+    end.
+
+%%%
+%%% Private funcs
+%%%
+
+do(native, _Clnt, Table, OpList) ->
+    try
+        brick_simple:do(Table, OpList, ?BASIC_TIMEOUT)
+    catch _X:_Y ->
+        ?ERROR("Error on ~p: ~p ~p\n", [Table, _X, _Y]),
+        {error, {_X, _Y}}                                 
+    end;
+do(Type, Clnt, Table, OpList)
+  when Type == ubf; Type == ebf; Type == jsf ->
+    try begin
+            DoOp = {do, Table, OpList, [], ?BASIC_TIMEOUT+100},
+            {reply, Res, none} = ubf_client:rpc(Clnt, DoOp, ?BASIC_TIMEOUT),
+            Res
+        end
+    catch
+        error:{badmatch, {error, socket_closed}} ->
+            socket_closed;
+        error:{badmatch, timeout} ->
+            timeout;                                                     
+        error:badpid ->
+            %% This error corresponds to a timeout error .....
+            %%
+            %%
+            ?ERROR("TODO: error:badpid for ~p\n", [Clnt]),
+            error_badpid;
+        _X:_Y ->
+            ?ERROR("Error on ~p: ~p ~p\n", [Table, _X, _Y]),
+            bummer
+    end.
+
+make_clnt(HibariType, Id, NativeSname, NativeTickTime, NativeCookie,
+          Native1Node, HibariServers, HibariTcpPort, RetryP) ->
+    case HibariType of
+        native when Id == 1 ->
+            ?INFO("Try to start net_kernel with name ~p\n", [NativeSname]),
+            case net_kernel:start([NativeSname, shortnames, 1000*NativeTickTime]) of
+                {ok, _} ->
+                    ?INFO("Net kernel started as ~p\n", [node()]);
+                {error, {already_started, _}} ->
+                    ok;
+                {error, Reason} ->
+                    ?FAIL_MSG("Failed to start net_kernel for ~p: ~p\n", [?MODULE, Reason])
+            end,
+
+            ?INFO("Set cookie to ~p\n", [NativeCookie]),
+            true = erlang:set_cookie(node(), NativeCookie),
+
+            ?INFO("Try to ping ~p\n", [Native1Node]),
+            case net_adm:ping(Native1Node) of
+                pong ->
+                    ok;
+                pang ->
+                    ?FAIL_MSG("~s: cannot ping ~p, aborting!\n",
+                              [?MODULE, Native1Node])
+            end,
+            application:start(sasl),
+            application:start(crypto),
+            EmptyPath = "./empty_file",
+            ok = file:write_file(EmptyPath, <<>>),
+            application:set_env(gmt, central_config, EmptyPath),
+            ok = application:start(gmt),
+            ok = application:start(gdss_client),
+            timer:sleep(2000),
+            ?INFO("Hibari client app started\n", []),
+            undefined;
+        native ->
+            %% All the work was done above in Id == 1 clause
+            undefined;
+        X when X == ebf; X == ubf; X == jsf ->
+            %% Choose the node using our ID as a modulus
+            Server = lists:nth((Id rem length(HibariServers)+1),
+                               HibariServers),
+            if not RetryP -> ?INFO("Using server ~p for ~p worker ~p\n",
+                                   [Server, HibariType, Id]);
+               true       -> ok
+            end,
+            case ubf_client:connect(Server, HibariTcpPort,
+                                    [{proto, HibariType}], 60*1000) of
+                {ok, Pid, _} ->
+                    {reply, {ok, ok}, none} =
+                        ubf_client:rpc(Pid, {startSession, ?S("gdss"), []},
+                                       60*1000),
+                    Pid;
+                Error ->
+                    ?FAIL_MSG("~s: id ~p cannot connect to "
+                              "~p port ~p: ~p\n",
+                              [?MODULE, Id, Server, HibariTcpPort, Error])
+            end
+    end.
+
+%% UBF/EBF/JSF timeouts cause problems for basho_bench.  If we report
+%% an error, we'll timeout.  But if we don't report an error, then
+%% basho_bench will assume that we can continue using the same #state
+%% and therefore the same UBF/EBF/JSF client ... but that client is
+%% now in an undefined state and cannot be used again.
+%%
+%% So, we use this function to forcibly close the current client and
+%% open a new one.  For the native client, this ends up doing almost
+%% exactly nothing.
+
+close_and_reopen(#state{clnt = OldClnt,
+                        id = OldId,
+                        hibari_type = HibariType,
+                        hibari_servers = HibariServers,
+                        hibari_port = HibariTcpPort} = S) ->
+    %%?INFO("close_and_reopen #~p\n", [OldId]),
+    catch ubf_client:close(OldClnt),
+
+    %% Take advantage of the fact that native clients don't need to re-do
+    %% anything, so just pass 'foo' instead.
+    Id = if HibariType == native -> foo;
+            true                 -> OldId
+         end,
+    NewClnt = make_clnt(HibariType, Id, foo, foo, foo,
+                        foo, HibariServers, HibariTcpPort, true),
+    S#state{clnt = NewClnt}.
+

File src/basho_bench_driver_http_raw.erl

                  base_urls_index,    % #url to use for next request
                  path_params }).     % Params to append on the path
 
+
 %% ====================================================================
 %% API
 %% ====================================================================
     Port = basho_bench_config:get(http_raw_port, 8098),
     Path = basho_bench_config:get(http_raw_path, "/riak/test"),
     Params = basho_bench_config:get(http_raw_params, ""),
+    Disconnect = basho_bench_config:get(http_raw_disconnect_frequency, infinity),
+
+    case Disconnect of
+        infinity -> ok;
+        Seconds when is_integer(Seconds) -> ok;
+        {ops, Ops} when is_integer(Ops) -> ok;
+        _ -> ?FAIL_MSG("Invalid configuration for http_raw_disconnect_frequency: ~p~n", [Disconnect])
+    end,
+
+    %% Uses pdict to avoid threading state record through lots of functions
+    erlang:put(disconnect_freq, Disconnect),
 
     %% If there are multiple URLs, convert the list to a tuple so we can efficiently
     %% round-robin through them.
         {error, Reason} ->
             {error, Reason, S2}
     end;
+run(get_existing, KeyGen, _ValueGen, State) ->
+    {NextUrl, S2} = next_url(State),
+    case do_get(url(NextUrl, KeyGen, State#state.path_params)) of
+        {not_found, Url} ->
+            {error, {not_found, Url}, S2};
+        {ok, _Url, _Headers} ->
+            {ok, S2};
+        {error, Reason} ->
+            {error, Reason, S2}
+    end;
 run(update, KeyGen, ValueGen, State) ->
     {NextUrl, S2} = next_url(State),
     case do_get(url(NextUrl, KeyGen, State#state.path_params)) of
                     {error, Reason, S2}
             end
     end;
+run(update_existing, KeyGen, ValueGen, State) ->
+    {NextUrl, S2} = next_url(State),
+    case do_get(url(NextUrl, KeyGen, State#state.path_params)) of
+        {error, Reason} ->
+            {error, Reason, S2};
+
+        {not_found, Url} ->
+            {error, {not_found, Url}, S2};
+
+        {ok, Url, Headers} ->
+            Vclock = lists:keyfind("X-Riak-Vclock", 1, Headers),
+            case do_put(Url, [State#state.client_id, Vclock], ValueGen) of
+                ok ->
+                    {ok, S2};
+                {error, Reason} ->
+                    {error, Reason, S2}
+            end
+    end;
 run(insert, KeyGen, ValueGen, State) ->
     %% Go ahead and evaluate the keygen so that we can use the
     %% sequential_int_gen to do a controlled # of inserts (if we desire). Note
     erlang:erase({ibrowse_pid, Url#url.host}),
     ok.
 
+maybe_disconnect(Url) ->
+    case erlang:get(disconnect_freq) of
+        infinity -> ok;
+        {ops, Count} -> should_disconnect_ops(Count,Url) andalso disconnect(Url);
+        Seconds -> should_disconnect_secs(Seconds,Url) andalso disconnect(Url)
+    end.
+
+should_disconnect_ops(Count, Url) ->
+    Key = {ops_since_disconnect, Url#url.host},
+    case erlang:get(Key) of
+        undefined ->
+            erlang:put(Key, 1),
+            false;
+        Count ->
+            erlang:put(Key, 0),
+            true;
+        Incr ->
+            erlang:put(Key, Incr + 1),
+            false
+    end.
+
+should_disconnect_secs(Seconds, Url) ->
+    Key = {last_disconnect, Url#url.host},
+    case erlang:get(Key) of
+        undefined ->
+            erlang:put(Key, erlang:now()),
+            false;
+        Time when is_tuple(Time) andalso size(Time) == 3 ->
+            Diff = timer:now_diff(erlang:now(), Time),
+            if
+                Diff >= Seconds * 1000000 ->
+                    erlang:put(Key, erlang:now()),
+                    true;
+                true -> false
+            end
+    end.
+
+clear_disconnect_freq(Url) ->
+    case erlang:get(disconnect_freq) of
+        infinity -> ok;
+        {ops, _Count} -> erlang:put({ops_since_disconnect, Url#url.host}, 0);
+        _Seconds -> erlang:put({last_disconnect, Url#url.host}, erlang:now())
+    end.
 
 send_request(Url, Headers, Method, Body, Options) ->
     send_request(Url, Headers, Method, Body, Options, 3).
     Pid = connect(Url),
     case catch(ibrowse_http_client:send_req(Pid, Url, Headers, Method, Body, Options, 5000)) of
         {ok, Status, RespHeaders, RespBody} ->
+            maybe_disconnect(Url),
             {ok, Status, RespHeaders, RespBody};
 
         Error ->
+            clear_disconnect_freq(Url),
             disconnect(Url),
             case should_retry(Error) of
                 true ->

File src/basho_bench_driver_null.erl

 new(_Id) ->
     {ok, undefined}.
 
-run(get, KeyGen, _ValueGen, _State) ->
+run(get, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    {ok, Key};
-run(put, KeyGen, ValueGen, _State) ->
+    {ok, State};
+run(put, KeyGen, ValueGen, State) ->
     Key = KeyGen(),
     ValueGen(),
-    {ok, Key};
-run(delete, KeyGen, _ValueGen, _State) ->
+    {ok, State};
+run(delete, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    {ok, Key}.
+    {ok, State}.
 

File src/basho_bench_driver_riakc_pb.erl

 
     Ips  = basho_bench_config:get(riakc_pb_ips, [{127,0,0,1}]),
     Port  = basho_bench_config:get(riakc_pb_port, 8087),
-    %% riakc_pb_replies sets defaults for R, W, DW and RW. 
+    %% riakc_pb_replies sets defaults for R, W, DW and RW.
     %% Each can be overridden separately
     Replies = basho_bench_config:get(riakc_pb_replies, 2),
     R = basho_bench_config:get(riakc_pb_r, Replies),
                           w = W,
                           dw = DW,
                           rw = RW
-                        }};
+                         }};
         {error, Reason2} ->
             ?FAIL_MSG("Failed to connect riakc_pb_socket to ~p port ~p: ~p\n",
                       [TargetIp, Port, Reason2])
 
 run(get, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key, 
+    case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
                              [{r, State#state.r}]) of
         {ok, _} ->
             {ok, State};
         {error, Reason} ->
             {error, Reason, State}
     end;
+run(get_existing, KeyGen, _ValueGen, State) ->
+    Key = KeyGen(),
+    case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
+                             [{r, State#state.r}]) of
+        {ok, _} ->
+            {ok, State};
+        {error, notfound} ->
+            {error, {not_found, Key}, State};
+        {error, Reason} ->
+            {error, Reason, State}
+    end;
 run(put, KeyGen, ValueGen, State) ->
     Robj0 = riakc_obj:new(State#state.bucket, KeyGen()),
     Robj = riakc_obj:update_value(Robj0, ValueGen()),
     end;
 run(update, KeyGen, ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket, 
+    case riakc_pb_socket:get(State#state.pid, State#state.bucket,
                              Key, [{r, State#state.r}]) of
         {ok, Robj} ->
             Robj2 = riakc_obj:update_value(Robj, ValueGen()),
             case riakc_pb_socket:put(State#state.pid, Robj2, [{w, State#state.w},
-                                                              {dw, State#state.dw}]) of 
+                                                              {dw, State#state.dw}]) of
                 ok ->
                     {ok, State};
                 {error, Reason} ->
             Robj0 = riakc_obj:new(State#state.bucket, KeyGen()),
             Robj = riakc_obj:update_value(Robj0, ValueGen()),
             case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
-                                                             {dw, State#state.dw}]) of 
+                                                             {dw, State#state.dw}]) of
                 ok ->
                     {ok, State};
                 {error, Reason} ->
                     {error, Reason, State}
             end
     end;
+run(update_existing, KeyGen, ValueGen, State) ->
+    Key = KeyGen(),
+    case riakc_pb_socket:get(State#state.pid, State#state.bucket,
+                             Key, [{r, State#state.r}]) of
+        {ok, Robj} ->
+            Robj2 = riakc_obj:update_value(Robj, ValueGen()),
+            case riakc_pb_socket:put(State#state.pid, Robj2, [{w, State#state.w},
+                                                              {dw, State#state.dw}]) of
+                ok ->
+                    {ok, State};
+                {error, Reason} ->
+                    {error, Reason, State}
+            end;
+        {error, notfound} ->
+            {error, {not_found, Key}, State}
+    end;
 run(delete, KeyGen, _ValueGen, State) ->
     %% Pass on rw
-    case riakc_pb_socket:delete(State#state.pid, State#state.bucket, KeyGen(), 
+    case riakc_pb_socket:delete(State#state.pid, State#state.bucket, KeyGen(),
                                 [{rw, State#state.rw}]) of
         ok ->
             {ok, State};

File src/basho_bench_keygen.erl

 new({sequential_int_str, MaxKey}, _Id) ->
     Ref = make_ref(),
     fun() -> Key = sequential_int_generator(Ref, MaxKey), integer_to_list(Key) end;
+new({partitioned_sequential_int, MaxKey}, Id) ->
+    Workers = basho_bench_config:get(concurrent),
+    MaxValue = MaxKey div Workers,
+    MinValue = MaxValue * (Id - 1),
+    Ref = make_ref(),
+    fun() -> sequential_int_generator(Ref,MaxValue) + MinValue end;
+new({partitioned_sequential_int_bin, MaxKey}, Id) ->
+    Gen = new({partitioned_sequential_int, MaxKey}, Id),
+    fun() -> <<(Gen()):32/native>> end;
+new({partitioned_sequential_int_str, MaxKey}, Id) ->
+    Gen = new({partitioned_sequential_int, MaxKey}, Id),
+    fun() -> integer_to_list(Gen()) end;
 new({uniform_int_bin, MaxKey}, _Id) ->
     fun() -> Key = random:uniform(MaxKey), <<Key:32/native>> end;
 new({uniform_int_str, MaxKey}, _Id) ->
 new({pareto_int_bin, MaxKey}, _Id) ->
     Pareto = pareto(trunc(MaxKey * 0.2), ?PARETO_SHAPE),
     fun() -> <<(Pareto()):32/native>> end;
+new({pareto_int_str, MaxKey}, _Id) ->
+    Pareto = pareto(trunc(MaxKey * 0.2), ?PARETO_SHAPE),
+    fun() -> integer_to_list(Pareto()) end;
+new({truncated_pareto_int, MaxKey}, Id) ->
+    Pareto = new({pareto_int, MaxKey}, Id),
+    fun() -> erlang:min(MaxKey, Pareto()) end;
+new({truncated_pareto_int_bin, MaxKey}, Id) ->
+    TPareto = new({truncated_pareto_int, MaxKey}, Id),
+    fun() -> <<(TPareto()):32/native>> end;
 new({function, Module, Function, Args}, Id) ->
     case code:ensure_loaded(Module) of
         {module, Module} ->
     MaxKey;
 dimension({sequential_int_str, MaxKey}) ->
     MaxKey;
+dimension({partitioned_sequential_int, MaxKey}) ->
+    MaxKey;
+dimension({partitioned_sequential_int_bin, MaxKey}) ->
+    MaxKey;
+dimension({partitioned_sequential_int_str, MaxKey}) ->
+    MaxKey;
 dimension({uniform_int_bin, MaxKey}) ->
     MaxKey;
 dimension({uniform_int_str, MaxKey}) ->