dizzyd avatar dizzyd committed 6c966fd

Moving riak into place

Comments (0)

Files changed (124)

apps/riak/ebin/riak.app

+% -*- mode: erlang -*-
+{application, riak,
+ [{description, "riak"},
+  {vsn, "0.6"},
+  {modules, [
+             chash,
+             gen_server2,
+             jaywalker_resource,
+             jiak,
+             jiak_client,
+             jiak_context,
+             jiak_context_tests,
+             jiak_default,
+             jiak_default_tests,
+             jiak_example,
+             jiak_object,
+             jiak_resource,
+             jiak_util,
+             json_pp,
+             merkerl,
+             priority_queue,
+             raw_http_resource,
+             raw_link_walker_resource,
+             riak,
+             riak_app,
+             riak_backup,
+             riak_bucket,
+             riak_claim,
+             riak_client,
+             riak_connect,
+             riak_delete,
+             riak_dets_backend,
+             riak_ets_backend,
+             riak_event_logger,
+             riak_eventer,
+             riak_fs_backend,
+             riak_gb_trees_backend,
+             riak_get_fsm,
+             riak_keys_fsm,
+             riak_local_logger,
+             riak_map_executor,
+             riak_map_localphase,
+             riak_map_phase_fsm,
+             riak_mapreduce,
+             riak_mapreduce_fsm,
+             riak_object,
+             riak_osmos_backend,
+             riak_put_fsm,
+             riak_reduce_phase_fsm,
+             riak_ring,
+             riak_ring_manager,
+             riak_startup,
+             riak_stat,
+             riak_sup,
+             riak_test_util,
+             riak_util,
+             riak_vnode,
+             riak_vnode_master,
+             riak_web,
+             spiraltime,
+             vclock
+            ]},
+  {applications, [
+                  kernel,
+                  stdlib,
+                  sasl,
+                  crypto
+                 ]},
+  {registered, []},
+  {mod, {riak_app, []}},
+  {env, []}
+ ]}.
+
Add a comment to this file

apps/riak/priv/.empty_for_hg

Empty file added.

apps/riak/src/Makefile

+all: $(EBIN_FILES_NO_DOCS)
+
+docs: $(ERL_DOCUMENTS)
+
+debug:
+	$(MAKE) DEBUG=-DDEBUG
+
+clean:
+	rm -rf $(EBIN_FILES_NO_DOCS)

apps/riak/src/chash.erl

+%% @copyright 2007-2008 Basho Technologies
+
+%% @reference Karger, D.; Lehman, E.; Leighton, T.; Panigrahy, R.; Levine, M.; Lewin, D. (1997). "Consistent hashing and random trees". Proceedings of the twenty-ninth annual ACM symposium on Theory of computing: 654~663. ACM Press New York, NY, USA
+
+% @author Justin Sheehy <justin@basho.com>
+% @author Andy Gross <andy@basho.com>
+
+% @doc A consistent hashing implementation.
+%      The space described by the ring coincides with SHA-1 hashes,
+%      and so any two keys producing the same SHA-1 hash are
+%      considered identical within the ring.
+
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+
+%% http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(chash).
+-author('Justin Sheehy <justin@basho.com>').
+-author('Andy Gross <andy@basho.com>').
+
+-export([fresh/2,update/3,lookup/2,members/1,size/1,nodes/1,
+     successors/2,successors/3,
+     predecessors/2,predecessors/3,
+     contains_name/2,key_of/1,
+     merge_rings/2]).
+    
+-define(RINGTOP, trunc(math:pow(2,160)-1)).  % SHA-1 space
+-include_lib("eunit/include/eunit.hrl").
+
+% @type chash() = {NumPartitions, [NodeEntry]}
+%  NumPartitions = integer()
+%  NodeEntry = {IndexAsInt, Node}
+%  IndexAsInt = integer()
+%  Node = node().
+% It is not recommended that code outside this module make use
+% of the structure of a chash.
+
+% @type index() = binary().
+% Indices into the ring, used as keys for object location, are binary
+% representations of 160-bit integers.
+
+% @type node() = term().
+% A Node is the unique identifier for the owner of a given partition.
+% An Erlang Pid works well here, but the chash module allows it to
+% be any term.
+
+% @doc Create a brand new ring.  The size and seednode are specified;
+%      initially all partitions are owned by the seednode.  If NumPartitions
+%      is not much larger than the intended eventual number of
+%       participating nodes, then performance will suffer.
+% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> chash()
+fresh(NumPartitions, SeedNode) ->
+    Inc = ?RINGTOP div NumPartitions,
+    {NumPartitions, [{IndexAsInt, SeedNode} ||
+           IndexAsInt <- lists:seq(0,(?RINGTOP-1),Inc)]}.
+
+% @doc Find the Node that owns the partition identified by IndexAsInt.
+% @spec lookup(IndexAsInt :: integer(), CHash :: chash()) -> node()
+lookup(IndexAsInt, CHash) ->
+    {_NumPartitions, Nodes} = CHash,
+    {IndexAsInt, X} = proplists:lookup(IndexAsInt, Nodes),
+    X.
+
+% @doc Return true if named Node owns any partitions in the ring, else false.
+% @spec contains_name(Name :: node(), CHash :: chash()) -> bool()
+contains_name(Name, CHash) ->
+    {_NumPartitions, Nodes} = CHash,
+    case [X || {_,X} <- Nodes, X == Name] of
+    [] ->
+        false;
+    _ ->
+        true
+    end.
+
+% @doc Make the partition beginning at IndexAsInt owned by Name'd node.
+% @spec update(IndexAsInt :: integer(), Name :: node(), CHash :: chash())
+%                -> chash()
+update(IndexAsInt, Name, CHash) ->
+    {NumPartitions, Nodes} = CHash,
+    NewNodes = lists:keyreplace(IndexAsInt, 1, Nodes, {IndexAsInt, Name}),
+    {NumPartitions, NewNodes}.
+
+% @doc Given an object key, return all NodeEntries in order starting at Index.
+% @spec successors(Index :: index(), CHash :: chash()) -> [NodeEntry]
+successors(Index, CHash) ->
+    {NumPartitions, _Nodes} = CHash,
+    successors(Index, CHash, NumPartitions).
+% @doc Given an object key, return the next N NodeEntries in order
+%      starting at Index.
+% @spec successors(Index :: index(), CHash :: chash(), N :: integer())
+%                                                             -> [NodeEntry]
+successors(Index, CHash, N) ->
+    Num = max_n(N, CHash),
+    {Res, _} = lists:split(Num, ordered_from(Index, CHash)),
+    Res.
+
+% @doc Given an object key, return all NodeEntries in reverse order
+%      starting at Index.
+% @spec predecessors(Index :: index(), CHash :: chash()) -> [NodeEntry]
+predecessors(Index, CHash) ->
+    {NumPartitions, _Nodes} = CHash,
+    predecessors(Index, CHash, NumPartitions).
+% @doc Given an object key, return the next N NodeEntries in reverse order
+%      starting at Index.
+% @spec predecessors(Index :: index(), CHash :: chash(), N :: integer())
+%                                                             -> [NodeEntry]
+predecessors(Index, CHash, N) ->
+    Num = max_n(N, CHash),
+    {Res, _} = lists:split(Num, lists:reverse(ordered_from(Index,CHash))),
+    Res.
+
+% @doc Return either N or the number of partitions in the ring, whichever
+%      is lesser.
+% @spec max_n(N :: integer(), CHash :: chash()) -> integer()
+max_n(N, {NumPartitions, _Nodes}) ->
+    if
+    N > NumPartitions ->
+        NumPartitions;
+    true ->
+        N
+    end.    
+
+% @doc Given an object key, return all NodeEntries in order starting at Index.
+% @spec ordered_from(Index :: index(), CHash :: chash()) -> [NodeEntry]
+ordered_from(Index, {NumPartitions, Nodes}) ->
+    <<IndexAsInt:160/integer>> = Index,
+    Inc = ?RINGTOP div NumPartitions,
+    {A, B} = lists:split((IndexAsInt div Inc)+1, Nodes),
+    B ++ A.
+
+% @doc Given any term used to name an object, produce that object's key
+%      into the ring.  Two names with the same SHA-1 hash value are
+%      considered the same name.
+% @spec key_of(ObjectName :: term()) -> index()
+key_of(ObjectName) ->    
+    crypto:sha(term_to_binary(ObjectName)).
+
+% @doc Return all Nodes that own any partitions in the ring.
+% @spec members(CHash :: chash()) -> [Node]
+members(CHash) ->
+    {_NumPartitions, Nodes} = CHash,
+    lists:usort([X || {_Idx,X} <- Nodes]).
+
+% @doc Return the entire set of NodeEntries in the ring.
+% @spec nodes(CHash :: chash()) -> [NodeEntry]
+nodes(CHash) ->
+    {_NumPartitions, Nodes} = CHash,
+    Nodes.
+
+% @doc Return a randomized merge of two rings.
+%      If multiple nodes are actively claiming nodes in the same
+%      time period, churn will occur.  Be prepared to live with it.
+% @spec merge_rings(CHashA :: chash(), CHashB :: chash()) -> chash()
+merge_rings(CHashA,CHashB) ->
+    {NumPartitions, NodesA} = CHashA,
+    {NumPartitions, NodesB} = CHashB,
+    {NumPartitions, [{I,randomnode(A,B)} || 
+           {{I,A},{I,B}} <- lists:zip(NodesA,NodesB)]}.
+
+% @spec randomnode(NodeA :: node(), NodeB :: node()) -> node()
+randomnode(NodeA,NodeA) -> NodeA;
+randomnode(NodeA,NodeB) -> lists:nth(crypto:rand_uniform(1,3),[NodeA,NodeB]).
+
+% @doc Return the number of partitions in the ring.
+% @spec size(CHash :: chash()) -> integer()
+size(CHash) ->
+    {NumPartitions,_Nodes} = CHash,
+    NumPartitions.
+
+update_test() ->
+    Node = 'old@host', NewNode = 'new@host',
+    
+    % Create a fresh ring...
+    CHash = chash:fresh(5, Node),
+    GetNthIndex = fun(N, {_, Nodes}) -> {Index, _} = lists:nth(N, Nodes), Index end,
+    
+    % Test update...
+    FirstIndex = GetNthIndex(1, CHash),
+    ThirdIndex = GetNthIndex(3, CHash),
+    {5, [{_, NewNode}, {_, Node}, {_, Node}, {_, Node}, {_, Node}, {_, Node}]} = update(FirstIndex, NewNode, CHash),
+    {5, [{_, Node}, {_, Node}, {_, NewNode}, {_, Node}, {_, Node}, {_, Node}]} = update(ThirdIndex, NewNode, CHash).
+
+contains_test() ->
+    CHash = chash:fresh(8, the_node),
+    ?assertEqual(true, contains_name(the_node,CHash)),
+    ?assertEqual(false, contains_name(some_other_node,CHash)).
+
+max_n_test() ->
+    CHash = chash:fresh(8, the_node),
+    ?assertEqual(1, max_n(1,CHash)),
+    ?assertEqual(8, max_n(11,CHash)).
+    
+simple_size_test() ->
+    ?assertEqual(8, length(chash:nodes(chash:fresh(8,the_node)))).
+
+successors_length_test() ->
+    ?assertEqual(8, length(chash:successors(chash:key_of(0),
+                                            chash:fresh(8,the_node)))).
+inverse_pred_test() ->
+    CHash = chash:fresh(8,the_node),
+    S = [I || {I,_} <- chash:successors(chash:key_of(4),CHash)],
+    P = [I || {I,_} <- chash:predecessors(chash:key_of(4),CHash)],
+    ?assertEqual(S,lists:reverse(P)).
+
+merge_test() ->
+    CHashA = chash:fresh(8,node_one),
+    CHashB = chash:update(0,node_one,chash:fresh(8,node_two)),
+    CHash = chash:merge_rings(CHashA,CHashB),
+    ?assertEqual(node_one,chash:lookup(0,CHash)).

apps/riak/src/gen_server2.erl

+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
+%%
+%% 5) On return from init/1, the timeout value {binary, Min} creates a
+%% binary exponential timeout, where Min is the minimum number of
+%% milliseconds permitted, and is also used as the current timeout
+%% value. Returning from handle_* with the timeout value set to
+%% 'binary' will use the current binary timeout value. handle_info/2
+%% with the Info of 'timeout' will function normally, and supports the
+%% return value of {noreply, State, hibernate} which will hibernate
+%% the process. The current timeout value is:
+%%
+%% a) doubled if the time spent in hibernation is < 4 * the current value;
+%% b) halved if the time spent in hibernation is > 16 * the current value;
+%% c) maintained in all other cases
+%%
+%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
+%% are still supported, and do not have any effect on the current
+%% timeout value.
+
+%% All modifications are (C) 2009 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%% 
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%% 
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%% 
+%%     $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs. 
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%%   init(Args)  
+%%%     ==> {ok, State}
+%%%         {ok, State, Timeout}
+%%%         ignore
+%%%         {stop, Reason}
+%%%
+%%%   handle_call(Msg, {From, Tag}, State)
+%%%
+%%%    ==> {reply, Reply, State}
+%%%        {reply, Reply, State, Timeout}
+%%%        {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, Reply, State}  
+%%%              Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%%   handle_cast(Msg, State)
+%%%
+%%%    ==> {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, State} 
+%%%              Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%%   handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%%    ==> {noreply, State}
+%%%        {noreply, State, Timeout}
+%%%        {stop, Reason, State} 
+%%%              Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%%   terminate(Reason, State) Let the user module clean up
+%%%        always called when server terminates
+%%%
+%%%    ==> ok
+%%%
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%%   User module                          Generic
+%%%   -----------                          -------
+%%%     start            ----->             start
+%%%     init             <-----              .
+%%%
+%%%                                         loop
+%%%     handle_call      <-----              .
+%%%                      ----->             reply
+%%%
+%%%     handle_cast      <-----              .
+%%%
+%%%     handle_info      <-----              .
+%%%
+%%%     terminate        <-----              .
+%%%
+%%%                      ----->             reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+	 start_link/3, start_link/4,
+	 call/2, call/3, pcall/3, pcall/4,
+	 cast/2, pcast/3, reply/2,
+	 abcast/2, abcast/3,
+	 multi_call/2, multi_call/3, multi_call/4,
+	 enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+	 system_terminate/4,
+	 system_code_change/4,
+	 format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%%  API
+%%%=========================================================================
+
+-ifdef(use_specs).
+-spec behaviour_info(atom()) -> 'undefined' | [{atom(), any()}].
+-endif.
+
+behaviour_info(callbacks) ->
+    [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+     {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+    undefined.
+
+%%%  -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%%    Name ::= {local, atom()} | {global, atom()}
+%%%    Mod  ::= atom(), callback module implementing the 'real' server
+%%%    Args ::= term(), init arguments (to Mod:init/1)
+%%%    Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%%      Flag ::= trace | log | {logfile, File} | statistics | debug
+%%%          (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%%          {error, {already_started, Pid}} |
+%%%          {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+    gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+    gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+    gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+    gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% ----------------------------------------------------------------- 
+call(Name, Request) ->
+    case catch gen:call(Name, '$gen_call', Request) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, call, [Name, Request]}})
+    end.
+
+call(Name, Request, Timeout) ->
+    case catch gen:call(Name, '$gen_call', Request, Timeout) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+    end.
+
+pcall(Name, Priority, Request) ->
+    case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+    end.
+
+pcall(Name, Priority, Request, Timeout) ->
+    case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+	{ok,Res} ->
+	    Res;
+	{'EXIT',Reason} ->
+	    exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+    end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+    catch global:send(Name, cast_msg(Request)),
+    ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> 
+    do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+    do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+    do_cast(Dest, Request).
+
+do_cast(Dest, Request) -> 
+    do_send(Dest, cast_msg(Request)),
+    ok.
+    
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+pcast({global,Name}, Priority, Request) ->
+    catch global:send(Name, cast_msg(Priority, Request)),
+    ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> 
+    do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+    do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+    do_cast(Dest, Priority, Request).
+
+do_cast(Dest, Priority, Request) -> 
+    do_send(Dest, cast_msg(Priority, Request)),
+    ok.
+    
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+    catch To ! {Tag, Reply}.
+
+%% ----------------------------------------------------------------- 
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------  
+abcast(Name, Request) when is_atom(Name) ->
+    do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+    do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+    do_send({Name,Node},Msg),
+    do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%% 
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will 
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+  when is_atom(Name) ->
+    do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req) 
+  when is_list(Nodes), is_atom(Name) ->
+    do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+    do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout) 
+  when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+    do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ 
+%%   
+%% Description: Makes an existing process into a gen_server. 
+%%              The calling process will enter the gen_server receive 
+%%              loop and become a gen_server process.
+%%              The process *must* have been started using one of the 
+%%              start functions in proc_lib, see proc_lib(3). 
+%%              The user is responsible for any initialization of the 
+%%              process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+    enter_loop(Mod, Options, State, self(), infinity).
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+    enter_loop(Mod, Options, State, ServerName, infinity);
+
+enter_loop(Mod, Options, State, Timeout) ->
+    enter_loop(Mod, Options, State, self(), Timeout).
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+    Name = get_proc_name(ServerName),
+    Parent = get_parent(),
+    Debug = debug_options(Name, Options),
+    Queue = priority_queue:new(),
+    {Timeout1, TimeoutState} = build_timeout_state(Timeout),
+    loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+    init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+    Name = name(Name0),
+    Debug = debug_options(Name, Options),
+    Queue = priority_queue:new(),
+    case catch Mod:init(Args) of
+	{ok, State} ->
+	    proc_lib:init_ack(Starter, {ok, self()}), 	    
+	    loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
+	{ok, State, Timeout} ->
+	    proc_lib:init_ack(Starter, {ok, self()}),
+            {Timeout1, TimeoutState} = build_timeout_state(Timeout),
+	    loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue,
+                 Debug);
+	{stop, Reason} ->
+	    %% For consistency, we must make sure that the
+	    %% registered name (if any) is unregistered before
+	    %% the parent process is notified about the failure.
+	    %% (Otherwise, the parent process could get
+	    %% an 'already_started' error if it immediately
+	    %% tried starting the process again.)
+	    unregister_name(Name0),
+	    proc_lib:init_ack(Starter, {error, Reason}),
+	    exit(Reason);
+	ignore ->
+	    unregister_name(Name0),
+	    proc_lib:init_ack(Starter, ignore),
+	    exit(normal);
+	{'EXIT', Reason} ->
+	    unregister_name(Name0),
+	    proc_lib:init_ack(Starter, {error, Reason}),
+	    exit(Reason);
+	Else ->
+	    Error = {bad_return_value, Else},
+	    proc_lib:init_ack(Starter, {error, Error}),
+	    exit(Error)
+    end.
+
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+%% name(Pid) when is_pid(Pid) -> Pid;
+%% when R11 goes away, drop the line beneath and uncomment the line above
+name(Name) -> Name.
+
+unregister_name({local,Name}) ->
+    _ = (catch unregister(Name));
+unregister_name({global,Name}) ->
+    _ = global:unregister_name(Name);
+unregister_name(Pid) when is_pid(Pid) ->
+    Pid.
+
+build_timeout_state(Timeout) ->
+    case Timeout of
+        {binary, Min} -> {binary,  {Min, Min, undefined}};
+        _             -> {Timeout, undefined}
+    end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+    proc_lib:hibernate(?MODULE,wake_hib,
+                       [Parent, Name, State, Mod, undefined, Queue, Debug]);
+loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue,
+     Debug) ->
+    proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
+                                         {Current, Min, now()}, Queue, Debug]);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+    receive
+        Input -> loop(Parent, Name, State, Mod,
+                      Time, TimeoutState, in(Input, Queue), Debug)
+    after 0 ->
+            process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+                             Queue, Debug, false)
+    end.
+
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+                 Debug, Hib) ->
+    case priority_queue:out(Queue) of
+        {{value, Msg}, Queue1} ->
+            process_msg(Parent, Name, State, Mod,
+                        Time, TimeoutState, Queue1, Debug, Hib, Msg);
+        {empty, Queue1} ->
+            Time1 = case {Time, TimeoutState} of
+                        {binary, {Current, _Min, undefined}} -> Current;
+                        _ -> Time
+                    end,
+            receive
+                Input ->
+                    loop(Parent, Name, State, Mod,
+                         Time, TimeoutState, in(Input, Queue1), Debug)
+            after Time1 ->
+                    process_msg(Parent, Name, State, Mod,
+                                Time, TimeoutState, Queue1, Debug, Hib, timeout)
+            end
+    end.
+
+wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+    Msg = receive
+	      Input ->
+		  Input
+	  end,
+    TimeoutState1 = adjust_hibernate_after(TimeoutState),
+    process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
+                     in(Msg, Queue), Debug, true).
+
+adjust_hibernate_after(undefined) ->
+    undefined;
+adjust_hibernate_after({Current, Min, HibernatedAt}) ->
+    NapLengthMicros = timer:now_diff(now(), HibernatedAt),
+    CurrentMicros = Current * 1000,
+    LowTargetMicros = CurrentMicros * 4,
+    HighTargetMicros = LowTargetMicros * 4,
+    if
+        NapLengthMicros < LowTargetMicros ->
+            %% nap was too short, don't go to sleep as soon
+            {Current * 2, Min, undefined};
+
+        NapLengthMicros > HighTargetMicros ->
+            %% nap was long, try going to sleep sooner
+            {lists:max([Min, round(Current / 2)]), Min, undefined};
+
+        true ->
+            %% nap and timeout seem to be in the right relationship. stay here
+            {Current, Min, undefined}
+    end.
+
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+    priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+    priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+    priority_queue:in(Input, Queue).
+
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+            Debug, _Hib, Msg) ->
+    case Msg of
+	{system, From, Req} ->
+	    sys:handle_system_msg
+              (Req, From, Parent, ?MODULE, Debug,
+               [Name, State, Mod, Time, TimeoutState, Queue]);
+        %% gen_server puts Hib on the end as the 7th arg, but that
+        %% version of the function seems not to be documented so
+        %% leaving out for now.
+	{'EXIT', Parent, Reason} ->
+	    terminate(Reason, Name, Msg, Mod, State, Debug);
+	_Msg when Debug =:= [] ->
+	    handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
+	_Msg ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
+				      Name, {in, Msg}),
+	    handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
+                       Debug1)
+    end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+    catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+    Tag = make_ref(),
+    Monitors = send_nodes(Nodes, Name, Tag, Req),
+    rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+    Tag = make_ref(),
+    Caller = self(),
+    Receiver =
+	spawn(
+	  fun() ->
+		  %% Middleman process. Should be unsensitive to regular
+		  %% exit signals. The sychronization is needed in case
+		  %% the receiver would exit before the caller started
+		  %% the monitor.
+		  process_flag(trap_exit, true),
+		  Mref = erlang:monitor(process, Caller),
+		  receive
+		      {Caller,Tag} ->
+			  Monitors = send_nodes(Nodes, Name, Tag, Req),
+			  TimerId = erlang:start_timer(Timeout, self(), ok),
+			  Result = rec_nodes(Tag, Monitors, Name, TimerId),
+			  exit({self(),Tag,Result});
+		      {'DOWN',Mref,_,_,_} ->
+			  %% Caller died before sending us the go-ahead.
+			  %% Give up silently.
+			  exit(normal)
+		  end
+	  end),
+    Mref = erlang:monitor(process, Receiver),
+    Receiver ! {self(),Tag},
+    receive
+	{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+	    Result;
+	{'DOWN',Mref,_,_,Reason} ->
+	    %% The middleman code failed. Or someone did 
+	    %% exit(_, kill) on the middleman process => Reason==killed
+	    exit(Reason)
+    end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+    send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+  when is_atom(Node) ->
+    Monitor = start_monitor(Node, Name),
+    %% Handle non-existing names in rec_nodes.
+    catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+    send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+    %% Skip non-atom Node
+    send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) -> 
+    Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) -> 
+    rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+    receive
+	{'DOWN', R, _, _, _} ->
+	    rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    unmonitor(R), 
+	    rec_nodes(Tag, Tail, Name, Badnodes, 
+		      [{N,Reply}|Replies], Time, TimerId);
+	{timeout, TimerId, _} ->	
+	    unmonitor(R),
+	    %% Collect all replies that already have arrived
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+    %% R6 node
+    receive
+	{nodedown, N} ->
+	    monitor_node(N, false),
+	    rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes(Tag, Tail, Name, Badnodes,
+		      [{N,Reply}|Replies], 2000, TimerId);
+	{timeout, TimerId, _} ->	
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    %% Collect all replies that already have arrived
+	    rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+    after Time ->
+	    case rpc:call(N, erlang, whereis, [Name]) of
+		Pid when is_pid(Pid) -> % It exists try again.
+		    rec_nodes(Tag, [N|Tail], Name, Badnodes,
+			      Replies, infinity, TimerId);
+		_ -> % badnode
+		    receive {nodedown, N} -> ok after 0 -> ok end,
+		    monitor_node(N, false),
+		    rec_nodes(Tag, Tail, Name, [N|Badnodes],
+			      Replies, 2000, TimerId)
+	    end
+    end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+    case catch erlang:cancel_timer(TimerId) of
+	false ->  % It has already sent it's message
+	    receive
+		{timeout, TimerId, _} -> ok
+	    after 0 ->
+		    ok
+	    end;
+	_ -> % Timer was cancelled, or TimerId was 'undefined'
+	    ok
+    end,
+    {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+    receive
+	{'DOWN', R, _, _, _} ->
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+	{{Tag, N}, Reply} -> %% Tag is bound !!!
+	    unmonitor(R),
+	    rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+    after 0 ->
+	    unmonitor(R),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+    %% R6 node
+    receive
+	{nodedown, N} ->
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+	{{Tag, N}, Reply} ->  %% Tag is bound !!!
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+    after 0 ->
+	    receive {nodedown, N} -> ok after 0 -> ok end,
+	    monitor_node(N, false),
+	    rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+    end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+    {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+    if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+	    Ref = make_ref(),
+	    self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+	    {Node, Ref};
+       true ->
+	    case catch erlang:monitor(process, {Name, Node}) of
+		{'EXIT', _} ->
+		    %% Remote node is R6
+		    monitor_node(Node, true),
+		    Node;
+		Ref when is_reference(Ref) ->
+		    {Node, Ref}
+	    end
+    end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+    erlang:demonitor(Ref),
+    receive
+	{'DOWN', Ref, _, _, _} ->
+	    true
+    after 0 ->
+	    true
+    end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+    Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+    Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+           Parent, Name, State, Mod, TimeoutState, Queue) ->
+    case catch Mod:handle_call(Msg, From, State) of
+	{reply, Reply, NState} ->
+	    reply(From, Reply),
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+	{reply, Reply, NState, Time1} ->
+	    reply(From, Reply),
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+	{noreply, NState} ->
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+	{noreply, NState, Time1} ->
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+	{stop, Reason, Reply, NState} ->
+	    {'EXIT', R} = 
+		(catch terminate(Reason, Name, Msg, Mod, NState, [])),
+	    reply(From, Reply),
+	    exit(R);
+	Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+                                     TimeoutState, Queue)
+    end;
+handle_msg(Msg,
+           Parent, Name, State, Mod, TimeoutState, Queue) ->
+    Reply = (catch dispatch(Msg, Mod, State)),
+    handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+                        TimeoutState, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+           Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+    case catch Mod:handle_call(Msg, From, State) of
+	{reply, Reply, NState} ->
+	    Debug1 = reply(Name, From, Reply, NState, Debug),
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+                 Debug1);
+	{reply, Reply, NState, Time1} ->
+	    Debug1 = reply(Name, From, Reply, NState, Debug),
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+	{noreply, NState} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+                 Debug1);
+	{noreply, NState, Time1} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+	{stop, Reason, Reply, NState} ->
+	    {'EXIT', R} = 
+		(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+	    reply(Name, From, Reply, NState, Debug),
+	    exit(R);
+	Other ->
+	    handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+                                TimeoutState, Queue, Debug)
+    end;
+handle_msg(Msg,
+           Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+    Reply = (catch dispatch(Msg, Mod, State)),
+    handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+                        TimeoutState, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+                    TimeoutState, Queue) ->
+    case Reply of
+	{noreply, NState} ->
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
+	{noreply, NState, Time1} ->
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+	{stop, Reason, NState} ->
+	    terminate(Reason, Name, Msg, Mod, NState, []);
+	{'EXIT', What} ->
+	    terminate(What, Name, Msg, Mod, State, []);
+	_ ->
+	    terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+    end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
+                    Debug) ->
+    case Reply of
+	{noreply, NState} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+                 Debug1);
+	{noreply, NState, Time1} ->
+	    Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+				      {noreply, NState}),
+	    loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+	{stop, Reason, NState} ->
+	    terminate(Reason, Name, Msg, Mod, NState, Debug);
+	{'EXIT', What} ->
+	    terminate(What, Name, Msg, Mod, State, Debug);
+	_ ->
+	    terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+    end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+    reply({To, Tag}, Reply),
+    sys:handle_debug(Debug, {?MODULE, print_event}, Name, 
+		     {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+    loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+
+-ifdef(use_specs).
+-spec system_terminate(_, _, _, [_]) -> no_return().
+-endif.
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
+                                          _TimeoutState, _Queue]) ->
+    terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
+                   OldVsn, Extra) ->
+    case catch Mod:code_change(OldVsn, State, Extra) of
+	{ok, NewState} ->
+            {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
+	Else ->
+            Else
+    end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages.  Print them as the call-back module sees
+%% them, not as the real erlang messages.  Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+    case Msg of
+	{'$gen_call', {From, _Tag}, Call} ->
+	    io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+		      [Name, Call, From]);
+	{'$gen_cast', Cast} ->
+	    io:format(Dev, "*DBG* ~p got cast ~p~n",
+		      [Name, Cast]);
+	_ ->
+	    io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+    end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+    io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", 
+	      [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+    io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+    io:format(Dev, "*DBG* ~p dbg  ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+    case catch Mod:terminate(Reason, State) of
+	{'EXIT', R} ->
+	    error_info(R, Name, Msg, State, Debug),
+	    exit(R);
+	_ ->
+	    case Reason of
+		normal ->
+		    exit(normal);
+		shutdown ->
+		    exit(shutdown);
+		{shutdown,_}=Shutdown ->
+		    exit(Shutdown);
+		_ ->
+		    error_info(Reason, Name, Msg, State, Debug),
+		    exit(Reason)
+	    end
+    end.
+
+error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+    %% OTP-5811 Don't send an error report if it's the system process
+    %% application_controller which is terminating - let init take care
+    %% of it instead
+    ok;
+error_info(Reason, Name, Msg, State, Debug) ->
+    Reason1 = 
+	case Reason of
+	    {undef,[{M,F,A}|MFAs]} ->
+		case code:is_loaded(M) of
+		    false ->
+			{'module could not be loaded',[{M,F,A}|MFAs]};
+		    _ ->
+			case erlang:function_exported(M, F, length(A)) of
+			    true ->
+				Reason;
+			    false ->
+				{'function not exported',[{M,F,A}|MFAs]}
+			end
+		end;
+	    _ ->
+		Reason
+	end,    
+    format("** Generic server ~p terminating \n"
+           "** Last message in was ~p~n"
+           "** When Server state == ~p~n"
+           "** Reason for termination == ~n** ~p~n",
+	   [Name, Msg, State, Reason1]),
+    sys:print_log(Debug),
+    ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+    {ok, Value};
+opt(Op, [_|Options]) ->
+    opt(Op, Options);
+opt(_, []) ->
+    false.
+
+debug_options(Name, Opts) ->
+    case opt(debug, Opts) of
+	{ok, Options} -> dbg_options(Name, Options);
+	_ -> dbg_options(Name, [])
+    end.
+
+dbg_options(Name, []) ->
+    Opts = 
+	case init:get_argument(generic_debug) of
+	    error ->
+		[];
+	    _ ->
+		[log, statistics]
+	end,
+    dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+    dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+    case catch sys:debug_options(Opts) of
+	{'EXIT',_} ->
+	    format("~p: ignoring erroneous debug options - ~p~n",
+		   [Name, Opts]),
+	    [];
+	Dbg ->
+	    Dbg
+    end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+    Pid;
+get_proc_name({local, Name}) ->
+    case process_info(self(), registered_name) of
+	{registered_name, Name} ->
+	    Name;
+	{registered_name, _Name} ->
+	    exit(process_not_registered);
+	[] ->
+	    exit(process_not_registered)
+    end;    
+get_proc_name({global, Name}) ->
+    case global:safe_whereis_name(Name) of
+	undefined ->
+	    exit(process_not_registered_globally);
+	Pid when Pid =:= self() ->
+	    Name;
+	_Pid ->
+	    exit(process_not_registered_globally)
+    end.
+
+get_parent() ->
+    case get('$ancestors') of
+	[Parent | _] when is_pid(Parent)->
+            Parent;
+        [Parent | _] when is_atom(Parent)->
+            name_to_pid(Parent);
+	_ ->
+	    exit(process_was_not_started_by_proc_lib)
+    end.
+
+name_to_pid(Name) ->
+    case whereis(Name) of
+	undefined ->
+	    case global:safe_whereis_name(Name) of
+		undefined ->
+		    exit(could_not_find_registerd_name);
+		Pid ->
+		    Pid
+	    end;
+	Pid ->
+	    Pid
+    end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+format_status(Opt, StatusData) ->
+    [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time,
+                                      TimeoutState, Queue]] =
+        StatusData,
+    NameTag = if is_pid(Name) ->
+		      pid_to_list(Name);
+		 is_atom(Name) ->
+		      Name
+	      end,
+    Header = lists:concat(["Status for generic server ", NameTag]),
+    Log = sys:get_debug(log, Debug, []),
+    Specfic = 
+	case erlang:function_exported(Mod, format_status, 2) of
+	    true ->
+		case catch Mod:format_status(Opt, [PDict, State]) of
+		    {'EXIT', _} -> [{data, [{"State", State}]}];
+		    Else -> Else
+		end;
+	    _ ->
+		[{data, [{"State", State}]}]
+	end,
+    Specfic1 = case TimeoutState of
+                   undefined -> Specfic;
+                   {Current, Min, undefined} ->
+                       [ {"Binary Timeout Current and Min", {Current, Min}}
+                       | Specfic]
+               end,
+    [{header, Header},
+     {data, [{"Status", SysState},
+	     {"Parent", Parent},
+	     {"Logged events", Log},
+             {"Queued messages", priority_queue:to_list(Queue)}]} |
+     Specfic1].

apps/riak/src/jaywalker_resource.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+
+
+%% @author Bryan Fink <bryan@basho.com>
+%% @doc Jaywalker resource provides a limited interface to jiak object
+%%      linkwalking over HTTP.  The interface exposed is:
+%%
+%%      /jaywalker/Bucket/Key[/b,t,acc]
+%%
+%%      where:
+%%
+%%      Bucket/Key tells jaywalker where to start
+%%
+%%      each /b,t,acc segment is a request to follow some links
+%%
+%%      b is a filter on buckets
+%%      t is a filter on tags
+%%      acc is whether or not to return the objects from that step
+%%
+%%      each of b,t,acc may be underscore, to signify wildcard
+%%
+%%      acc is by default '0' (do not return these objects), except
+%%      for the final /b,t,acc segment, for which it is by default '1'
+%%      (return the objects)
+%%
+%%      Return from jaywalker resource is a JSON object with one
+%%      field, "results", which is a list of lists. Each lists in
+%%      "results" is the list of results from the corresponding step
+%%      in the query.
+%%
+%%      so:
+%%
+%%      /jaywalker/foo/123/bar,_,_ : returns all bar objects
+%%      attached to foo 123 {results: [[bar1, bar2, ...]]}
+%%
+%%      /jaywalker/foo/123/bar,_,1/_,_,_ : returns all
+%%      bar objects attached to foo 123, and all objects attached
+%%      to those bar objects {results: [[bar1, bar2, ...], [baz1,
+%%      quux2, ...]]}
+-module(jaywalker_resource).
+-export([init/1,
+         allowed_methods/2,
+         content_types_provided/2,
+         is_authorized/2,
+         resource_exists/2,
+         expires/2,
+         to_json/2,
+         process_post/2]).
+
+-include_lib("webmachine/include/webmachine.hrl").
+
+-record(ctx, {start, cache_secs, jiak_client}).
+
+init(Props) ->
+    {ok, JiakClient} = 
+        case proplists:get_value(riak_local, Props) of
+            true ->
+                jiak:local_client();
+            _ ->
+                Node = proplists:get_value(riak_node, Props),
+                Cookie = proplists:get_value(riak_cookie, Props),
+                erlang:set_cookie(node(), Cookie),
+                jiak:client_connect(Node)
+        end,
+    {ok, #ctx{cache_secs=proplists:get_value(cache_secs, Props, 600),
+              jiak_client=JiakClient}}.
+
+allowed_methods(RD, Ctx) ->
+    {['GET', 'HEAD', 'POST'], RD, Ctx}.
+
+content_types_provided(RD, Ctx) ->
+    {[{"application/json", to_json}], RD, Ctx}.
+
+is_authorized(RD, Ctx) -> {true, RD, Ctx}. %%TODO: auth
+
+expires(RD, Ctx=#ctx{cache_secs=Secs}) ->
+    {calendar:gregorian_seconds_to_datetime(
+       Secs+calendar:datetime_to_gregorian_seconds(
+              calendar:universal_time())),
+     RD, Ctx}.
+
+resource_exists(RD, Ctx=#ctx{jiak_client=JiakClient}) ->
+    Bucket = list_to_binary(mochiweb_util:unquote(
+                              wrq:path_info(bucket, RD))),
+    Key = list_to_binary(mochiweb_util:unquote(
+                           wrq:path_info(key, RD))),
+    case JiakClient:get(Bucket, Key, 2) of
+        {ok, Start} ->
+            {true, RD, Ctx#ctx{start=Start}};
+        _ ->
+            {false, RD, Ctx}
+    end.
+
+to_json(RD, Ctx=#ctx{start=Start, jiak_client=JiakClient}) ->
+    Results = execute_query(JiakClient, [Start], extract_query(RD)),
+    {mochijson2:encode({struct, [{<<"results">>, Results}]}), RD, Ctx}.
+
+execute_query(_, _, []) -> [];
+execute_query(JiakClient, StartObjects, [{Bucket, Tag, Acc}|RestQuery]) ->
+    StartLinks = lists:append([jiak_object:links(O, Bucket, Tag)
+                               || O <- StartObjects]),
+    StartBKs = [{{B, K},T} || [B, K, T] <- StartLinks],
+    {SegResults,Leftover} =
+        if Acc ->
+                {execute_segment(JiakClient, StartBKs, []), RestQuery};
+        true ->
+            {SafeQuery, [LastSafe|UnsafeQuery]} =
+                lists:splitwith(fun({_,_,SegAcc}) -> not SegAcc end,
+                                RestQuery),
+            {execute_segment(JiakClient, StartBKs,SafeQuery++[LastSafe]),
+             UnsafeQuery}
+     end,
+    [[jiak_resource:apply_read_mask(R) || R <- SegResults]
+     |execute_query(JiakClient,SegResults,Leftover)].
+
+execute_segment(JiakClient, Start, Steps) ->
+    MR = [{link, Bucket, Key, false} || {Bucket, Key, _} <- Steps]
+        ++[{reduce, {modfun, riak_mapreduce, reduce_set_union}, none, false},
+           {map, {modfun, jiak_object, mapreduce_identity}, none, true}],
+    {ok, Objects} = (JiakClient:riak_client()):mapred(Start, MR),
+    %% strip link tags from objects
+    lists:map(fun(O={struct,_}) -> O;
+                 ({O={struct,_},_Tag}) -> O
+              end,
+              Objects).
+
+extract_query(RD) ->
+    Path = wrq:disp_path(RD),
+    Parts = [ string:tokens(P, ",") || P <- string:tokens(Path, "/") ],
+    parts_to_query(Parts, []).
+
+parts_to_query([], Acc) -> lists:reverse(Acc);
+parts_to_query([[B,T,A]|Rest], Acc) ->
+    parts_to_query(Rest,
+                   [{if B == "_" -> '_';
+                        true     -> list_to_binary(mochiweb_util:unquote(B))
+                     end,
+                     if T == "_" -> '_';
+                        true     -> list_to_binary(mochiweb_util:unquote(T))
+                     end,
+                     if A == "1"          -> true;
+                        A == "0"          -> false;
+%%% default of "acc" is 'true' for final step
+                        length(Rest) == 0 -> true;
+%%% default of "acc" is 'false' for intermediate steps
+                        true              -> false
+                     end}
+                    |Acc]).
+
+%% do nothing with POST
+%% just allow client to use it to invalidate browser cache
+process_post(RD, Ctx) ->
+    {true, RD, Ctx}.

apps/riak/src/jiak.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc Jiak: Riak utilities, focused on JSON-encoded data.
+%%
+%%      Riak is web-shaped, and one of the most useful
+%%      interchange formats on the web is JSON.  The Jiak libraries
+%%      are intended to provide a simple system for serving data from
+%%      Riak in the JSON format.
+%%
+%%      Jiak objects take the shape of JSON objects of the form:
+%%<pre>
+%%      {
+%%       "bucket":"foo",
+%%       "key":"123",
+%%       "object":{
+%%                 "barField":"bar",
+%%                 "bazField":42
+%%                }
+%%       "links":[
+%%                ["quuxbucket","quuxkey","quuxtag"],
+%%                ["abcbucket","abckey","qbctag"]
+%%               ]
+%%       "vclock":"opaque-riak-vclock",
+%%       "lastmod":"Mon, 03 Aug 2009 18:49:42 GMT"
+%%      }
+%%</pre>
+%%      Within Riak, these fields get broken into their appropriate
+%%      places in riak_object, but using the Jiak interface,
+%%      applications see only mochijson2-encoded objects.  An
+%%      application can choose to either dig through the structure the
+%%      Erlang pattern-match way, or by using the interface provided
+%%      by the jiak_object module.
+%%
+%%      Access to these objects as JSON is provided to clients that
+%%      can speak HTTP by way of the jiak_resource Webmachine resource
+%%      module.  Clients can GET bucket schemas and key lists, as well
+%%      as fetch and store objects.  Semantics about which fields are
+%%      allowed/readable/editable/etc. are provided by user-defined
+%%      Erlang modules named for the bucket of the object.  For an
+%%      example of such a module, see {@link jiak_example}.
+%%
+%%      Further access is provided by means of 'link' mapreduce specs
+%%      and jaywalker_resource.  To support each of these, you should
+%%      open a jiak_client, then call set_bucket/2 with your bucket
+%%      name and the result of default_jiak_bucket_props/0.  Without
+%%      that setting, the mapreduce system won't know how to extract
+%%      links for following.
+-module(jiak).
+
+-export([local_client/0, local_client/1,
+         client_connect/1, client_connect/2]).
+-export([default_jiak_bucket_props/0]).
+-export([standard_sibling_merge/1]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% @spec local_client() -> {ok, jiak_client()}|error_term()
+%% @equiv local_client(undefined)
+local_client() -> local_client(undefined).
+
+%% @spec local_client(binary()|undefined) ->
+%%         {ok, jiak_client()}|error_term()
+%% @doc Open a Riak client for modifying Jiak objects.
+%% @see riak:local_client/1
+local_client(ClientId) -> client_connect(node(), ClientId).
+
+%% @spec client_connect(Node :: node())
+%%        -> {ok, Client :: jiak_client()} | exception
+%% @equiv client_connect(Node, undefined)
+client_connect(Node) -> client_connect(Node, undefined).
+
+%% @spec client_connect(Node :: node(), ClientId :: binary()|undefined)
+%%        -> {ok, Client :: jiak_client()} | exception
+%% @doc The usual way to get a client.  Timeout often means either a bad
+%%      cookie or a poorly-connected distributed erlang network.
+client_connect(Node, ClientId) ->
+    case riak:client_connect(Node, ClientId) of
+        {ok, C} -> {ok, jiak_client:new(C)};
+        Error -> Error
+    end.
+
+%% @spec default_jiak_bucket_props() -> [bucket_prop()]
+%% @doc Returns the default additional bucket parameters for Jiak
+%%      buckets, suitable for the second parameter in a call to
+%%      jiak_client:set_bucket/2.
+%%      The only property is currently 'linkfun' which sets up
+%%      the given bucket to support the {link, Bucket, Tag, Acc}
+%%      mapreduce spec.
+default_jiak_bucket_props() ->
+    [{linkfun, {modfun, jiak_object, mapreduce_linkfun}}].
+
+%%
+%% Utility - Merging siblings
+%%
+
+%% @doc basic strategy:
+%%<ul><li> set-union all links
+%%</li><li>create an object with most-recent version of each field,
+%%           according to object's last-modified date - fields missing
+%%           in newer versions will have their values taken from older
+%%           versions
+%%</li></ul>
+standard_sibling_merge(Sibs) ->
+    [{Ms,{{struct,Os},Ls}}|Rest] = sort_sibs(Sibs),
+    lists:foldl(fun merge_sibs/2,
+                {Ms,{{struct,lists:keysort(1,Os)},Ls}},
+                Rest).
+
+%% @private
+sort_sibs(Sibs) ->
+    lists:sort(
+      fun({MD1, _}, {MD2, _}) ->
+              riak_util:compare_dates(
+                dict:fetch(<<"X-Riak-Last-Modified">>, MD1),
+                dict:fetch(<<"X-Riak-Last-Modified">>, MD2))
+      end,
+      Sibs).
+
+%% @private
+merge_sibs({Min,  {{struct, Oin},  Lin}},
+           {Macc, {{struct, Oacc}, Lacc}}) ->
+    %% add keys to M, but do not overwrite
+    M = dict:merge(fun(_, Ma, _) -> Ma end, Macc, Min),
+    %% add keys to O, but do not overwrite
+    O = lists:foldl(fun({Field, Value}, Acc) ->
+                            case proplists:is_defined(Field, Acc) of
+                                true -> Acc;
+                                false -> [{Field, Value}|Acc]
+                            end
+                    end,
+                    Oacc, Oin),
+    %% union set of links
+    L = sets:to_list(
+          sets:union(sets:from_list(Lin), sets:from_list(Lacc))),
+    {M, {{struct, O}, L}}.
+
+%%
+%% Testing
+%%
+
+sib_sort_test() ->
+    Dated = fun(D,N) ->
+                    {dict:store(<<"X-Riak-Last-Modified">>,
+                                httpd_util:rfc1123_date(D),
+                                dict:new()),
+                     N}
+            end,
+    A = Dated({{2009,8,28},{14,0,0}}, a),
+    B = Dated({{2009,8,28},{14,0,1}}, b),
+    C = Dated({{2009,8,28},{14,1,0}}, c),
+    D = Dated({{2009,8,28},{15,0,0}}, d),
+    E = Dated({{2009,8,29},{14,0,0}}, e),
+    F = Dated({{2009,9,28},{14,0,0}}, f),
+    G = Dated({{2010,8,28},{14,0,0}}, g),
+    [G,F,E,D,C,B,A] = sort_sibs([D,G,B,A,C,F,E]).
+
+merge_sibs_test_() ->
+    [fun merge_output_structure_t/0,
+     fun merge_metadata_t/0,
+     fun merge_fields_t/0,
+     fun merge_links_t/0].
+
+%% merge didn't fail
+merge_output_structure_t() ->
+    {_CM, {{struct, _CO}, _CL}} =
+        merge_sibs({dict:new(),{{struct,[]},[]}},
+                   {dict:new(),{{struct,[]},[]}}).
+
+%% metadata properly constructed
+merge_metadata_t() ->
+    {CM,_} = merge_sibs({dict:store(b, b, dict:store(c, b, dict:new())),
+                         {{struct,[]},[]}},
+                        {dict:store(a, a, dict:store(c, a, dict:new())),
+                         {{struct,[]},[]}}),
+    3 = dict:size(CM),
+    a = dict:fetch(a, CM),
+    b = dict:fetch(b, CM),
+    a = dict:fetch(c, CM).
+    
+%% fields properly constructed
+merge_fields_t() ->
+    {_,{{struct,CO},_}} = merge_sibs({dict:new(),
+                                      {{struct, [{b,b},{c,b}]},[]}},
+                                     {dict:new(),
+                                      {{struct, [{a,a},{c,a}]},[]}}),
+    3 = length(CO),
+    a = proplists:get_value(a, CO),
+    b = proplists:get_value(b, CO),
+    a = proplists:get_value(c, CO).
+
+%% links properly constructed
+merge_links_t() ->
+    {_,{_,CL}} = merge_sibs({dict:new(),{{struct,[]},[b,c]}},
+                            {dict:new(),{{struct,[]},[a,c]}}),
+    3 = length(CL),
+    true = lists:member(a, CL),
+    true = lists:member(b, CL),
+    true = lists:member(c, CL).
+
+standard_sibling_merge_test() ->
+    A = {dict:store(<<"X-Riak-Last-Modified">>,
+                    httpd_util:rfc1123_date({{2009,8,28},{14,0,0}}),
+                    dict:store(a,a,dict:store(x,a,dict:store(y,a,dict:new())))),
+         {{struct,[{a,a},{x,a},{y,a}]},[a,z]}},
+    B = {dict:store(<<"X-Riak-Last-Modified">>,
+                    httpd_util:rfc1123_date({{2009,8,28},{14,0,1}}),
+                    dict:store(b,b,dict:store(x,b,dict:store(z,b,dict:new())))),
+         {{struct,[{b,b},{x,b},{z,b}]},[b,z]}},
+    C = {dict:store(<<"X-Riak-Last-Modified">>,
+                    httpd_util:rfc1123_date({{2009,8,28},{14,0,2}}),
+                    dict:store(c,c,dict:store(y,c,dict:store(z,c,dict:new())))),
+         {{struct,[{c,c},{y,c},{z,c}]},[c,z]}},
+
+    %% structure okay
+    {MM, {{struct, MF}, ML}} = standard_sibling_merge([A,B,C]),
+    
+    %% metadata okay
+    7 = dict:size(MM),
+    ?assertEqual(httpd_util:rfc1123_date({{2009,8,28},{14,0,2}}),
+                 dict:fetch(<<"X-Riak-Last-Modified">>, MM)),
+    [a,b,c,b,c,c] = [ dict:fetch(Q, MM) || Q <- [a,b,c,x,y,z] ],
+    
+    %% fields okay
+    6 = length(MF),
+    [a,b,c,b,c,c] = [ proplists:get_value(Q, MF) || Q <- [a,b,c,x,y,z] ],
+    
+    %% links okay
+    4 = length(ML),
+    true = lists:all(fun(Q) -> lists:member(Q, ML) end, [a,b,c,z]).

apps/riak/src/jiak.hrl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% wildcard for specifying "any field" in
+%% allowed_fields, read_mask, and write_mask
+%% of Jiak bucket schema
+-define(JIAK_SCHEMA_WILDCARD, <<"*">>).

apps/riak/src/jiak_client.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc The equivalent of {@link riak_client}, but for the "jiak"
+%%      pattern of access.  Functions defined here are equivalent
+%%      to the like-named functions in riak_client, but expect
+%%      jiak-style paramenters (like binary object keys) and
+%%      return {@link jiak_object}s.
+%% @type jiak_client() = term()
+-module(jiak_client, [RiakClient]).
+-author('Bryan Fink <bryan@basho.com>').
+
+-export([mapred/2,mapred/3]).
+-export([get/3,get/4]).
+-export([put/2,put/3,put/4]).
+-export([delete/3,delete/4]).
+-export([list_keys/1]).
+-export([set_bucket/2,get_bucket/1]).
+-export([reload_all/1]).
+-export([riak_client/0]).
+
+%% @type default_timeout() = 15000
+-define(DEFAULT_TIMEOUT, 15000).
+
+%% @see riak_object:mapred/2
+%% @equiv mapred(Inputs, Query, default_timeout())
+mapred(Inputs,Query) -> mapred(Inputs, Query, ?DEFAULT_TIMEOUT).
+
+%% @see riak_object:mapred/3
+%% @equiv riak_client:mapred(Inputs, Query, Timeout)
+mapred(Inputs,Query,Timeout) ->
+    RiakClient:mapred(Inputs,Query,Timeout).
+
+%% @spec get(riak_object:bucket(), jiak_object:key(), R :: integer()) ->
+%%       {ok, jiak_object()} |
+%%       {error, notfound} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc Fetch the object at Bucket/Key.  Return a value as soon as R
+%%      nodes have responded with a value or error.
+%% @equiv get(Bucket, Key, R, default_timeout())
+get(Bucket, Key, R) -> get(Bucket, Key, R, ?DEFAULT_TIMEOUT).
+
+%% @spec get(riak_object:riak_object(), jiak_object:key(), R :: integer(),
+%%           TimeoutMillisecs :: integer()) ->
+%%       {ok, jiak_object()} |
+%%       {error, notfound} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc Fetch the object at Bucket/Key.  Return a value as soon as R
+%%      nodes have responded with a value or error, or TimeoutMillisecs passes.
+get(Bucket, Key, R, Timeout) when is_binary(Bucket), is_binary(Key) ->
+    case RiakClient:get(Bucket, Key, R, Timeout) of
+        {ok, RiakObject} ->
+            JiakObject = jiak_object:from_riak_object(RiakObject),
+            {ok, JiakObject};
+        Error ->
+            Error
+    end.
+
+%% @spec put(JiakObject :: jiak_object(), W :: integer()) ->
+%%        ok |
+%%       {error, too_many_fails} |
+%%       {error, timeout}
+%% @doc Store RObj in the cluster.
+%%      Return as soon as at least W nodes have received the request.
+%% @equiv put(RObj, W, W, default_timeout())
+put(JiakObject, W) -> put(JiakObject, W, W, ?DEFAULT_TIMEOUT).
+
+%% @spec put(JiakObject :: jiak_object(), W :: integer(), RW :: integer()) ->
+%%        ok |
+%%       {error, too_many_fails} |
+%%       {error, timeout}
+%% @doc Store RObj in the cluster.
+%%      Return as soon as at least W nodes have received the request, and
+%%      at least DW nodes have stored it in their storage backend.
+%% @equiv put(Robj, W, DW, default_timeout())
+put(JiakObject, W, DW) -> put(JiakObject, W, DW, ?DEFAULT_TIMEOUT).
+
+%% @spec put(JiakObject :: jiak_object(), W :: integer(), RW :: integer(),
+%%           TimeoutMillisecs :: integer()) ->
+%%        ok |
+%%       {error, too_many_fails} |
+%%       {error, timeout}
+%% @doc Store RObj in the cluster.
+%%      Return as soon as at least W nodes have received the request, and
+%%      at least DW nodes have stored it in their storage backend, or
+%%      TimeoutMillisecs passes.
+put(JiakObject, W, DW, Timeout) ->    
+    RiakObject = jiak_object:to_riak_object(JiakObject),
+    RiakClient:put(RiakObject, W, DW, Timeout).
+
+%% @spec delete(riak_object:bucket(), jiak_object:key(), RW :: integer()) ->
+%%        ok |
+%%       {error, too_many_fails} |
+%%       {error, notfound} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc Delete the object at Bucket/Key.  Return a value as soon as RW
+%%      nodes have responded with a value or error.
+%% @equiv delete(Bucket, Key, RW, default_timeout())
+delete(Bucket,Key,RW) -> delete(Bucket,Key,RW,?DEFAULT_TIMEOUT).
+
+%% @spec delete(niak_object:bucket(), jiak_object:key(), RW :: integer(),
+%%           TimeoutMillisecs :: integer()) ->
+%%        ok |
+%%       {error, too_many_fails} |
+%%       {error, notfound} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc Delete the object at Bucket/Key.  Return a value as soon as RW
+%%      nodes have responded with a value or error, or TimeoutMillisecs passes.
+%% @equiv riak_client:delete(Bucket, Key, RW, Timeout)
+delete(Bucket,Key,RW,Timeout) ->
+    RiakClient:delete(Bucket, Key, RW, Timeout).
+
+%% @spec list_keys(riak_object:bucket()) ->
+%%       {ok, [Key :: riak_object:key()]} |
+%%       {error, timeout} |
+%%       {error, Err :: term()}
+%% @doc List the keys known to be present in Bucket.
+%%      Key lists are updated asynchronously, so this may be slightly
+%%      out of date if called immediately after a put or delete.
+%% @equiv riak_client:list_keys(Bucket)
+list_keys(Bucket) -> RiakClient:list_keys(Bucket).
+
+%% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
+%% @doc Set the given properties for Bucket.
+%% @equiv riak_client:set_bucket(BucketName, BucketProps)
+set_bucket(BucketName,BucketProps) ->
+    RiakClient:set_bucket(BucketName, BucketProps).
+
+%% @spec get_bucket(riak_object:bucket()) -> [BucketProp :: {atom(),term()}]
+%% @doc Get all properties for Bucket.
+%% @equiv riak_client:get_bucket(BucketName)
+get_bucket(BucketName) -> RiakClient:get_bucket(BucketName).
+
+%% @spec reload_all(Module :: atom()) -> term()
+%% @doc Force all Riak nodes to reload Module.
+%%      This is used when loading new modules for map/reduce functionality.
+%% @equiv riak_client:reload_all(Module)
+reload_all(Module) -> RiakClient:reload_all(Module).
+
+%% @spec riak_client() -> riak_client()
+%% @doc Get the riak_client that this jiak_client is using.
+riak_client() -> RiakClient.

apps/riak/src/jiak_context.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc An instance of this parameterized module will be passed as the
+%%      Context parameter to functions in Jiak bucket modules.  Those
+%%      moduels can use the set_prop and get_prop to store state in
+%%      this context, allowing threading of state between check_write,
+%%      affect_write, etc.
+-module(jiak_context,[Diff,DataProps]).
+
+-export([diff/0, set_diff/1,
+         set_prop/2, get_prop/1,
+         set_props/1]).
+
+%% @spec diff() -> jiak_resource:diff()
+%% @doc Get the computed modifications to the object being stored,
+%%      as computed by jiak_resource.
+diff() -> Diff.
+
+%% @spec set_diff(jiak_resource:diff()) -> jiak_context()
+%% @doc Set the list of modifications.
+set_diff(NewDiff) ->
+    jiak_context:new(NewDiff,DataProps).
+
+%% @spec set_prop(Key :: term(), Value :: term()) -> jiak_context()
+%% @doc Associate Value with the name Key in this context.  A
+%%      subesquent call of get_prop(Key) on the resulting jiak_context
+%%      will return Value.
+set_prop(Key,Value) ->
+    jiak_context:new(Diff,
+                     [{Key, Value}|proplists:delete(Key, DataProps)]).
+
+%% @spec set_props([{Key :: term(), Value :: term()}]) -> jiak_context()
+%% @doc Associate each Value with each Key in a new jiak_context.
+%% @equiv lists:foldl(fun({K,V},C) -> C:set_prop(K,V) end,
+%%                    Context, List)
+set_props(List) ->
+    jiak_context:new(Diff,
+                     lists:foldl(fun({K, V}, Acc) ->
+                                         [{K,V}|proplists:delete(K, Acc)]
+                                 end,
+                                 DataProps,
+                                 List)).
+
+%% @spec get_prop(term()) -> term()|undefined
+%% @doc Get the Value that was previously associated with Key in a
+%%      call to set_prop/2.  If no value is associated with Key,
+%%      the atom 'undefined' is returned.
+get_prop(Key) -> proplists:get_value(Key,DataProps).

apps/riak/src/jiak_context_tests.erl

+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+
+%%   http://www.apache.org/licenses/LICENSE-2.0
+
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.    
+
+%% @doc Tests for jiak_context.  These must be in a separate module,
+%%      because eunit doesn't work with parameterized modules (due to
+%%      the inability to export zero-arity functions from them).
+-module(jiak_context_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+diff_test() ->
+    A = jiak_context:new([], []),
+    ?assertEqual([], A:diff()),
+    B = A:set_diff([{bogus, diff}]),
+    ?assertEqual([{bogus, diff}], B:diff()).
+
+single_prop_test() ->
+    A = jiak_context:new([], []),
+    B = A:set_prop(foo, bar),
+    ?assertEqual(bar, B:get_prop(foo)),
+    C = B:set_prop(foo, quux),
+    ?assertEqual(quux, C:get_prop(foo)).
+
+multi_prop_test() ->
+    A = jiak_context:new([], []),
+    B = A:set_props([{foo, 1},{bar,2},{baz,3}]),
+    ?assertEqual([1,2,3], [ B:get_prop(P) || P <- [foo, bar, baz] ]),
+    C = B:set_props([{foo, 10},{quux,20}]),
+    ?assertEqual([10,2,3,20],
+                 [ C:get_prop(P) || P <- [foo, bar, baz, quux] ]).
+
+all_together_test() ->