Commits

Andy Gross  committed 2315239 Merge

merge from tip

  • Participants
  • Parent commits 5f76d0c, 92b094d

Comments (0)

Files changed (123)

 01b5275fd2ad22ad36ee837610fdaa16e14bdec0 riak-0.12.0rc1
 96aee914c29f569869f7d9480f6c89f93c226bc3 riak-0.12.0rc2
 c9ad3232e093485988b5666d1dc2d58072bf65eb riak-0.12.1
+0efa139ee9ca19805c7c48a9112b56f1555592ca riak-0.12.0
 ##
 stagedevrel: dev1 dev2 dev3
 	$(foreach dev,$^,\
-	  $(foreach app,$(wildcard apps/*), rm -rf dev/$(dev)/lib/$(shell basename $(app))-* && ln -sf $(abspath $(app)) dev/$(dev)/lib;)\
 	  $(foreach dep,$(wildcard deps/*), rm -rf dev/$(dev)/lib/$(shell basename $(dep))-* && ln -sf $(abspath $(dep)) dev/$(dev)/lib;))
 
 devrel: dev1 dev2 dev3
 	rm -rf dev
 
 stage : rel
-	cd rel/riak/lib && \
-	rm -rf riak_core-* riak_kv-* && \
-	ln -s ../../../apps/riak_core && \
-	ln -s ../../../apps/riak_kv
+	$(foreach dep,$(wildcard deps/*), rm -rf rel/riak/lib/$(shell basename $(dep))-* && ln -sf $(abspath $(dep)) rel/riak/lib;)
 
 ##
 ## Doc targets

File apps/luke/Makefile

-all:
-	./rebar compile
-
-clean:
-	rm -rf tests_ebin docs
-	./rebar clean
-
-test: all
-	@mkdir -p tests_ebin
-	@cd tests;erl -make
-	@erl -noshell -boot start_sasl -pa ebin -pa tests_ebin -s luke -eval 'test_suite:test().' -s init stop
-	@rm -f ebin/test_* ebin/*_tests.erl
-
-#docs: all
-#	@mkdir -p docs
-#	@./build_docs.sh

File apps/luke/ebin/luke.app

-% -*- mode: erlang -*-
-{application, luke,
- [{description,  "Map/Reduce Framework"},
-  {vsn,          "0.2.0"},
-  {modules,      [luke, luke_flow, luke_flow_sup, luke_phase, luke_phase_sup, luke_phases, luke_sup]},
-  {registered,   [luke_flow_sup, luke_phase_sup, luke_sup]},
-  {applications, [kernel, stdlib, sasl]},
-  {mod, {luke, []}}]}.

File apps/luke/rebar

Binary file removed.

File apps/luke/rebar.config

-{erl_opts, [debug_info, fail_on_warning]}.

File apps/luke/src/luke.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
-%% @doc The main entry point for Luke. This module is responsible
-%%      for starting Luke as an OTP application and also
-%%      running new process flows.
-
--module(luke).
-
--behaviour(application).
-
--define(DEFAULT_TIMEOUT, 60000).
-
-%% Application callbacks
--export([start/0,
-         start/2,
-         stop/1]).
-
-%% Public API
--export([new_flow/2,
-         new_flow/3,
-         new_flow/4,
-         new_flow/5,
-         new_flow/6]).
-
-new_flow(FlowId, FlowDesc) ->
-    new_flow(self(), FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
-
-new_flow(Client, FlowId, FlowDesc) ->
-    new_flow(Client, FlowId, FlowDesc, ?DEFAULT_TIMEOUT).
-
-new_flow(Client, FlowId, FlowDesc, Timeout) ->
-    luke_flow_sup:new_flow(Client, FlowId, FlowDesc, Timeout).
-
-new_flow(Node, Client, FlowId, FlowDesc, Timeout) ->
-    new_flow(Node, Client, FlowId, FlowDesc, undefined, Timeout).
-
-new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
-    luke_flow_sup:new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout).
-
-
-start() ->
-    application:start(luke).
-
-start(_StartType, _StartArgs) ->
-    luke_sup:start_link().
-
-stop(_State) ->
-    ok.

File apps/luke/src/luke_flow.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
-%% @doc Manages the execution of a flow
--module(luke_flow).
-
--behaviour(gen_fsm).
-
-%% API
--export([start_link/5,
-         add_inputs/2,
-         finish_inputs/1,
-         collect_output/2]).
-
-%% FSM states
--export([get_phases/1,
-         executing/2,
-         executing/3]).
-
-%% gen_fsm callbacks
--export([init/1,
-         handle_event/3,
-         handle_sync_event/4,
-         handle_info/3,
-         terminate/3,
-         code_change/4]).
-
--record(state, {flow_id,
-                fsms,
-                client,
-                flow_timeout,
-                tref,
-                xformer,
-                results=[]}).
-
-%% @doc Add inputs to the flow. Inputs will be sent to the
-%%      first phase
-%% @spec add_inputs(pid(), any()) -> ok
-add_inputs(FlowPid, Inputs) ->
-    gen_fsm:send_event(FlowPid, {inputs, Inputs}).
-
-%% @doc Informs the phases all inputs are complete.
-%% @spec finish_inputs(pid()) -> ok
-finish_inputs(FlowPid) ->
-    gen_fsm:send_event(FlowPid, inputs_done).
-
-%% @doc Collects flow output. This function will block
-%%      until the flow completes or exceeds the flow_timeout.
-%% @spec collect_output(any(), integer()) -> [any()] | {error, any()}
-collect_output(FlowId, Timeout) ->
-    collect_output(FlowId, Timeout, dict:new()).
-
-%% @doc Returns the pids for each phase. Intended for
-%%      testing only
-%% @spec get_phases(pid()) -> [pid()]
-get_phases(FlowPid) ->
-    gen_fsm:sync_send_event(FlowPid, get_phases).
-
-start_link(Client, FlowId, FlowDesc, FlowTransformer, Timeout) when is_list(FlowDesc),
-                                                                    is_pid(Client) ->
-    gen_fsm:start_link(?MODULE, [Client, FlowId, FlowDesc, FlowTransformer, Timeout], []).
-
-init([Client, FlowId, FlowDesc, FlowTransformer, Timeout]) ->
-    process_flag(trap_exit, true),
-    Tref = case Timeout of
-               infinity ->
-                   undefined;
-               _ ->
-                   {ok, T} = timer:send_after(Timeout, flow_timeout),
-                   T
-           end,
-    case start_phases(FlowDesc, Timeout) of
-        {ok, FSMs} ->
-            {ok, executing, #state{fsms=FSMs, flow_id=FlowId, flow_timeout=Timeout, client=Client, xformer=FlowTransformer, tref=Tref}};
-        Error ->
-            {stop, Error}
-    end.
-
-executing({inputs, Inputs}, #state{fsms=[H|_]}=State) ->
-    luke_phases:send_inputs(H, Inputs),
-    {next_state, executing, State};
-executing(inputs_done, #state{fsms=[H|_]}=State) ->
-    luke_phases:send_inputs_done(H),
-    {next_state, executing, State};
-executing(timeout, #state{client=Client, flow_id=FlowId}=State) ->
-    Client ! {flow_results, FlowId, done},
-    {stop, normal, State};
-executing({results, done}, #state{client=Client, flow_id=FlowId}=State) ->
-    Client ! {flow_results, FlowId, done},
-    {stop, normal, State};
-executing({results, PhaseId, Result0}, #state{client=Client, flow_id=FlowId, xformer=XFormer}=State) ->
-    Result = transform_results(XFormer, Result0),
-    Client ! {flow_results, PhaseId, FlowId, Result},
-    {next_state, executing, State}.
-
-executing(get_phases, _From, #state{fsms=FSMs}=State) ->
-    {reply, FSMs, executing, State}.
-
-handle_event(_Event, StateName, State) ->
-    {next_state, StateName, State}.
-
-handle_sync_event(_Event, _From, StateName, State) ->
-    {reply, ignored, StateName, State}.
-
-handle_info(flow_timeout, _StateName, State) ->
-    {stop, flow_timeout, State};
-handle_info({'EXIT', _Pid, normal}, StateName, State) ->
-    {next_state, StateName, State};
-handle_info({'EXIT', _Pid, Reason}, _StateName, #state{flow_id=FlowId, client=Client}=State) ->
-    Client ! {flow_error, FlowId, Reason},
-    {stop, normal, State};
-handle_info(_Info, StateName, State) ->
-    {next_state, StateName, State}.
-
-terminate(_Reason, _StateName, #state{tref=Tref}) ->
-    timer:cancel(Tref),
-    ok.
-
-code_change(_OldVsn, StateName, State, _Extra) ->
-    {ok, StateName, State}.
-
-%% Internal functions
-start_phases(FlowDesc, Timeout) ->
-    start_phases(lists:reverse(FlowDesc), length(FlowDesc) - 1, Timeout, []).
-
-start_phases([], _Id, _Timeout, Accum) ->
-    {ok, Accum};
-start_phases([{PhaseMod, Behaviors, Args}|T], Id, Timeout, Accum) ->
-    NextFSM = next_fsm(Accum),
-    case proplists:get_value(converge, Behaviors) of
-        undefined ->
-            case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args) of
-                {ok, Pid} ->
-                    erlang:link(Pid),
-                    start_phases(T, Id - 1, Timeout, [Pid|Accum]);
-                Error ->
-                    Error
-            end;
-        InstanceCount ->
-            Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, self(), Timeout, Args, InstanceCount),
-            start_phases(T, Id - 1, Timeout, [Pids|Accum])
-    end.
-
-collect_output(FlowId, Timeout, Accum) ->
-    receive
-        {flow_results, FlowId, done} ->
-            {ok, finalize_results(Accum)};
-        {flow_results, PhaseId, FlowId, Results} ->
-            collect_output(FlowId, Timeout, accumulate_results(PhaseId, Results, Accum));
-        {flow_error, FlowId, Error} ->
-            Error
-    after Timeout ->
-            case dict:size(Accum) of
-                0 ->
-                    {error, timeout};
-                _ ->
-                    {ok, finalize_results(Accum)}
-            end
-    end.
-
-next_fsm(Accum) ->
- if
-     length(Accum) == 0 ->
-         undefined;
-     true ->
-         case hd(Accum) of
-             P when is_pid(P) ->
-                 [P];
-             P ->
-                 P
-         end
- end.
-
-start_converging_phases(Id, PhaseMod, Behaviors0, NextFSM, Flow, Timeout, Args, Count) ->
-    Behaviors = [normalize_behavior(B) || B <- Behaviors0],
-    Pids = start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, []),
-    [Leader|_] = Pids,
-    lists:foreach(fun(P) -> luke_phase:partners(P, Leader, Pids) end, Pids),
-    Pids.
-
-start_converging_phases(_Id, _PhaseMod, _Behaviors, _NextFSM, _Flow, _Timeout, _Args, 0, Accum) ->
-    Accum;
-start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count, Accum) ->
-    case luke_phase_sup:new_phase(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args) of
-        {ok, Pid} ->
-            erlang:link(Pid),
-            start_converging_phases(Id, PhaseMod, Behaviors, NextFSM, Flow, Timeout, Args, Count - 1, [Pid|Accum]);
-        Error ->
-            throw(Error)
-    end.
-
-normalize_behavior({converge, _}) ->
-    converge;
-normalize_behavior(Behavior) ->
-    Behavior.
-
-finalize_results(Accum) ->
-    case [lists:append(R) || {_, R} <- lists:sort(dict:to_list(Accum))] of
-        [R] ->
-            R;
-        R ->
-            R
-    end.
-
-accumulate_results(PhaseId, Results, Accum) ->
-    case dict:find(PhaseId, Accum) of
-        error ->
-            dict:store(PhaseId, [Results], Accum);
-        {ok, PhaseAccum} ->
-            dict:store(PhaseId, [Results|PhaseAccum], Accum)
-    end.
-
-transform_results(undefined, Results) ->
-    Results;
-transform_results(Xformer, Results) when is_list(Results) ->
-    [Xformer(R) || R <- Results];
-transform_results(Xformer, Results) ->
-    Xformer(Results).

File apps/luke/src/luke_flow_sup.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(luke_flow_sup).
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0,
-         new_flow/4,
-         new_flow/6]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-new_flow(Client, FlowId, FlowDesc, Timeout) ->
-    start_child(Client, FlowId, FlowDesc, Timeout).
-
-new_flow(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
-    start_child(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout).
-
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init([]) ->
-    SupFlags = {simple_one_for_one, 0, 1},
-    Process = {undefined,
-               {luke_flow, start_link, []},
-               temporary, brutal_kill, worker, dynamic},
-    {ok, {SupFlags, [Process]}}.
-
-%% Internal functions
-start_child(Client, FlowId, FlowDesc, Timeout) ->
-    supervisor:start_child(?MODULE, [Client, FlowId, FlowDesc, undefined, Timeout]).
-start_child(Node, Client, FlowId, FlowDesc, ResultTransformer, Timeout) ->
-    supervisor:start_child({?MODULE, Node}, [Client, FlowId, FlowDesc, ResultTransformer, Timeout]).

File apps/luke/src/luke_phase.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(luke_phase).
-
--behaviour(gen_fsm).
-
-%% API
--export([start_link/7,
-         complete/0,
-         partners/3]).
-
-%% Behaviour
--export([behaviour_info/1]).
-
-%% States
--export([executing/2]).
-
-%% gen_fsm callbacks
--export([init/1,
-         handle_event/3,
-         handle_sync_event/4,
-         handle_info/3,
-         terminate/3,
-         code_change/4]).
-
--record(state, {id,
-                mod,
-                modstate,
-                converge=false,
-                accumulate=false,
-                lead_partner,
-                partners,
-                next_phases,
-                done_count=1,
-                flow,
-                flow_timeout}).
-
-behaviour_info(callbacks) ->
-  [{init, 1},
-   {handle_timeout, 1},
-   {handle_input, 3},
-   {handle_input_done, 1},
-   {handle_event, 2},
-   {handle_info, 2},
-   {terminate, 2}];
-behaviour_info(_) ->
-    undefined.
-
-start_link(PhaseMod, Id, Behaviors, NextPhases, Flow, Timeout, PhaseArgs) ->
-    gen_fsm:start_link(?MODULE, [Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs], []).
-
-complete() ->
-    gen_fsm:send_event(self(), complete).
-
-partners(PhasePid, Leader, Partners) ->
-    gen_fsm:send_event(PhasePid, {partners, Leader, Partners}).
-
-init([Id, PhaseMod, Behaviors, NextPhases, Flow, Timeout, PhaseArgs]) ->
-    case PhaseMod:init(PhaseArgs) of
-        {ok, ModState} ->
-            Accumulate = lists:member(accumulate, Behaviors),
-            Converge = lists:member(converge, Behaviors),
-            {ok, executing, #state{id=Id, mod=PhaseMod, modstate=ModState, next_phases=NextPhases,
-                                   flow=Flow, accumulate=Accumulate, converge=Converge, flow_timeout=Timeout}};
-        {stop, Reason} ->
-            {stop, Reason}
-    end.
-
-executing({partners, Lead0, Partners0}, #state{converge=true}=State) when is_list(Partners0) ->
-    Me = self(),
-    Lead = case Lead0 of
-               Me ->
-                   undefined;
-               _ ->
-                   erlang:link(Lead0),
-                   Lead0
-           end,
-    Partners = lists:delete(self(), Partners0),
-    DoneCount = if
-                    Lead =:= undefined ->
-                        length(Partners) + 1;
-                    true ->
-                        1
-                end,
-    {next_state, executing, State#state{lead_partner=Lead, partners=Partners, done_count=DoneCount}};
-executing({partners, _, _}, State) ->
-    {stop, {error, no_convergence}, State};
-executing({inputs, Input}, #state{mod=PhaseMod, modstate=ModState, flow_timeout=Timeout}=State) ->
-    handle_callback(PhaseMod:handle_input(Input, ModState, Timeout), State);
-executing(inputs_done, #state{mod=PhaseMod, modstate=ModState, done_count=DoneCount0}=State) ->
-    case DoneCount0 - 1 of
-        0 ->
-            handle_callback(PhaseMod:handle_input_done(ModState), State#state{done_count=0});
-        DoneCount ->
-            {next_state, executing, State#state{done_count=DoneCount}}
-    end;
-executing(complete, #state{lead_partner=Leader}=State) when is_pid(Leader) ->
-    luke_phases:send_inputs_done(Leader),
-    {stop, normal, State};
-executing(complete, #state{flow=Flow, next_phases=Next}=State) ->
-    case Next of
-        undefined ->
-            luke_phases:send_flow_complete(Flow);
-        _ ->
-            luke_phases:send_inputs_done(Next)
-    end,
-    {stop, normal, State};
-executing(timeout, #state{mod=Mod, modstate=ModState}=State) ->
-    handle_callback(Mod:handle_timeout(ModState), State);
-executing(timeout, State) ->
-    {stop, normal, State};
-executing(Event, #state{mod=PhaseMod, modstate=ModState}=State) ->
-    handle_callback(PhaseMod:handle_event(Event, ModState), State).
-
-handle_event(_Event, StateName, State) ->
-    {next_state, StateName, State}.
-
-handle_sync_event(_Event, _From, StateName, State) ->
-    {reply, ignored, StateName, State}.
-handle_info(timeout, executing, #state{mod=Mod, modstate=ModState}=State) ->
-    handle_callback(Mod:handle_timeout(ModState), State);
-handle_info(Info, _StateName, #state{mod=PhaseMod, modstate=ModState}=State) ->
-    handle_callback(PhaseMod:handle_info(Info, ModState), State).
-
-terminate(Reason, _StateName, #state{mod=PhaseMod, modstate=ModState}) ->
-    PhaseMod:terminate(Reason, ModState),
-    ok.
-
-code_change(_OldVsn, StateName, State, _Extra) ->
-    {ok, StateName, State}.
-
-%% Internal functions
-%% Handle callback module return values
-handle_callback({no_output, NewModState}, State) ->
-    {next_state, executing, State#state{modstate=NewModState}};
-handle_callback({no_output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
-    {next_state, executing, State#state{modstate=NewModState}, PhaseTimeout};
-handle_callback({output, Output, NewModState}, State) ->
-    State1 = route_output(Output, State),
-    {next_state, executing, State1#state{modstate=NewModState}};
-handle_callback({output, Output, NewModState, PhaseTimeout}, #state{flow_timeout=Timeout}=State) when PhaseTimeout < Timeout ->
-    State1 = route_output(Output, State),
-    {next_state, executing, State1#state{modstate=NewModState}, PhaseTimeout};
-handle_callback({stop, Reason, NewModState}, State) ->
-    {stop, Reason, State#state{modstate=NewModState}};
-handle_callback(BadValue, _State) ->
-  throw({error, {bad_return, BadValue}}).
-
-%% Route output to lead when converging
-%% Accumulation is ignored for non-leads of converging phases
-%% since all accumulation is performed in the lead process
-route_output(Output, #state{converge=true, lead_partner=Lead}=State) when is_pid(Lead) ->
-    propagate_inputs([Lead], Output),
-    State;
-
-%% Send output to flow for accumulation and propagate as inputs
-%% to the next phase. Accumulation is only true for the lead
-%% process of a converging phase
-route_output(Output, #state{id=Id, converge=true, accumulate=Accumulate, lead_partner=undefined,
-                            flow=Flow, next_phases=Next}=State) ->
-    if
-        Accumulate =:= true ->
-            luke_phases:send_flow_results(Flow, Id, Output);
-        true ->
-            ok
-    end,
-    RotatedNext = propagate_inputs(Next, Output),
-    State#state{next_phases=RotatedNext};
-
-%% Route output to the next phase. Accumulate output
-%% to the flow if accumulation is turned on.
-route_output(Output, #state{id=Id, converge=false, accumulate=Accumulate, flow=Flow, next_phases=Next} = State) ->
-    if
-        Accumulate =:= true ->
-            luke_phases:send_flow_results(Flow, Id, Output);
-        true ->
-            ok
-    end,
-    RotatedNext = propagate_inputs(Next, Output),
-    State#state{next_phases=RotatedNext}.
-
-propagate_inputs(undefined, _Results) ->
-    undefined;
-propagate_inputs(Next, Results) ->
-    luke_phases:send_inputs(Next, Results).

File apps/luke/src/luke_phase_sup.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(luke_phase_sup).
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0,
-         new_phase/7]).
-
-%% Supervisor callbacks
--export([init/1]).
-
-new_phase(Id, PhaseMod, Behavior, NextPhases, Flow, Timeout, PhaseArgs) when is_atom(PhaseMod),
-                                                                             is_list(PhaseArgs) ->
-    start_child(PhaseMod, [Id, Behavior, NextPhases, Flow, Timeout, PhaseArgs]).
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init([]) ->
-    SupFlags = {simple_one_for_one, 0, 1},
-    Process = {undefined,
-               {luke_phase, start_link, []},
-               temporary, brutal_kill, worker, dynamic},
-    {ok, {SupFlags, [Process]}}.
-
-%% Internal functions
-start_child(ModName, Args) ->
-  supervisor:start_child(?MODULE, [ModName|Args]).

File apps/luke/src/luke_phases.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
-%% @doc Encapsulates the messaging protocol used during flow processing.
-%%      Some of these functions operate on either a single
-%%      phase pid or a list of phase pids.
-%%
-%%      Multiple pids are encountered when a phase has parallel
-%%      instances running. This is triggerd by the 'converge'
-%%      phase behavior.
--module(luke_phases).
-
--export([send_inputs/2,
-         send_inputs_done/1,
-         send_flow_complete/1]).
--export([send_flow_results/3]).
-
-%% @doc Sends inputs to a phase process
-%%      If a phase has multiple processes, inputs
-%%      will be distributed in a round robin fashion.
-%% @spec send_inputs(pid() | [pid()], any()) -> ok
-send_inputs(PhasePids, Inputs) when is_list(PhasePids) ->
-    [H|T] = PhasePids,
-    send_inputs(H, Inputs),
-    T ++ [H];
-send_inputs(PhasePid, Inputs) when is_pid(PhasePid) ->
-    gen_fsm:send_event(PhasePid, {inputs, Inputs}).
-
-%% @doc Signals completion of inputs to a phase
-%%      or a list of phases.
-send_inputs_done(PhasePids) when is_list(PhasePids) ->
-    lists:foreach(fun(Pid) -> send_inputs_done(Pid) end, PhasePids);
-send_inputs_done(PhasePid) when is_pid(PhasePid) ->
-    gen_fsm:send_event(PhasePid, inputs_done).
-
-%% @doc Signal completion of flow to the flow pid
-%%      This is sent by the last process of the last
-%%      phase in the flow
-%% @spec send_flow_complete(pid()) -> ok
-send_flow_complete(FlowPid) ->
-    gen_fsm:send_event(FlowPid, {results, done}).
-
-%% @doc Sends flow results to the flow pid
-%%      This is sent by phases which are configured
-%%      to accumulate their results
-%% @spec send_flow_results(pid(), any(), any()) -> ok
-send_flow_results(FlowPid, Id, Results) ->
-    gen_fsm:send_event(FlowPid, {results, Id, Results}).

File apps/luke/src/luke_sup.erl

-%% Copyright (c) 2010 Basho Technologies, Inc.  All Rights Reserved.
-
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(luke_sup).
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
--define(SERVER, ?MODULE).
-
-start_link() ->
-    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
-
-init([]) ->
-    RestartStrategy = one_for_one,
-    MaxRestarts = 1000,
-    MaxSecondsBetweenRestarts = 3600,
-
-    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
-
-    Restart = permanent,
-    Shutdown = infinity,
-    Type = supervisor,
-
-    PhaseSup = {luke_phase_sup, {luke_phase_sup, start_link, []},
-                Restart, Shutdown, Type, [luke_phase_sup]},
-    FlowSup = {luke_flow_sup, {luke_flow_sup, start_link, []},
-                Restart, Shutdown, Type, [luke_flow_sup]},
-
-    {ok, {SupFlags, [FlowSup, PhaseSup]}}.

File apps/luke/tests/Emakefile

-{"*", [warn_obsolete_guard, warn_unused_import,
-       warn_shadow_vars, warn_export_vars, debug_info,
-       {i, "../include"},
-       {pa, "../ebin"},
-       {outdir, "../tests_ebin"}]}.

File apps/luke/tests/async_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(async_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, State, _Timeout) ->
-    {output, Inputs, State}.
-
-handle_input_done(State) ->
-    luke_phase:complete(),
-    {no_output, State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

File apps/luke/tests/lifecycle_tests.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(lifecycle_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include("tests.hrl").
-
-setup_teardown_test_() ->
-    [fun() ->
-             %% Startup/teardown, no input
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, infinity timeout, no input
-             {ok, Pid} = luke:new_flow(self(), make_ref(), ?TWO_PHASE_FLOW, infinity),
-             Phases = test_util:verify_phases(Pid, 2),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, input w/no end
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             luke_flow:add_inputs(Pid, [100]),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             %% Startup/teardown, input w/finish
-             {ok, Pid} = luke:new_flow(make_ref(), ?TWO_PHASE_FLOW),
-             Phases = test_util:verify_phases(Pid, 2),
-             luke_flow:add_inputs(Pid, [100]),
-             luke_flow:add_inputs(Pid, [200]),
-             luke_flow:finish_inputs(Pid),
-             exit(Pid, kill),
-             timer:sleep(10),
-             test_util:assertDead([Pid|Phases]) end].

File apps/luke/tests/map_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(map_phase).
-
--behaviour(luke_phase).
-
--include_lib("eunit/include/eunit.hrl").
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {workers=[], done=false}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{workers=Workers}=State, _Timeout) ->
-    Worker = worker(self(), length(Inputs)),
-    {no_output, State#state{workers=[Worker|Workers]}}.
-
-handle_input_done(#state{workers=[]}=State) ->
-    luke_phase:complete(),
-    {no_output, State};
-handle_input_done(State) ->
-    {no_output, State#state{done=true}}.
-
-handle_event({mapexec_results, Worker, Data}, #state{done=Done, workers=Workers0}=State) ->
-    Workers = lists:delete(Worker, Workers0),
-    case Done =:= true andalso length(Workers) == 0 of
-        true ->
-            luke_phase:complete();
-        false ->
-            ok
-    end,
-    {output, Data, State#state{workers=Workers}};
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-worker(Phase, Size) ->
-    spawn(fun() ->
-                  Data = generate_data(Size),
-                  gen_fsm:send_event(Phase, {mapexec_results, self(), Data}) end).
-
-generate_data(Size) ->
-    generate_data(Size, []).
-
-generate_data(0, Accum) ->
-    Accum;
-generate_data(Size, Accum) ->
-    generate_data(Size - 1, [5|Accum]).

File apps/luke/tests/reduce_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(reduce_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {reduced=0}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{reduced=Reduced}=State, _Timeout) ->
-    {no_output, State#state{reduced=lists:sum(Inputs ++ [Reduced])}}.
-
-handle_input_done(#state{reduced=Reduced}=State) ->
-    luke_phase:complete(),
-    {output, [Reduced], State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

File apps/luke/tests/results_tests.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(results_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include("tests.hrl").
-
-all_test_() ->
-    [fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
-             luke_flow:add_inputs(Pid, ["hello"]),
-             test_util:verify_results(FlowId, none),
-             exit(Pid, kill),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_PHASE_FLOW),
-             luke_flow:add_inputs(Pid, ["hello"]),
-             luke_flow:finish_inputs(Pid),
-             {ok, Results} = test_util:verify_results(FlowId, results),
-             test_util:verify_results(FlowId, done),
-             ?assertMatch(["hello"], Results),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?THREE_PHASE_NESTED_FLOW),
-             luke_flow:add_inputs(Pid, ["hello", "goodbye"]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [["hello", "goodbye"], ["hello", "goodbye"], ["hello", "goodbye"]]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?TWO_ASYNC_FLOW),
-             luke_flow:add_inputs(Pid, "testing"),
-             {ok, "testing"} = test_util:verify_results(FlowId, results),
-             luke_flow:add_inputs(Pid, [100, 200]),
-             {ok, [100, 200]} = test_util:verify_results(FlowId, results),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             {ok, [5,5]} = test_util:verify_results(FlowId, results),
-             test_util:verify_results(FlowId, none),
-             luke_flow:add_inputs(Pid, [a,b]),
-             {ok, [5,5]} = test_util:verify_results(FlowId, results),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             ?assertMatch({ok, [5,5,5,5]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAP_DBL_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             ?assertMatch({ok, [[5,5,5,5],[5,5,5,5]]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             luke_flow:finish_inputs(Pid),
-             test_util:verify_results(FlowId, done),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 500)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_FLOW1),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, [20]}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end,
-     fun() ->
-             {FlowId, Pid, Phases} = test_util:start_flow(?MAPRED_EMPTY),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:add_inputs(Pid, [a,b]),
-             luke_flow:finish_inputs(Pid),
-             ?assertMatch({ok, []}, luke_flow:collect_output(FlowId, 100)),
-             test_util:verify_results(FlowId, none),
-             test_util:assertDead([Pid|Phases]) end].

File apps/luke/tests/simple_phase.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(simple_phase).
-
--behaviour(luke_phase).
-
--export([init/1, handle_input/3, handle_input_done/1, handle_event/2,
-         handle_timeout/1, handle_info/2, terminate/2]).
-
--record(state, {inputs=[]}).
-
-init([]) ->
-    {ok, #state{}}.
-
-handle_input(Inputs, #state{inputs=Inputs0}=State, _Timeout) ->
-    {no_output, State#state{inputs=Inputs0 ++ Inputs}}.
-
-handle_input_done(State) ->
-    luke_phase:complete(),
-    {output, State#state.inputs, State}.
-
-handle_event(_Event, State) ->
-    {no_output, State}.
-
-handle_timeout(State) ->
-    {no_output, State}.
-
-handle_info(_Info, State) ->
-    {no_output, State}.
-
-terminate(_Reason, _State) ->
-    ok.

File apps/luke/tests/test_suite.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(test_suite).
-
--include_lib("eunit/include/eunit.hrl").
-
-all_test_() ->
-    [{module, lifecycle_tests},
-     {module, results_tests}].

File apps/luke/tests/test_util.erl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--module(test_util).
-
--include_lib("eunit/include/eunit.hrl").
-
--export([start_flow/1, verify_phases/2, verify_results/2, assertDead/1]).
-
-start_flow(FlowDesc) ->
-    FlowId = make_ref(),
-    {ok, Pid} = luke:new_flow(FlowId, FlowDesc),
-    Phases = test_util:verify_phases(Pid, length(FlowDesc)),
-    {FlowId, Pid, Phases}.
-
-
-verify_phases(Pid, Size) ->
-    Phases = luke_flow:get_phases(Pid),
-    ?assertEqual(Size, length(Phases)),
-    Phases.
-
-verify_results(FlowId, none) ->
-    receive
-        {flow_results, FlowId, done} ->
-            throw({error, unexpected_done});
-        {flow_results, _PhaseId, FlowId, Results} ->
-            throw({error, unexpected_results, Results})
-    after 100 ->
-            ok
-    end;
-verify_results(FlowId, results) ->
-    receive
-        {flow_results, FlowId, done} ->
-            throw({error, unexpected_done});
-        {flow_results, _PhaseId, FlowId, Results} ->
-            {ok, Results}
-    after 100 ->
-            throw({error, no_results})
-    end;
-verify_results(FlowId, done) ->
-    receive
-        {flow_results, FlowId, done} ->
-            ok;
-        {flow_results, _PhaseId, FlowId, Results} ->
-            throw({error, unexpected_results, Results})
-    after 100 ->
-            throw({error, no_results})
-    end.
-
-assertDead(Pids)->
-    timer:sleep(25),
-    assertDead0(Pids).
-
-assertDead0([]) ->
-    ok;
-assertDead0([H|T]) when is_list(H) ->
-    ok = assertDead0(H),
-    assertDead0(T);
-assertDead0([H|T]) when is_pid(H) ->
-    ?assertMatch(false, erlang:is_process_alive(H)),
-    assertDead0(T).

File apps/luke/tests/tests.hrl

-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-
-%%   http://www.apache.org/licenses/LICENSE-2.0
-
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-
--define(TWO_PHASE_FLOW, [{simple_phase, [], []},
-                         {simple_phase, [accumulate], []}]).
-
--define(THREE_PHASE_NESTED_FLOW, [{nested_phase, [accumulate], []},
-                                  {simple_phase, [accumulate], []},
-                                  {simple_phase, [accumulate], []}]).
-
-
--define(TWO_ASYNC_FLOW, [{async_phase, [], []},
-                         {async_phase, [accumulate], []}]).
-
--define(MAP_FLOW, [{map_phase, [accumulate], []}]).
-
--define(MAP_DBL_FLOW, [{map_phase, [accumulate], []},
-                       {map_phase, [accumulate], []}]).
-
--define(MAPRED_FLOW, [{map_phase, [], []},
-                      {reduce_phase, [{converge, 3}], []},
-                      {reduce_phase, [accumulate], []}]).
-
--define(MAPRED_FLOW1, [{map_phase, [], []},
-                       {reduce_phase, [{converge, 1}, accumulate], []}]).
-
--define(MAPRED_EMPTY, [{map_phase, [], []},
-                       {reduce_phase, [{converge, 1}], []}]).

File apps/riak_core/include/riak_core_handoff.hrl

--define(PT_MSG_INIT, 0).
--define(PT_MSG_OBJ, 1).
--define(PT_MSG_OLDSYNC, 2).
--define(PT_MSG_SYNC, 3).
--define(PT_MSG_CONFIGURE, 4).

File apps/riak_core/include/riak_core_vnode.hrl

--type sender_type() :: fsm | server | raw.
--type sender() :: {sender_type(), reference(), pid()} |
-                  %% TODO: Double-check that these special cases are kosher
-                  {server, undefined, undefined} | % special case in
-                                                   % riak_core_vnode_master.erl
-                  {fsm, undefined, pid()} |        % special case in
-                                                   % riak_kv_util:make_request/2.erl
-                  ignore | noreply.
--type partition() :: non_neg_integer().
--type vnode_req() :: term().
-
--record(riak_vnode_req_v1, {
-          index :: partition(),
-          sender=noreply :: sender(),
-          request :: vnode_req()}).
-
-
--record(riak_core_fold_req_v1, {
-          foldfun :: fun(),
-          acc0 :: term()}).
-
--define(VNODE_REQ, #riak_vnode_req_v1).
--define(FOLD_REQ, #riak_core_fold_req_v1).
-

File apps/riak_core/rebar.config

-{lib_dirs, [".."]}.
-{erl_first_files, ["src/gen_nb_server.erl", "src/gen_server2.erl"]}.

File apps/riak_core/src/app_helper.erl

-%% -------------------------------------------------------------------
-%%
-%% riak_core: Core Riak Application
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-%%
-%%   http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
--module(app_helper).
-
--export([get_env/1, get_env/2, get_env/3]).
-
-%% ===================================================================
-%% Public API
-%% ===================================================================
-
-%% @spec get_env(App :: atom()) -> [{Key :: atom(), Value :: term()}]
-%% @doc Retrieve all Key/Value pairs in the env for the specified app.
-get_env(App) ->
-    application:get_all_env(App).
-
-%% @spec get_env(App :: atom(), Key :: atom()) -> term()
-%% @doc The official way to get a value from the app's env.
-%%      Will return the 'undefined' atom if that key is unset.
-get_env(App, Key) ->
-    get_env(App, Key, undefined).
-
-%% @spec get_env(App :: atom(), Key :: atom(), Default :: term()) -> term()
-%% @doc The official way to get a value from this application's env.
-%%      Will return Default if that key is unset.
-get_env(App, Key, Default) ->
-    case application:get_env(App, Key) of
-	{ok, Value} ->
-            Value;
-        _ ->
-            Default
-    end.

File apps/riak_core/src/bloom.erl

-% ``The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved via the world wide web at http://www.erlang.org/.
-%% 
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%% 
--module(bloom).
--author("Paulo Sergio Almeida <psa@di.uminho.pt>").
--export([sbf/1, sbf/2, sbf/3, sbf/4,
-         bloom/1, bloom/2,
-         member/2, add/2,
-         size/1, capacity/1]).
--export([is_element/2, add_element/2]). % alternative names
--import(math, [log/1, pow/2]).
-
-is_element(E, B) -> member(E, B).
-add_element(E, B) -> add(E, B).
-
-%% Based on
-%% Scalable Bloom Filters
-%% Paulo Sérgio Almeida, Carlos Baquero, Nuno Preguiça, David Hutchison
-%% Information Processing Letters
-%% Volume 101, Issue 6, 31 March 2007, Pages 255-261 
-%%
-%% Provides scalable bloom filters that can grow indefinitely while
-%% ensuring a desired maximum false positive probability. Also provides
-%% standard partitioned bloom filters with a maximum capacity. Bit arrays
-%% are dimensioned as a power of 2 to enable reusing hash values across
-%% filters through bit operations. Double hashing is used (no need for
-%% enhanced double hashing for partitioned bloom filters).
-
-%% modified slightly by Justin Sheehy to make it a single file
-%% (incorporated the array-based bitarray internally)
-
--define(W, 27).
-
--record(bloom, {
-  e,    % error probability
-  n,    % maximum number of elements
-  mb,   % 2^mb = m, the size of each slice (bitvector)
-  size, % number of elements
-  a     % list of bitvectors
-}).
-
--record(sbf, {
-  e,    % error probability
-  r,    % error probability ratio
-  s,    % log 2 of growth ratio
-  size, % number of elements
-  b     % list of plain bloom filters
-}).
-
-%% Constructors for (fixed capacity) bloom filters
-%%
-%% N - capacity
-%% E - error probability
-
-bloom(N) -> bloom(N, 0.001).
-bloom(N, E) when is_number(N), N > 0,
-            is_float(E), E > 0, E < 1,
-            N >= 4/E -> % rule of thumb; due to double hashing
-  bloom(size, N, E).
-
-bloom(Mode, Dim, E) ->
-  K = 1 + trunc(log2(1/E)),
-  P = pow(E, 1 / K),
-  case Mode of
-    size -> Mb = 1 + trunc(-log2(1 - pow(1 - P, 1 / Dim)));
-    bits -> Mb = Dim
-  end,
-  M = 1 bsl Mb,
-  N = trunc(log(1-P) / log(1-1/M)),
-  #bloom{e=E, n=N, mb=Mb, size = 0,
-         a = [bitarray_new(1 bsl Mb) || _ <- lists:seq(1, K)]}.
-
-log2(X) -> log(X) / log(2).
-
-%% Constructors for scalable bloom filters
-%%
-%% N - initial capacity before expanding
-%% E - error probability
-%% S - growth ratio when full (log 2) can be 1, 2 or 3
-%% R - tightening ratio of error probability
-
-sbf(N) -> sbf(N, 0.001).
-sbf(N, E) -> sbf(N, E, 1).
-sbf(N, E, 1) -> sbf(N, E, 1, 0.85);
-sbf(N, E, 2) -> sbf(N, E, 2, 0.75);
-sbf(N, E, 3) -> sbf(N, E, 3, 0.65).
-sbf(N, E, S, R) when is_number(N), N > 0,
-                     is_float(E), E > 0, E < 1,
-                     is_integer(S), S > 0, S < 4,
-                     is_float(R), R > 0, R < 1,
-                     N >= 4/(E*(1-R)) -> % rule of thumb; due to double hashing
-  #sbf{e=E, s=S, r=R, size=0, b=[bloom(N, E*(1-R))]}.
-
-%% Returns number of elements
-%%
-size(#bloom{size=Size}) -> Size;
-size(#sbf{size=Size}) -> Size.
-
-%% Returns capacity
-%%
-capacity(#bloom{n=N}) -> N;
-capacity(#sbf{}) -> infinity.
-
-%% Test for membership
-%%
-member(Elem, #bloom{mb=Mb}=B) ->
-  Hashes = make_hashes(Mb, Elem),
-  hash_member(Hashes, B);
-member(Elem, #sbf{b=[H|_]}=Sbf) ->
-  Hashes = make_hashes(H#bloom.mb, Elem),
-  hash_member(Hashes, Sbf).
-
-hash_member(Hashes, #bloom{mb=Mb, a=A}) ->
-  Mask = 1 bsl Mb -1,
-  {I1, I0} = make_indexes(Mask, Hashes),
-  all_set(Mask, I1, I0, A);
-hash_member(Hashes, #sbf{b=B}) ->
-  lists:any(fun(X) -> hash_member(Hashes, X) end, B).
-
-make_hashes(Mb, E) when Mb =< 16 ->
-  erlang:phash2({E}, 1 bsl 32);
-make_hashes(Mb, E) when Mb =< 32 ->
-  {erlang:phash2({E}, 1 bsl 32), erlang:phash2([E], 1 bsl 32)}.
-
-make_indexes(Mask, {H0, H1}) when Mask > 1 bsl 16 -> masked_pair(Mask, H0, H1);
-make_indexes(Mask, {H0, _}) -> make_indexes(Mask, H0);
-make_indexes(Mask, H0) -> masked_pair(Mask, H0 bsr 16, H0).
-
-masked_pair(Mask, X, Y) -> {X band Mask, Y band Mask}.
-
-all_set(_Mask, _I1, _I, []) -> true;
-all_set(Mask, I1, I, [H|T]) ->
-  case bitarray_get(I, H) of
-    true -> all_set(Mask, I1, (I+I1) band Mask, T);
-    false -> false
-  end.
-
-%% Adds element to set
-%%
-add(Elem, #bloom{mb=Mb} = B) ->
-  Hashes = make_hashes(Mb, Elem),
-  hash_add(Hashes, B);
-add(Elem, #sbf{size=Size, r=R, s=S, b=[H|T]=Bs}=Sbf) ->
-  #bloom{mb=Mb, e=E, n=N, size=HSize} = H,
-  Hashes = make_hashes(Mb, Elem),
-  case hash_member(Hashes, Sbf) of
-    true -> Sbf;
-    false ->
-      case HSize < N of
-        true -> Sbf#sbf{size=Size+1, b=[hash_add(Hashes, H)|T]};
-        false ->
-          B = add(Elem, bloom(bits, Mb + S, E * R)),
-          Sbf#sbf{size=Size+1, b=[B|Bs]}
-      end
-  end.
-
-hash_add(Hashes, #bloom{mb=Mb, a=A, size=Size} = B) ->
-  Mask = 1 bsl Mb -1,
-  {I1, I0} = make_indexes(Mask, Hashes),
-  case all_set(Mask, I1, I0, A) of
-    true -> B;
-    false -> B#bloom{size=Size+1, a=set_bits(Mask, I1, I0, A, [])}
-  end.
-
-set_bits(_Mask, _I1, _I, [], Acc) -> lists:reverse(Acc);
-set_bits(Mask, I1, I, [H|T], Acc) ->
-  set_bits(Mask, I1, (I+I1) band Mask, T, [bitarray_set(I, H) | Acc]).
-
-bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
-
-bitarray_set(I, A) ->
-  AI = I div ?W,
-  V = array:get(AI, A),
-  V1 = V bor (1 bsl (I rem ?W)),
-  array:set(AI, V1, A).
-
-bitarray_get(I, A) ->
-  AI = I div ?W,
-  V = array:get(AI, A),
-  V band (1 bsl (I rem ?W)) =/= 0.
-

File apps/riak_core/src/chash.erl

-%% -------------------------------------------------------------------
-%%
-%% chash: basic consistent hashing
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License.  You may obtain
-%% a copy of the License at
-%%
-%%   http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied.  See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
-%% @doc A consistent hashing implementation.  The space described by the ring
-%%      coincides with SHA-1 hashes, and so any two keys producing the same
-%%      SHA-1 hash are considered identical within the ring.
-%%
-%% @reference Karger, D.; Lehman, E.; Leighton, T.; Panigrahy, R.; Levine, M.;
-%% Lewin, D. (1997). "Consistent hashing and random trees". Proceedings of the
-%% twenty-ninth annual ACM symposium on Theory of computing: 654~663. ACM Press
-%% New York, NY, USA
-
--module(chash).
--author('Justin Sheehy <justin@basho.com>').
--author('Andy Gross <andy@basho.com>').
-
--export([fresh/2,update/3,lookup/2,members/1,size/1,nodes/1,
-     successors/2,successors/3,
-     predecessors/2,predecessors/3,
-     contains_name/2,key_of/1,
-     merge_rings/2]).
-    
--define(RINGTOP, trunc(math:pow(2,160)-1)).  % SHA-1 space
--include_lib("eunit/include/eunit.hrl").
-
-% @type chash() = {NumPartitions, [NodeEntry]}
-%  NumPartitions = integer()
-%  NodeEntry = {IndexAsInt, Node}
-%  IndexAsInt = integer()
-%  Node = node().
-% It is not recommended that code outside this module make use
-% of the structure of a chash.
-
-% @type index() = binary().
-% Indices into the ring, used as keys for object location, are binary
-% representations of 160-bit integers.
-
-% @type node() = term().
-% A Node is the unique identifier for the owner of a given partition.
-% An Erlang Pid works well here, but the chash module allows it to
-% be any term.
-
-% @doc Create a brand new ring.  The size and seednode are specified;
-%      initially all partitions are owned by the seednode.  If NumPartitions
-%      is not much larger than the intended eventual number of
-%       participating nodes, then performance will suffer.
-% @spec fresh(NumPartitions :: integer(), SeedNode :: node()) -> chash()
-fresh(NumPartitions, SeedNode) ->
-    Inc = ?RINGTOP div NumPartitions,
-    {NumPartitions, [{IndexAsInt, SeedNode} ||
-           IndexAsInt <- lists:seq(0,(?RINGTOP-1),Inc)]}.
-
-% @doc Find the Node that owns the partition identified by IndexAsInt.
-% @spec lookup(IndexAsInt :: integer(), CHash :: chash()) -> node()
-lookup(IndexAsInt, CHash) ->
-    {_NumPartitions, Nodes} = CHash,
-    {IndexAsInt, X} = proplists:lookup(IndexAsInt, Nodes),
-    X.
-
-% @doc Return true if named Node owns any partitions in the ring, else false.
-% @spec contains_name(Name :: node(), CHash :: chash()) -> bool()
-contains_name(Name, CHash) ->
-    {_NumPartitions, Nodes} = CHash,
-    [X || {_,X} <- Nodes, X == Name] =/= [].
-
-% @doc Make the partition beginning at IndexAsInt owned by Name'd node.
-% @spec update(IndexAsInt :: integer(), Name :: node(), CHash :: chash())
-%                -> chash()
-update(IndexAsInt, Name, CHash) ->
-    {NumPartitions, Nodes} = CHash,
-    NewNodes = lists:keyreplace(IndexAsInt, 1, Nodes, {IndexAsInt, Name}),
-    {NumPartitions, NewNodes}.
-
-% @doc Given an object key, return all NodeEntries in order starting at Index.
-% @spec successors(Index :: index(), CHash :: chash()) -> [NodeEntry]
-successors(Index, CHash) ->
-    {NumPartitions, _Nodes} = CHash,
-    successors(Index, CHash, NumPartitions).
-% @doc Given an object key, return the next N NodeEntries in order
-%      starting at Index.
-% @spec successors(Index :: index(), CHash :: chash(), N :: integer())
-%                                                             -> [NodeEntry]
-successors(Index, CHash, N) ->
-    Num = max_n(N, CHash),
-    {Res, _} = lists:split(Num, ordered_from(Index, CHash)),
-    Res.
-
-% @doc Given an object key, return all NodeEntries in reverse order
-%      starting at Index.
-% @spec predecessors(Index :: index(), CHash :: chash()) -> [NodeEntry]
-predecessors(Index, CHash) ->
-    {NumPartitions, _Nodes} = CHash,
-    predecessors(Index, CHash, NumPartitions).
-% @doc Given an object key, return the next N NodeEntries in reverse order
-%      starting at Index.
-% @spec predecessors(Index :: index(), CHash :: chash(), N :: integer())
-%                                                             -> [NodeEntry]
-predecessors(Index, CHash, N) ->
-    Num = max_n(N, CHash),
-    {Res, _} = lists:split(Num, lists:reverse(ordered_from(Index,CHash))),
-    Res.
-
-% @doc Return either N or the number of partitions in the ring, whichever
-%      is lesser.
-% @spec max_n(N :: integer(), CHash :: chash()) -> integer()
-max_n(N, {NumPartitions, _Nodes}) ->
-    erlang:min(N, NumPartitions).
-
-% @doc Given an object key, return all NodeEntries in order starting at Index.
-% @spec ordered_from(Index :: index(), CHash :: chash()) -> [NodeEntry]
-ordered_from(Index, {NumPartitions, Nodes}) ->
-    <<IndexAsInt:160/integer>> = Index,
-    Inc = ?RINGTOP div NumPartitions,
-    {A, B} = lists:split((IndexAsInt div Inc)+1, Nodes),
-    B ++ A.
-
-% @doc Given any term used to name an object, produce that object's key
-%      into the ring.  Two names with the same SHA-1 hash value are
-%      considered the same name.
-% @spec key_of(ObjectName :: term()) -> index()
-key_of(ObjectName) ->    
-    crypto:sha(term_to_binary(ObjectName)).
-
-% @doc Return all Nodes that own any partitions in the ring.
-% @spec members(CHash :: chash()) -> [Node]
-members(CHash) ->
-    {_NumPartitions, Nodes} = CHash,
-    lists:usort([X || {_Idx,X} <- Nodes]).
-
-% @doc Return the entire set of NodeEntries in the ring.
-% @spec nodes(CHash :: chash()) -> [NodeEntry]
-nodes(CHash) ->
-    {_NumPartitions, Nodes} = CHash,
-    Nodes.
-
-% @doc Return a randomized merge of two rings.
-%      If multiple nodes are actively claiming nodes in the same
-%      time period, churn will occur.  Be prepared to live with it.
-% @spec merge_rings(CHashA :: chash(), CHashB :: chash()) -> chash()
-merge_rings(CHashA,CHashB) ->
-    {NumPartitions, NodesA} = CHashA,
-    {NumPartitions, NodesB} = CHashB,
-    {NumPartitions, [{I,randomnode(A,B)} || 
-           {{I,A},{I,B}} <- lists:zip(NodesA,NodesB)]}.
-
-% @spec randomnode(NodeA :: node(), NodeB :: node()) -> node()
-randomnode(NodeA,NodeA) -> NodeA;
-randomnode(NodeA,NodeB) -> lists:nth(crypto:rand_uniform(1,3),[NodeA,NodeB]).
-
-% @doc Return the number of partitions in the ring.
-% @spec size(CHash :: chash()) -> integer()
-size(CHash) ->
-    {_NumPartitions,Nodes} = CHash,
-    length(Nodes).
-
-update_test() ->
-    Node = 'old@host', NewNode = 'new@host',
-    
-    % Create a fresh ring...
-    CHash = chash:fresh(5, Node),
-    GetNthIndex = fun(N, {_, Nodes}) -> {Index, _} = lists:nth(N, Nodes), Index end,
-    
-    % Test update...
-    FirstIndex = GetNthIndex(1, CHash),
-    ThirdIndex = GetNthIndex(3, CHash),
-    {5, [{_, NewNode}, {_, Node}, {_, Node}, {_, Node}, {_, Node}, {_, Node}]} = update(FirstIndex, NewNode, CHash),
-    {5, [{_, Node}, {_, Node}, {_, NewNode}, {_, Node}, {_, Node}, {_, Node}]} = update(ThirdIndex, NewNode, CHash).
-
-contains_test() ->
-    CHash = chash:fresh(8, the_node),
-    ?assertEqual(true, contains_name(the_node,CHash)),
-    ?assertEqual(false, contains_name(some_other_node,CHash)).
-
-max_n_test() ->
-    CHash = chash:fresh(8, the_node),
-    ?assertEqual(1, max_n(1,CHash)),
-    ?assertEqual(8, max_n(11,CHash)).
-    
-simple_size_test() ->
-    ?assertEqual(8, length(chash:nodes(chash:fresh(8,the_node)))).
-
-successors_length_test() ->
-    ?assertEqual(8, length(chash:successors(chash:key_of(0),
-                                            chash:fresh(8,the_node)))).
-inverse_pred_test() ->
-    CHash = chash:fresh(8,the_node),
-    S = [I || {I,_} <- chash:successors(chash:key_of(4),CHash)],
-    P = [I || {I,_} <- chash:predecessors(chash:key_of(4),CHash)],
-    ?assertEqual(S,lists:reverse(P)).
-
-merge_test() ->
-    CHashA = chash:fresh(8,node_one),
-    CHashB = chash:update(0,node_one,chash:fresh(8,node_two)),
-    CHash = chash:merge_rings(CHashA,CHashB),
-    ?assertEqual(node_one,chash:lookup(0,CHash)).

File apps/riak_core/src/gen_nb_server.erl

-%% Copyright (c) 2009 Hypothetical Labs, Inc.
- 
-%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%% of this software and associated documentation files (the "Software"), to deal
-%% in the Software without restriction, including without limitation the rights
-%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the Software is
-%% furnished to do so, subject to the following conditions:
-%%
-%% The above copyright notice and this permission notice shall be included in
-%% all copies or substantial portions of the Software.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-%% THE SOFTWARE.
- 
--module(gen_nb_server).
- 
--author('kevin@hypotheticalabs.com').
- 
--behaviour(gen_server).
- 
-%% API
--export([start_link/4]).
- 
-%% Behavior callbacks
--export([behaviour_info/1]).
- 
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
- 
--define(SERVER, ?MODULE).
- 
--record(state, {cb,
-                sock,
-                server_state}).
- 
-%% @hidden
-behaviour_info(callbacks) ->
-  [{init, 1},
-   {handle_call, 3},
-   {handle_cast, 2},
-   {handle_info, 2},
-   {terminate, 2},
-   {sock_opts, 0},
-   {new_connection, 2}];
- 
-behaviour_info(_) ->
-  undefined.
- 
-%% @spec start_link(CallbackModule, IpAddr, Port, InitParams) -> Result
-%% CallbackModule = atom()
-%% IpAddr = string()
-%% Port = integer()
-%% InitParams = [any()]
-%% Result = {ok, pid()} | {error, any()}
-%% @doc Start server listening on IpAddr:Port
-start_link(CallbackModule, IpAddr, Port, InitParams) ->
-  gen_server:start_link(?MODULE, [CallbackModule, IpAddr, Port, InitParams], []).
- 
-%% @hidden
-init([CallbackModule, IpAddr, Port, InitParams]) ->
-  case CallbackModule:init(InitParams) of
-    {ok, ServerState} ->
-      case listen_on(CallbackModule, IpAddr, Port) of
-        {ok, Sock} ->
-          {ok, #state{cb=CallbackModule, sock=Sock, server_state=ServerState}};
-        Error ->
-          CallbackModule:terminate(Error, ServerState),
-          Error
-      end;
-    Err ->
-      Err
-  end.
- 
-%% @hidden
-handle_call(Request, From, #state{cb=Callback, server_state=ServerState}=State) ->
-  case Callback:handle_call(Request, From, ServerState) of
-    {reply, Reply, NewServerState} ->
-      {reply, Reply, State#state{server_state=NewServerState}};
-    {reply, Reply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
-      {reply, Reply, State#state{server_state=NewServerState}, Arg};
-    {noreply, NewServerState} ->
-      {noreply, State#state{server_state=NewServerState}};
-    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
-      {noreply, State#state{server_state=NewServerState}, Arg};
-    {stop, Reason, NewServerState} ->
-      {stop, Reason, State#state{server_state=NewServerState}};
-    {stop, Reason, Reply, NewServerState} ->
-      {stop, Reason, Reply, State#state{server_state=NewServerState}}
-  end.
- 
-%% @hidden
-handle_cast(Msg, #state{cb=Callback, server_state=ServerState}=State) ->
-  case Callback:handle_cast(Msg, ServerState) of
-    {noreply, NewServerState} ->
-      {noreply, State#state{server_state=NewServerState}};
-    {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
-      {noreply, State#state{server_state=NewServerState}, Arg};
-    {stop, Reason, NewServerState} ->
-      {stop, Reason, State#state{server_state=NewServerState}}