Anonymous avatar Anonymous committed 443a3d4

backport of protocol buffers interface

Comments (0)

Files changed (7)

apps/riak/ebin/riak.app

-% -*- mode: erlang -*-
-{application, riak,
- [{description, "riak"},
-  {vsn, "0.9.1"},
-  {modules, [
-             bloom,
-             chash,
-             gen_nb_server,
-             gen_server2,
-             json_pp,
-             mapred_resource,
-             merkerl,
-             ping_http_resource,
-             priority_queue,
-             raw_http_resource,
-             raw_link_walker_resource,
-             riak,
-             riak_app,
-             riak_backup,
-             riak_bucket,
-             riak_cache_backend,
-             riak_claim,
-             riak_client,
-             riak_connect,
-             riak_console,
-             riak_delete,
-             riak_dets_backend,
-             riak_ets_backend,
-             riak_event_logger,
-             riak_eventer,
-             riak_fs_backend,
-             riak_gb_trees_backend,
-             riak_get_fsm,
-             riak_handoff_listener,
-             riak_handoff_receiver,
-             riak_handoff_sender,
-             riak_js_manager,
-             riak_js_sup,
-             riak_js_vm,
-             riak_keys_fsm,
-             riak_local_logger,
-             riak_map_executor,
-             riak_map_localphase,
-             riak_map_phase,
-             riak_mapred_json,
-             riak_mapred_query,
-             riak_mapreduce,
-             riak_multi_backend,
-             riak_object,
-             riak_phase_proto,
-             riak_put_fsm,
-             riak_reduce_phase,
-             riak_ring,
-             riak_ring_manager,
-             riak_stat,
-             riak_sup,
-             riak_test_util,
-             riak_util,
-             riak_vnode,
-             riak_vnode_master,
-             riak_vnode_sup,
-             riak_web,
-             riakserver_pb,
-             slide,
-             spiraltime,
-             stats_http_resource,
-             vclock
-            ]},
-  {applications, [
-                  kernel,
-                  stdlib,
-                  sasl,
-                  crypto
-                 ]},
-  {registered, []},
-  {mod, {riak_app, []}},
-  {env, [
-         %% Cluster name
-         {cluster_name, "default"},
-
-         %% Default location of ringstate
-         {ring_state_dir, "data/ring"},
-
-         %% Default ring creation size
-         {ring_creation_size, 64},
-
-         %% Default gossip interval (milliseconds)
-         {gossip_interval, 60000},
-
-         %% Default claims functions
-         {wants_claim_fun, {riak_claim, default_wants_claim}},
-         {choose_claim_fun, {riak_claim, default_choose_claim}},
-
-         %% Number of VNodes allowed to do handoff concurrently.
-         {handoff_concurrency, 4},
-
-         %% Endpoint for system stats HTTP provider
-         {stats_urlpath, "stats"},
-
-         %% Secondary code paths
-         {add_paths, []}
-        ]}
- ]}.
+% -*- mode: erlang -*-
+{application, riak,
+ [{description, "riak"},
+  {vsn, "0.9.1"},
+  {modules, [
+             bloom,
+             chash,
+             gen_nb_server,
+             gen_server2,
+             json_pp,
+             mapred_resource,
+             merkerl,
+             ping_http_resource,
+             priority_queue,
+             raw_http_resource,
+             raw_link_walker_resource,
+             riak,
+             riak_app,
+             riak_backup,
+             riak_bucket,
+             riak_cache_backend,
+             riak_claim,
+             riak_client,
+             riak_connect,
+             riak_console,
+             riak_delete,
+             riak_dets_backend,
+             riak_ets_backend,
+             riak_event_logger,
+             riak_eventer,
+             riak_fs_backend,
+             riak_gb_trees_backend,
+             riak_get_fsm,
+             riak_handoff_listener,
+             riak_handoff_receiver,
+             riak_handoff_sender,
+             riak_js_manager,
+             riak_js_sup,
+             riak_js_vm,
+             riak_keys_fsm,
+             riak_local_logger,
+             riak_map_executor,
+             riak_map_localphase,
+             riak_map_phase,
+             riak_mapred_json,
+             riak_mapred_query,
+             riak_mapreduce,
+             riak_multi_backend,
+             riak_object,
+             riak_pb,
+             riak_pb_listener,
+             riak_pb_socket,
+             riak_pb_socket_sup,
+             riak_phase_proto,
+             riak_put_fsm,
+             riak_reduce_phase,
+             riak_ring,
+             riak_ring_manager,
+             riak_stat,
+             riak_sup,
+             riak_test_util,
+             riak_util,
+             riak_vnode,
+             riak_vnode_master,
+             riak_vnode_sup,
+             riak_web,
+             riakclient_pb,
+             riakserver_pb,
+             slide,
+             spiraltime,
+             stats_http_resource,
+             vclock
+            ]},
+  {applications, [
+                  kernel,
+                  stdlib,
+                  sasl,
+                  crypto
+                 ]},
+  {registered, []},
+  {mod, {riak_app, []}},
+  {env, [
+         %% Cluster name
+         {cluster_name, "default"},
+
+         %% Default location of ringstate
+         {ring_state_dir, "data/ring"},
+
+         %% Default ring creation size
+         {ring_creation_size, 64},
+
+         %% Default gossip interval (milliseconds)
+         {gossip_interval, 60000},
+
+         %% Default claims functions
+         {wants_claim_fun, {riak_claim, default_wants_claim}},
+         {choose_claim_fun, {riak_claim, default_choose_claim}},
+
+         %% Number of VNodes allowed to do handoff concurrently.
+         {handoff_concurrency, 4},
+
+         {pb_port, 8087},
+         {pb_ip, "0.0.0.0"},
+
+         %% Endpoint for system stats HTTP provider
+         {stats_urlpath, "stats"},
+
+         %% Secondary code paths
+         {add_paths, []}
+        ]}
+ ]}.

apps/riak/src/riak_pb.erl

+%% -------------------------------------------------------------------
+%%
+%% riak_pb: protocol buffer utility functions
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc protocol buffer utilities
+
+-module(riak_pb).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+-include("riakclient_pb.hrl").
+-compile([export_all]).
+
+%% Names of riak_object metadata fields
+-define(MD_CTYPE,    <<"content-type">>).
+-define(MD_CHARSET,  <<"charset">>).
+-define(MD_ENCODING, <<"content-encoding">>).
+-define(MD_VTAG,     <<"X-Riak-VTag">>).
+-define(MD_LINKS,    <<"Links">>).
+-define(MD_LASTMOD,  <<"X-Riak-Last-Modified">>).
+-define(MD_USERMETA, <<"X-Riak-Meta">>).
+
+%% Names of PB fields in bucket properties
+-define(PB_PROPS,   <<"props">>).
+-define(PB_KEYS,    <<"keys">>).
+-define(PB_LINKFUN, <<"linkfun">>).
+-define(PB_MOD,     <<"mod">>).
+-define(PB_FUN,     <<"fun">>).
+-define(PB_CHASH,   <<"chash_keyfun">>).
+-define(PB_JSFUN,    <<"jsfun">>).
+-define(PB_JSANON,   <<"jsanon">>).
+-define(PB_JSBUCKET, <<"bucket">>).
+-define(PB_JSKEY,    <<"key">>).
+-define(PB_ALLOW_MULT, <<"allow_mult">>).
+
+%% Create an iolist of msg code and protocol buffer message
+encode(Msg) when is_atom(Msg) ->
+    [msg_code(Msg)];
+encode(Msg) when is_tuple(Msg) ->
+    MsgType = element(1, Msg),
+    [msg_code(MsgType) | riakclient_pb:iolist(MsgType, Msg)].
+ 
+%% Decode a protocol buffer message given its type - if no bytes
+%% return the atom for the message code
+decode(MsgCode, <<>>) ->
+    msg_type(MsgCode);
+decode(MsgCode, MsgData) ->
+    riakclient_pb:decode(msg_type(MsgCode), MsgData).
+
+msg_type(0) -> rpberrorresp;
+msg_type(1) -> rpbhelloreq;
+msg_type(2) -> rpbhelloresp;
+msg_type(3) -> rpbpingreq;
+msg_type(4) -> rpbpingresp;
+msg_type(5) -> rpbgetreq;
+msg_type(6) -> rpbgetresp;
+msg_type(7) -> rpbputreq;
+msg_type(8) -> rpbputresp;
+msg_type(9) -> rpbdelreq;
+msg_type(10) -> rpbdelresp;
+msg_type(11) -> rpbgetbucketpropsreq;
+msg_type(12) -> rpbgetbucketpropsresp;
+msg_type(13) -> rpbsetbucketpropsreq;
+msg_type(14) -> rpbsetbucketpropsresp;
+msg_type(15) -> rpblistbucketsreq;
+msg_type(16) -> rpblistbucketsresp;
+msg_type(17) -> rpblistkeysreq;
+msg_type(18) -> rpblistkeysresp;
+msg_type(19) -> rpbmapredreq;
+msg_type(20) -> rpbmapredresp;
+msg_type(_) -> undefined.
+    
+msg_code(rpberrorresp) -> 0;
+msg_code(rpbhelloreq)  -> 1;
+msg_code(rpbhelloresp) -> 2;
+msg_code(rpbpingreq)   -> 3;
+msg_code(rpbpingresp)  -> 4;
+msg_code(rpbgetreq)    -> 5;
+msg_code(rpbgetresp)   -> 6;
+msg_code(rpbputreq)    -> 7;
+msg_code(rpbputresp)   -> 8;
+msg_code(rpbdelreq)    -> 9;
+msg_code(rpbdelresp)   -> 10;
+msg_code(rpbgetbucketpropsreq)   -> 11;
+msg_code(rpbgetbucketpropsresp)  -> 12;
+msg_code(rpbsetbucketpropsreq)   -> 13;
+msg_code(rpbsetbucketpropsresp)  -> 14;
+msg_code(rpblistbucketsreq)      -> 15;
+msg_code(rpblistbucketsresp)     -> 16;
+msg_code(rpblistkeysreq)         -> 17;
+msg_code(rpblistkeysresp)        -> 18;
+msg_code(rpbmapredreq)           -> 19;
+msg_code(rpbmapredresp)          -> 20.
+    
+
+%% ===================================================================
+%% Encoding/Decoding
+%% ===================================================================
+    
+%% Convert a vector clock to erlang
+erlify_rpbvc(undefined) ->
+    vclock:fresh();
+erlify_rpbvc(PbVc) ->
+    binary_to_term(zlib:unzip(PbVc)).
+
+%% Convert a vector clock to protocol buffers
+pbify_rpbvc(Vc) ->
+    zlib:zip(term_to_binary(Vc)).
+
+%% Convert options to protocol buffers
+pbify_rpboptions(undefined) ->
+    undefined;
+pbify_rpboptions(Options) ->
+   pbify_rpboptions(Options, #rpboptions{}).
+
+pbify_rpboptions([], RpbOpts) ->
+    RpbOpts;
+pbify_rpboptions([return_body|Rest], RpbOpts) ->
+    pbify_rpboptions(Rest, RpbOpts#rpboptions{return_body = true}).
+
+%% Convert a list of {MetaData,Value} pairs to protocol buffers
+pbify_rpbcontents([], Acc) ->
+    lists:reverse(Acc);
+pbify_rpbcontents([Content | Rest], Acc) ->
+    pbify_rpbcontents(Rest, [pbify_rpbcontent(Content) | Acc]).
+
+%% Convert a metadata/value pair into an #rpbcontent{} record    
+pbify_rpbcontent({Metadata, Value}) ->
+    {PbContent, LeftOver} = 
+        dict:fold(fun pbify_rpbcontent_entry/3, {#rpbcontent{value = Value}, []}, Metadata),
+    case LeftOver of
+        [] ->
+            PbLeftOver = undefined;
+        LeftOver ->
+            PbLeftOver = pbify_rpbterm({struct, LeftOver})
+    end,
+    PbContent#rpbcontent{metadata = PbLeftOver}.
+
+%% Convert the metadata dictionary entries to protocol buffers
+pbify_rpbcontent_entry(?MD_CTYPE, ContentType, {PbContent, LeftOver}) when is_list(ContentType) -> 
+    {PbContent#rpbcontent{content_type = ContentType}, LeftOver};
+pbify_rpbcontent_entry(?MD_CHARSET, Charset, {PbContent, LeftOver}) when is_list(Charset) ->
+    {PbContent#rpbcontent{charset = Charset}, LeftOver};
+pbify_rpbcontent_entry(?MD_ENCODING, Encoding, {PbContent, LeftOver}) when is_list(Encoding) ->
+    {PbContent#rpbcontent{content_encoding = Encoding}, LeftOver};
+pbify_rpbcontent_entry(?MD_VTAG, Vtag, {PbContent, LeftOver}) when is_list(Vtag) ->
+    {PbContent#rpbcontent{vtag = Vtag}, LeftOver};
+pbify_rpbcontent_entry(?MD_LINKS, Links, {PbContent, LeftOver}) when is_list(Links) ->
+    {PbContent#rpbcontent{links = [pbify_rpblink(E) || E <- Links]}, LeftOver};
+pbify_rpbcontent_entry(?MD_LASTMOD, {MS,S,US}, {PbContent, LeftOver}) -> 
+    {PbContent#rpbcontent{last_mod = 1000000*MS+S, last_mod_usecs = US}, LeftOver};
+pbify_rpbcontent_entry(?MD_USERMETA, UserMeta, {PbContent, LeftOver}) when is_list(UserMeta) ->
+    {PbContent#rpbcontent{usermeta = [pbify_rpbpair(E) || E <- UserMeta]}, LeftOver};
+pbify_rpbcontent_entry(Key, Value, {PbContent, LeftOver}) ->
+    {PbContent, [{Key, Value} | LeftOver]}.
+
+%% Convert an rpccontent pb message to an erlang {MetaData,Value} tuple
+erlify_rpbcontent(PbC) ->
+    case PbC#rpbcontent.metadata of
+        undefined ->
+            ErlMd0 = dict:new();
+        
+        PbMd ->
+            {struct, MdList} = erlify_rpbterm(PbMd), 
+            ErlMd0 = dict:from_list(MdList)
+    end,
+    case PbC#rpbcontent.content_type of
+        undefined ->
+            ErlMd1 = ErlMd0;
+        ContentType ->
+            ErlMd1 = dict:store(?MD_CTYPE, binary_to_list(ContentType), ErlMd0)
+    end,
+    case PbC#rpbcontent.charset of
+        undefined ->
+            ErlMd2 = ErlMd1;
+        Charset ->
+            ErlMd2 = dict:store(?MD_CHARSET, binary_to_list(Charset), ErlMd1)
+    end,
+    case PbC#rpbcontent.content_encoding of
+        undefined ->
+            ErlMd3 = ErlMd2;
+        Encoding ->
+            ErlMd3 = dict:store(?MD_ENCODING, binary_to_list(Encoding), ErlMd2)
+    end,
+    case PbC#rpbcontent.vtag of
+        undefined ->
+            ErlMd4 = ErlMd3;
+        Vtag ->
+            ErlMd4 = dict:store(?MD_VTAG, binary_to_list(Vtag), ErlMd3)
+    end,
+    case PbC#rpbcontent.links of
+        undefined ->
+            ErlMd5 = ErlMd4;
+        PbLinks ->
+            Links = [erlify_rpblink(E) || E <- PbLinks],
+            ErlMd5 = dict:store(?MD_LINKS, Links, ErlMd4)
+    end,
+    case PbC#rpbcontent.last_mod of
+        undefined ->
+            ErlMd6 = ErlMd5;
+        LastMod ->
+            case PbC#rpbcontent.last_mod_usecs of
+                undefined ->
+                    Usec = 0;
+                Usec ->
+                    Usec
+            end,
+            Msec = LastMod div 1000000,
+            Sec = LastMod rem 1000000,
+            ErlMd6 = dict:store(?MD_LASTMOD, {Msec,Sec,Usec}, ErlMd5)
+    end,
+    case PbC#rpbcontent.usermeta of
+        undefined ->
+            ErlMd = ErlMd6;
+        PbUserMeta ->
+            UserMeta = [erlify_rpbpair(E) || E <- PbUserMeta],
+            ErlMd = dict:store(?MD_USERMETA, UserMeta, ErlMd6)
+    end,
+
+    {ErlMd, PbC#rpbcontent.value}.
+    
+
+%% Convert {K,V} tuple to protocol buffers
+pbify_rpbpair({K,V}) ->
+    #rpbpair{key = K, value = V}.
+
+%% Convert RpbPair PB message to erlang {K,V} tuple
+erlify_rpbpair(#rpbpair{key = K, value = V}) ->
+    {binary_to_list(K), binary_to_list(V)}.
+    
+%% Covnert erlang link tuple to RpbLink PB message
+pbify_rpblink({{B,K},T}) ->
+    #rpblink{bucket = B, key = K, tag = T}.
+
+%% Convert RpbLink PB message to erlang link tuple
+erlify_rpblink(#rpblink{bucket = B, key = K, tag = T}) ->
+    {{B,K},T}.
+
+%% Protocol bufferify bucket properties
+%%
+pbify_bucket_props(Props) ->
+    pbify_rpbterm({struct, [pbify_bucket_prop(Prop) || Prop <- Props]}).
+
+
+pbify_bucket_prop({linkfun, {modfun, Module, Function}}) ->
+    {?PB_LINKFUN, {struct, [{?PB_MOD, to_binary(Module)},
+                             {?PB_FUN, to_binary(Function)}]}};
+pbify_bucket_prop({linkfun, {qfun, _}}) ->
+    {?PB_LINKFUN, <<"qfun">>};
+pbify_bucket_prop({linkfun, {jsfun, Name}}) ->
+    {?PB_LINKFUN, {struct, [{?PB_JSFUN, to_binary(Name)}]}};
+pbify_bucket_prop({linkfun, {jsanon, {Bucket, Key}}}) ->
+    {?PB_LINKFUN, {struct, [{?PB_JSANON,
+                               {struct, [{?PB_JSBUCKET, Bucket},
+                                         {?PB_JSKEY, Key}]}}]}};
+pbify_bucket_prop({linkfun, {jsanon, Source}}) ->
+    {?PB_LINKFUN, {struct, [{?PB_JSANON, Source}]}};
+pbify_bucket_prop({chash_keyfun, {Module, Function}}) ->
+    {?PB_CHASH, {struct, [{?PB_MOD, to_binary(Module)},
+                          {?PB_FUN, to_binary(Function)}]}};
+pbify_bucket_prop({Prop, Value}) ->
+    {list_to_binary(atom_to_list(Prop)), Value}.
+
+
+%% Erlify bucket properties
+%%
+erlify_bucket_props(RpbProps) ->
+    {struct, Props} = erlify_rpbterm(RpbProps),
+    [erlify_bucket_prop(Prop) || Prop <- Props].
+
+%% @spec erlify_bucket_prop({Property::binary(), jsonpropvalue()}) ->
+%%          {Property::atom(), erlpropvalue()}
+%% @doc The reverse of pbify_bucket_prop/1.  Converts PB representation
+%%      of bucket properties to their Erlang form.
+erlify_bucket_prop({?PB_LINKFUN, {struct, Props}}) ->
+    case {proplists:get_value(?PB_MOD, Props),
+          proplists:get_value(?PB_FUN, Props)} of
+        {Mod, Fun} when is_binary(Mod), is_binary(Fun) ->
+            {linkfun, {modfun,
+                       list_to_existing_atom(binary_to_list(Mod)),
+                       list_to_existing_atom(binary_to_list(Fun))}};
+        {undefined, undefined} ->
+            case proplists:get_value(?PB_JSFUN, Props) of
+                Name when is_binary(Name) ->
+                    {linkfun, {jsfun, Name}};
+                undefined ->
+                    case proplists:get_value(?PB_JSANON, Props) of
+                        {struct, Bkey} ->
+                            Bucket = proplists:get_value(?PB_JSBUCKET, Bkey),
+                            Key = proplists:get_value(?PB_JSKEY, Bkey),
+                            %% bomb if malformed
+                            true = is_binary(Bucket) andalso is_binary(Key),
+                            {linkfun, {jsanon, {Bucket, Key}}};
+                        Source when is_binary(Source) ->
+                            {linkfun, {jsanon, Source}}
+                    end
+            end
+    end;
+erlify_bucket_prop({?PB_CHASH, {struct, Props}}) ->
+    {chash_keyfun, {list_to_existing_atom(
+                      binary_to_list(
+                        proplists:get_value(?PB_MOD, Props))),
+                    list_to_existing_atom(
+                      binary_to_list(
+                        proplists:get_value(?PB_FUN, Props)))}};
+erlify_bucket_prop({?PB_ALLOW_MULT, Value}) ->
+    {allow_mult, any_to_bool(Value)};
+erlify_bucket_prop({Prop, Value}) ->
+    {list_to_existing_atom(binary_to_list(Prop)), Value}.
+
+%% Convert RpbMapRedInput message to erlang tuple
+erlify_mapred_input(#rpbmapredinput{bucket = Bucket, key = Key, data = undefined}) ->
+    {Bucket, Key};
+erlify_mapred_input(#rpbmapredinput{bucket = Bucket, key = Key, data = PbData}) ->
+    {Bucket, Key, erlify_rpbterm(PbData)}.
+
+%% Convert list of RpbMapRedPhase to list of erlang phase tuples
+erlify_mapred_query(PbQuery) ->
+    erlify_mapred_phases(PbQuery, []).
+
+erlify_mapred_phases([], ErlPhases) ->
+    {ok, lists:reverse(ErlPhases)};
+erlify_mapred_phases([#rpbmapredphase{type = <<"link">>, bucket = B, tag = T} | Rest], 
+                     ErlPhases) ->
+    case {B, T} of
+        {undefined, _} ->
+            {error, "link missing bucket"};
+        {_, undefined} ->
+            {error, "link missing tag"};
+        _ ->
+            erlify_mapred_phases(Rest, [{link, B, T} | ErlPhases])
+    end;
+erlify_mapred_phases([#rpbmapredphase{type = <<"map">>, keep = PbKeep, arg = PbArg}=PbPhase | Rest],
+                     ErlPhases) ->
+    case erlify_mapred_funterm(PbPhase) of
+        {ok, FunTerm} ->
+            Arg = erlify_rpbterm(PbArg),
+            erlify_mapred_phases(Rest, [{map, FunTerm, Arg, any_to_bool(PbKeep)} | ErlPhases]);
+        X ->
+            X
+    end;
+erlify_mapred_phases([#rpbmapredphase{type = <<"reduce">>, keep = PbKeep, arg = PbArg}=PbPhase | 
+                      Rest], ErlPhases) ->
+    case erlify_mapred_funterm(PbPhase) of
+        {ok, FunTerm} ->
+            Arg = erlify_rpbterm(PbArg),
+            erlify_mapred_phases(Rest, [{reduce, FunTerm, Arg, any_to_bool(PbKeep)} | ErlPhases]);
+        X ->
+            X
+    end;
+erlify_mapred_phases([#rpbmapredphase{type = Type} | _Rest], _ErlPhases) ->
+    {error, {unknown_phase_type, Type}}.
+
+%% Build the FunTerm for a phase from an RpbMapRedPhase message
+erlify_mapred_funterm(Phase) ->
+    case Phase of
+        #rpbmapredphase{language = <<"javascript">>, source = undefined, bucket = B, key = K,
+                       function = undefined} when 
+              B =/= undefined, K =/= undefined ->
+            {ok, {jsanon, {B, K}}};
+        #rpbmapredphase{language = <<"javascript">>, source = Source,
+                        bucket = undefined, key = undefined} when Source =/= undefined ->
+            {ok, {jsanon, Source}};
+        #rpbmapredphase{language = <<"javascript">>, source = undefined,
+                        bucket = undefined, key = undefined, function = Function} when 
+              function =/= undefined ->
+            {ok, {jsfun, Function}};
+        #rpbmapredphase{language = <<"erlang">>, source = undefined,
+                        module = Module, function = Function} when Module =/= undefined,
+                                                                   Function =/= undefined ->
+            try
+                {ok, {modfun, list_to_existing_atom(binary_to_list(Module)),
+                              list_to_existing_atom(binary_to_list(Function))}}
+            catch
+                error:badarg ->
+                    {error, "nonexistant module/function name"}
+            end;
+        #rpbmapredphase{language = <<"erlang">>, source = undefined,
+                        module = undefined, function = Function} when Function =/= undefined ->
+            try
+                F = binary_to_term(Function),
+                true = is_function(F),
+                {ok, {qfun, F}}
+            catch
+                error:badarg ->
+                    {error, "nonexistant module/function name"};
+                error:{batmatch,_} ->
+                    {error, "not an external binary encoded function"}
+            end;
+        _ ->
+            {error, "cannot parse function term"}
+    end.
+         
+%% Convert erlang term to RpbTerm message tree
+%% JSON-like generic mapping for sending general terms across the PB interface
+%% Uses the same {array, [term()]} and {object, [{term(),term()}]} convention
+%% as mochijson.
+
+-define(RPB_TERM_INTEGER, 1).
+-define(RPB_TERM_BOOLEAN, 2).
+-define(RPB_TERM_STRING, 3).
+-define(RPB_TERM_OBJECT, 5).
+-define(RPB_TERM_ARRAY, 6).
+ 
+pbify_rpbterm(T) when is_atom(T) ->
+    #rpbterm{type = ?RPB_TERM_STRING, string_value = list_to_binary(atom_to_list(T))};
+pbify_rpbterm(T) when is_list(T) ->
+    %% treat lists as lists - encode as a binary if you want a string
+    #rpbterm{type = ?RPB_TERM_ARRAY, array_values = [pbify_rpbterm(E) || E <- T]};
+pbify_rpbterm(T) when is_binary(T) ->
+    #rpbterm{type = ?RPB_TERM_STRING, string_value = T};
+pbify_rpbterm(T) when is_integer(T) ->
+    #rpbterm{type = ?RPB_TERM_INTEGER, int_value = T};
+pbify_rpbterm({array, L}) when is_list(L) ->
+    #rpbterm{type = ?RPB_TERM_ARRAY, array_values = [pbify_rpbterm(E) || E <- L]};
+pbify_rpbterm({struct, L}) ->
+    E = [#rpbobjectentry{name = to_binary(Name), value = pbify_rpbterm(Value)} ||
+            {Name, Value} <- L],
+    #rpbterm{type = ?RPB_TERM_OBJECT, object_entries = E}.
+
+%% Convert RpbTerm message to a term()
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_INTEGER, int_value = Int}) when Int =/= undefined ->
+    Int;
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_BOOLEAN, int_value = Int}) when Int =/= undefined ->
+    case Int of
+        0 ->
+            false;
+        _ ->
+            true
+    end;
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_STRING, string_value = Str}) when Str =/= undefined ->
+    binary_to_list(Str);
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_OBJECT, object_entries = undefined}) ->
+    {struct, []};
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_OBJECT, object_entries = List}) ->
+    {struct, [{Name, erlify_rpbterm(Value)} || 
+                  #rpbobjectentry{name=Name, value=Value} <- List]};
+erlify_rpbterm(#rpbterm{type = ?RPB_TERM_ARRAY, array_values = List}) ->
+    {array, [erlify_rpbterm(E) || E <- List]};
+erlify_rpbterm(undefined) ->
+    undefined.
+
+%% Make sure an atom/string/binary is definitely a binary
+to_binary(A) when is_atom(A) ->
+    list_to_binary(atom_to_list(A));
+to_binary(L) when is_list(L) ->
+    list_to_binary(L);
+to_binary(B) when is_binary(B) ->
+    B.
+
+%% Try and convert to true/false atoms
+any_to_bool(V) when is_list(V) ->
+    (V == "1") orelse (V == "true") orelse (V == "TRUE");
+any_to_bool(V) when is_binary(V) ->
+    any_to_bool(binary_to_list(V));
+any_to_bool(V) when is_integer(V) ->
+    V /= 0;
+any_to_bool(V) when is_boolean(V) ->
+    V.
+
+
+%% ===================================================================
+%% Unit Tests
+%% ===================================================================
+-ifdef(TEST).
+
+content_encode_decode_test() ->
+    MetaData = dict:from_list(
+                 [{?MD_CTYPE, "ctype"},
+                  {?MD_CHARSET, "charset"},
+                  {?MD_ENCODING, "encoding"},
+                  {?MD_VTAG, "vtag"},
+                  {?MD_LINKS, [{{<<"b1">>, <<"k1">>}, <<"v1">>},
+                               {{<<"b2">>, <<"k2">>}, <<"v2">>}
+                              ]},
+                  {?MD_LASTMOD, {1, 2, 3}},
+                  {?MD_USERMETA, [{"X-Riak-Meta-MyMetaData1","here it is"},
+                                  {"X-Riak-Meta-MoreMd", "have some more"}
+                                 ]}
+                 ]),
+    Value = <<"test value">>,
+    {MetaData2, Value2} = erlify_rpbcontent(
+                          riakclient_pb:decode_rpbcontent(
+                            riakclient_pb:encode_rpbcontent(
+                              pbify_rpbcontent({MetaData, Value})))),
+    MdSame = (dict:to_list(MetaData) =:= dict:to_list(MetaData2)),
+    MdSame = true,
+    Value = Value2.
+
+-endif.
+  
+

apps/riak/src/riak_pb_listener.erl

+%% -------------------------------------------------------------------
+%%
+%% riak_pb_listener: Listen for protocol buffer clients
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc entry point for TCP-based protocol buffers service
+
+-module(riak_pb_listener).
+-behavior(gen_nb_server).
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+-export([sock_opts/0, new_connection/2]).
+-record(state, {portnum}).
+
+start_link() ->
+    PortNum = riak:get_app_env(pb_port),
+    IpAddr = riak:get_app_env(pb_ip),
+    gen_nb_server:start_link(?MODULE, IpAddr, PortNum, [PortNum]).
+
+init([PortNum]) -> 
+    register(?MODULE, self()),
+    {ok, #state{portnum=PortNum}}.
+
+sock_opts() -> [binary, {packet, 4}, {reuseaddr, true}, {backlog, 64}].
+
+handle_call(handoff_port, _From, State=#state{portnum=P}) -> 
+    {reply, {ok, P}, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+new_connection(Socket, State) ->
+    {ok, Pid} = riak_pb_socket_sup:start_socket(Socket),
+    ok = gen_tcp:controlling_process(Socket, Pid),
+    {ok, State}.
+

apps/riak/src/riak_pb_socket.erl

+%% -------------------------------------------------------------------
+%%
+%% riak_pb_socket: service protocol buffer clients
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc service protocol buffer clients
+
+-module(riak_pb_socket).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+-include("riakclient_pb.hrl").
+-behaviour(gen_server2).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-type msg() ::  atom() | tuple().
+
+-record(state, {sock,      % protocol buffers socket
+                hello,     % hello message from client
+                client,    % local client
+                req,       % current request (for multi-message requests like list keys)
+                req_ctx}). % context to go along with request (partial results, request ids etc)
+
+
+-define(PROTO_MAJOR, 1).
+-define(PROTO_MINOR, 0).
+-define(DEFAULT_TIMEOUT, 60000).
+
+%% ===================================================================
+%% Public API
+%% ===================================================================
+
+start_link(Socket) ->
+    gen_server2:start_link(?MODULE, [Socket], []).
+
+init([Socket]) -> 
+    inet:setopts(Socket, [{active, once}, {packet, 4}, {header, 1}]),
+    {ok, #state{sock = Socket}}.
+
+handle_call(_Request, _From, State) ->
+    {reply, not_implemented, State}.
+
+handle_cast(_Msg, State) -> 
+    {noreply, State}.
+
+handle_info({tcp_closed, Socket}, State=#state{sock=Socket}) ->
+    {stop, normal, State};
+handle_info({tcp, _Sock, Data}, State=#state{sock=Socket}) ->
+    [MsgCode|MsgData] = Data,
+    Msg = riak_pb:decode(MsgCode, MsgData),
+    case process_message(Msg, State) of
+        {pause, NewState} ->
+            ok;
+        NewState ->
+            inet:setopts(Socket, [{active, once}])
+    end,
+    {noreply, NewState};
+
+%% Handle responses from stream_list_keys 
+handle_info({ReqId, done},
+            State=#state{sock = Socket, req=#rpblistkeysreq{}, req_ctx=ReqId}) ->
+    NewState = send_msg(#rpblistkeysresp{done = 1}, State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState#state{req = undefined, req_ctx = undefined}};
+handle_info({ReqId, {keys, []}}, State=#state{req=#rpblistkeysreq{}, req_ctx=ReqId}) ->
+    {noreply, State}; % No keys - no need to send a message, will send done soon.
+handle_info({ReqId, {keys, Keys}}, State=#state{req=#rpblistkeysreq{}, req_ctx=ReqId}) ->
+    {noreply, send_msg(#rpblistkeysresp{keys = Keys}, State)};
+
+%% Handle response from mapred_stream/mapred_bucket_stream
+handle_info({flow_results, ReqId, done},
+            State=#state{sock = Socket, req=#rpbmapredreq{}, req_ctx=ReqId}) ->
+    NewState = send_msg(#rpbmapredresp{done = 1}, State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState#state{req = undefined, req_ctx = undefined}};
+
+handle_info({flow_results, ReqId, {error, Reason}},
+            State=#state{sock = Socket, req=#rpbmapredreq{}, req_ctx=ReqId}) ->
+    NewState = send_error("~p", [Reason], State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState#state{req = undefined, req_ctx = undefined}};
+
+handle_info({flow_results, PhaseId, ReqId, Res},
+            State=#state{req=#rpbmapredreq{}, req_ctx=ReqId}) ->
+    {noreply, send_msg(#rpbmapredresp{phase=PhaseId, data = riak_pb:pbify_rpbterm(Res)}, State)};
+
+handle_info({flow_error, ReqId, Error},
+            State=#state{sock = Socket, req=#rpbmapredreq{}, req_ctx=ReqId}) ->
+    NewState = send_error("~p", [Error], State),
+    inet:setopts(Socket, [{active, once}]),
+    {noreply, NewState#state{req = undefined, req_ctx = undefined}};
+
+handle_info(_, State) -> % Ignore any late replies from gen_servers/messages from fsms
+    {noreply, State}.
+
+terminate(_Reason, _State) -> ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%% ===================================================================
+%% Message Handling
+%% ===================================================================
+
+%% Process an incoming protocol buffers message.  Return either
+%% a new #state{} if new incoming messages should be received
+%% or {pause, #state{}} if the incoming TCP socket should not be
+%% set active again.
+%%
+%% If 'pause' is returned, it needs to be re-enabled by whatever
+%% callbacks are waiting for it.
+%%
+-spec process_message(msg(), #state{}) ->  #state{} | {pause, #state{}}.
+process_message(#rpbhelloreq{proto_major = 1, client_id = ClientId} = Hello, State) ->
+    {ok, Client} = riak:local_client(ClientId), % optional, will be undefined if not given
+    send_msg(#rpbhelloresp{proto_major = ?PROTO_MAJOR, 
+                           proto_minor = ?PROTO_MINOR,
+                           node = list_to_binary(atom_to_list(node())),
+                           client_id = Client:get_client_id(),
+                           server_version = get_riak_version()},
+             State#state{hello = Hello, client = Client});
+
+process_message(#rpbhelloreq{}, State) ->
+    send_error("Only proto_major 1 currently supported", [], State);
+
+process_message(_Req, #state{hello = undefined} = State) ->
+    send_error("Please say Hello first", [], State);
+
+process_message(rpbpingreq, State) ->
+    send_msg(rpbpingresp, State);
+
+process_message(#rpbgetreq{bucket=B, key=K, options=RpbOptions0}, 
+                #state{client=C} = State) ->
+    Opts = default_rpboptions(RpbOptions0),
+    case C:get(B, K, Opts#rpboptions.r) of
+        {ok, O} ->
+            PbContent = riak_pb:pbify_rpbcontents(riak_object:get_contents(O), []),
+            GetResp = #rpbgetresp{content = PbContent,
+                                  vclock = riak_pb:pbify_rpbvc(riak_object:vclock(O))},
+            send_msg(GetResp, State);
+        {error, notfound} ->
+            send_msg(#rpbgetresp{}, State);
+        {error, Reason} ->
+            send_error("~p", [Reason], State)
+    end;
+
+process_message(#rpbputreq{bucket=B, key=K, vclock=PbVC, 
+                           content = RpbContent, options=RpbOptions0}, 
+                #state{client=C} = State) ->
+
+    Opts = default_rpboptions(RpbOptions0),
+    O0 = riak_object:new(B, K, <<>>),  
+    O1 = update_rpbcontent(O0, RpbContent),
+    O  = update_pbvc(O1, PbVC),
+
+    case C:put(O, Opts#rpboptions.w, Opts#rpboptions.dw) of
+        ok ->
+            case Opts#rpboptions.return_body of % erlang_protobuffs encodes as 1/0/undefined
+                1 ->
+                    send_put_return_body(B, K, Opts, State);
+                _ ->
+                    send_msg(#rpbputresp{}, State)
+            end;
+        {error, precommit_fail} ->
+            send_error("precommit fail", [], State);
+        {error, {precommit_fail, Reason}} ->
+            send_error("precommit fail - ~p", [Reason], State);
+        {error, Reason} ->
+            send_error("~p", [Reason], State)
+    end;
+
+process_message(#rpbdelreq{bucket=B, key=K, options=RpbOptions0}, 
+                #state{client=C} = State) ->
+    Opts = default_rpboptions(RpbOptions0),
+    case C:delete(B, K, Opts#rpboptions.rw) of
+        ok ->
+            send_msg(rpbdelresp, State);
+        {error, notfound} ->  %% delete succeeds if already deleted
+            send_msg(rpbdelresp, State);
+        {error, precommit_fail} ->
+            send_error("precommit fail", [], State);
+        {error, {precommit_fail, Reason}} ->
+            send_error("precommit fail - ~p", [Reason], State);
+        {error, Reason} ->
+            send_error("~p", [Reason], State)
+    end;
+
+process_message(#rpbgetbucketpropsreq{bucket=B, names = Names}, 
+                #state{client=C} = State) ->
+    Props = C:get_bucket(B),
+    PbProps = riak_pb:pbify_bucket_props(filter_props(Names, Props)),
+    Resp = #rpbgetbucketpropsresp{properties = PbProps},
+    send_msg(Resp, State);
+
+process_message(#rpbsetbucketpropsreq{bucket=B, properties = RpbTerm}, 
+                #state{client=C} = State) ->
+    ErlProps = riak_pb:erlify_bucket_props(RpbTerm),
+    C:set_bucket(B, ErlProps),
+    send_msg(rpbsetbucketpropsresp, State);
+
+process_message(rpblistbucketsreq, 
+                #state{client=C} = State) ->
+    case C:list_buckets() of
+        {ok, Buckets} ->
+            send_msg(#rpblistbucketsresp{buckets = Buckets, done = 1}, State);
+        {error, Reason} ->
+            send_error("~p", [Reason], State)
+    end;
+
+%% Start streaming in list keys - results will be processed in handle_info
+process_message(#rpblistkeysreq{bucket=B}=Req, 
+                #state{client=C} = State) ->
+    case C:stream_list_keys(B) of
+        {ok, ReqId} ->
+            {pause, State#state{req = Req, req_ctx = ReqId}}
+    end;
+
+%% Start map/reduce job - results will be processed in handle_info
+process_message(#rpbmapredreq{input_bucket=B, input_keys=PbKeys, phases=PbQuery}=Req, 
+                #state{client=C} = State) ->
+    case riak_pb:erlify_mapred_query(PbQuery) of
+        {error, Reason} ->
+            send_error("~p", [Reason], State);
+
+        {ok, Query} ->
+            if %% Check we have B or PbKeys
+                (B =/= undefined andalso PbKeys =:= undefined) ->
+                    {ok, ReqId} = C:mapred_bucket_stream(B, Query, self(), ?DEFAULT_TIMEOUT),
+                    {pause, State#state{req = Req, req_ctx = ReqId}};
+                (B =:= undefined andalso PbKeys =/= undefined) -> %
+                    Inputs = [riak_pb:erlify_mapred_input(PbKey) || PbKey <- PbKeys],
+                    {ok, {ReqId, FSM}} = C:mapred_stream(Query, self(), ?DEFAULT_TIMEOUT),
+                    luke_flow:add_inputs(FSM, Inputs),
+                    luke_flow:finish_inputs(FSM),
+                    {pause, State#state{req = Req, req_ctx = ReqId}};
+                true ->
+                    send_error("map/reduce takes either an input_bucket or input_keys, not both", 
+                               [], State)
+            end
+    end.
+
+%% @private
+%% @doc if return_body was requested, call the client to get it and return
+send_put_return_body(B, K, Opts, State=#state{client = C}) ->
+    case C:get(B, K, Opts#rpboptions.r) of
+        {ok, O} ->
+            PbContents = riak_pb:pbify_rpbcontents(riak_object:get_contents(O), []),
+            PutResp = #rpbputresp{contents = PbContents,
+                                  vclock = riak_pb:pbify_rpbvc(riak_object:vclock(O))},
+            send_msg(PutResp, State);
+        {error, notfound} ->
+            %% TODO: Decide what to do in this case - user may have NRW set so this is possible
+            send_msg(#rpbputresp{}, State);
+        {error, Reason} ->
+            send_error("~p", [Reason], State)
+    end.
+
+%% Send a message to the client
+-spec send_msg(msg(), #state{}) -> #state{}.
+send_msg(Msg, State) ->
+    Pkt = riak_pb:encode(Msg),
+    ok = gen_tcp:send(State#state.sock, Pkt),
+    State.
+    
+%% Send an error to the client
+-spec send_error(string(), list(), #state{}) -> #state{}.
+send_error(Msg, Fmt, State) ->
+    ErrMsg = lists:flatten(io_lib:format(Msg, Fmt)),
+    send_msg(#rpberrorresp{errmsg = ErrMsg}, State).
+
+%% Update riak_object with the pbcontent provided
+update_rpbcontent(O0, RpbContent) -> 
+    {MetaData, Value} = riak_pb:erlify_rpbcontent(RpbContent),
+    O1 = riak_object:update_metadata(O0, MetaData),
+    riak_object:update_value(O1, Value).
+
+%% Update riak_object with vector clock 
+update_pbvc(O0, PbVc) ->
+    Vclock = riak_pb:erlify_rpbvc(PbVc),
+    riak_object:set_vclock(O0, Vclock).
+
+%% Set default values in the options record if none are provided.
+%% Erlang protobuffs does not support default, so have to do it here.
+default_rpboptions(undefined) ->
+    #rpboptions{r = 2, w = 2, dw = 0, rw = 2};
+default_rpboptions(RpbOptions) ->
+    lists:foldl(fun default_rpboption/2, RpbOptions, 
+                [{#rpboptions.r, 2},
+                 {#rpboptions.w, 2},
+                 {#rpboptions.dw, 0},
+                 {#rpboptions.rw, 2}]).
+
+default_rpboption({Idx, Default}, RpbOptions) ->
+    case element(Idx, RpbOptions) of
+        undefined ->
+            setelement(Idx, RpbOptions, Default);
+        _ ->
+            RpbOptions
+    end.
+            
+%% Filter out the requested properties
+filter_props(undefined, Props) ->
+    Props;
+filter_props(Names, Props) ->
+    Atoms = make_bucket_prop_names(Names, []),
+    [Prop || {Name,_Value}=Prop <- Props, lists:member(Name, Atoms)].
+        
+%% Make bucket prop names - converting any binaries/lists to atoms   
+%% Drop any unknown names
+make_bucket_prop_names([], Acc) ->
+    Acc;
+make_bucket_prop_names([Name|Rest], Acc) when is_binary(Name) ->   
+    make_bucket_prop_names([binary_to_list(Name) | Rest], Acc);
+make_bucket_prop_names([Name|Rest], Acc) when is_list(Name) ->
+    case catch list_to_existing_atom(Name) of
+        {'EXIT', _} ->
+            make_bucket_prop_names(Rest, Acc);
+        Atom ->
+            make_bucket_prop_names(Rest, [Atom | Acc])
+    end;
+make_bucket_prop_names([Name|Rest], Acc) when is_atom(Name) ->
+    make_bucket_prop_names(Rest, [Name | Acc]).
+
+%% Return the current version of riak
+-spec get_riak_version() -> binary().
+get_riak_version() ->
+    Apps = application:which_applications(),
+    {value,{riak,_,Vsn}} = lists:keysearch(riak, 1, Apps),
+    riak_pb:to_binary(Vsn).

apps/riak/src/riak_pb_socket_sup.erl

+%% -------------------------------------------------------------------
+%%
+%% riak_pb_socket_sup: supervise riak_pb_socket processes
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc supervise riak_pb_socket processes
+
+-module(riak_pb_socket_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1, stop/1]).
+-export([start_socket/1]).
+
+start_socket(Socket) -> 
+    supervisor:start_child(?MODULE, [Socket]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+stop(_S) -> ok.
+
+%% @private
+init([]) ->
+    {ok, 
+     {{simple_one_for_one, 10, 10}, 
+      [{undefined,
+        {riak_pb_socket, start_link, []},
+        temporary, brutal_kill, worker, [riak_pb_socket]}]}}.

apps/riak/src/riak_sup.erl

     LocalLogger = {riak_local_logger,
                    {riak_local_logger, start_link, []},
                    permanent, 5000, worker, [riak_local_logger]},
+    RiakPb  = [{riak_pb_listener,
+                {riak_pb_listener, start_link, []},
+                permanent, 5000, worker, [riak_pb_listener]},
+               {riak_pb_socket_sup, 
+                {riak_pb_socket_sup, start_link, []},
+                permanent, infinity, supervisor, [riak_pb_socket_sup]}
+               ],
     RiakWeb = {webmachine_mochiweb,
                  {webmachine_mochiweb, start, [riak_web:config()]},
                   permanent, 5000, worker, dynamic},
                  permanent, infinity, supervisor, [riak_js_sup]},
     % Figure out which processes we should run...
     IsWebConfigured = (riak:get_app_env(riak_web_ip) /= undefined) andalso (riak:get_app_env(riak_web_ip) /= "undefined"),
+    IsPbConfigured = (riak:get_app_env(pb_ip) /= undefined)
+        andalso (riak:get_app_env(pb_port) /= undefined),
     HasStorageBackend = (riak:get_app_env(storage_backend) /= undefined) andalso (riak:get_app_env(storage_backend) /= "undefined"),
     IsStatEnabled = (riak:get_app_env(riak_stat) == true),
 
         Connect,
         LocalLogger,
         ?IF(IsWebConfigured, RiakWeb, []),
+        ?IF(IsPbConfigured, RiakPb, []),
         ?IF(IsStatEnabled, RiakStat, []),
         RiakJsSup,
         RiakJsMgr

apps/riak/src/riakclient.proto

+/* -------------------------------------------------------------------
+**
+** riakclient.proto: Protocol buffers for riak
+**
+** Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
+**
+** This file is provided to you under the Apache License,
+** Version 2.0 (the "License"); you may not use this file
+** except in compliance with the License.  You may obtain
+** a copy of the License at
+**
+**   http://www.apache.org/licenses/LICENSE-2.0
+**
+** Unless required by applicable law or agreed to in writing,
+** software distributed under the License is distributed on an
+** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+** KIND, either express or implied.  See the License for the
+** specific language governing permissions and limitations
+** under the License.
+**
+** -------------------------------------------------------------------
+*/
+/*
+** Lowest Common Denominator Protocol Buffers Client
+**   - no ENUM (protobuffs_erlang does not support)
+**
+** <length:32>  <msg_code:8> <pbmsg>
+**
+** Messages
+**  0 - RpbErrorResp
+**  1 - RpbHelloReq
+**  2 - RpbHelloResp
+**  3 - RpbPingReq - 0 length
+**  4 - RpbPingResp (pong) - 0 length
+**  5 - RpbGetReq 
+**  6 - RpbGetResp
+**  7 - RpbPutReq 
+**  8 - RpbPutResp - 0 length
+**  9 - RpbDelReq 
+** 10 - RpbDelResp
+** 11 - RpbGetBucketPropsReq
+** 12 - RpbGetBucketPropsResp
+** 13 - RpbSetBucketPropsReq
+** 14 - RpbSetBucketPropsResp
+** 15 - RpbListBucketsReq
+** 16 - RpbListBucketsResp{1,}
+** 17 - RpbListKeysReq
+** 18 - RpbListKeysResp{1,}
+** 19 - RpbMapRedReq
+** 20 - RpbMapRedResp{1,}
+**
+** Protocol 
+**  
+**   On connect, server sends RpbHello with protocol/node information.
+**   After that client can make requests and will receive responses
+** 
+**   RpbPingReq -> RpbPingResp
+**   RpbGetReq -> RpbErrorResp | RbpGetResp
+**   RpbPutReq -> RpbErrorResp | RpbPutResp
+**   RpbDelReq -> RpbErrorResp | RpbDelResp
+**   RpbGetBucketPropsReq -> RpbErrorResp | RpbGetBucketPropsResp
+**   RpbSetBucketPropsReq -> RpbErrorResp | RpbSetBucketPropsResp
+**   RpbListBucketsReq -> RpbErrorResp | RpbListBucketsResp{1,}
+**   RpbListKeysReq -> RpbErrorResp | RpbListKeysResp{1,}
+**   RpbMapRedReq -> RpbErrorResp | RpbMapRedResp{1,}
+*/
+
+message RpbErrorResp {
+    required bytes errmsg = 1;
+}
+
+// Hello request - a client must say hello to the server on connection
+// with the maximum protocol version supported.
+message RpbHelloReq {
+    required int32 proto_major = 1; // Set to 1 for now
+    optional bytes client_id = 2;   // Client id to use
+}
+
+// Hello response from the server
+message RpbHelloResp {
+    optional int32 proto_major = 1;
+    optional int32 proto_minor = 2;
+    optional bytes node = 3;
+    optional bytes client_id = 4; 
+    optional bytes server_version = 5;
+}
+
+// Get Request - retrieve bucket/key
+message RpbGetReq {
+    required bytes bucket = 1;
+    required bytes key = 2;
+    optional RpbOptions options = 3;
+}
+
+// Get Response - if the record was not found there will be no content/vclock
+message RpbGetResp {
+    repeated RpbContent content = 1;
+    optional bytes vclock = 2;
+}
+
+message RpbPutReq {
+    required bytes bucket = 1;
+    required bytes key = 2;
+    optional bytes vclock = 3;
+    required RpbContent content = 4;
+    optional RpbOptions options = 5;
+}
+
+message RpbPutResp {
+    repeated RpbContent contents = 1;
+    optional bytes vclock = 2;
+}
+
+message RpbDelReq {
+    required bytes bucket = 1;
+    required bytes key = 2;
+    optional RpbOptions options = 3;
+}
+
+message RpbGetBucketPropsReq {
+    required bytes bucket = 1;
+    repeated bytes names = 2; // list of property names to receive - if empty return all
+}
+
+message RpbGetBucketPropsResp {
+    required RpbTerm properties = 1;
+}
+
+message RpbSetBucketPropsReq {
+    required bytes bucket = 1;
+    required RpbTerm properties = 2; 
+    optional RpbOptions options = 3;
+}
+
+message RpbListBucketsReq {
+}
+
+message RpbListBucketsResp {
+    repeated bytes buckets = 1;
+    optional bool done = 2;
+}
+
+message RpbListKeysReq {
+    required bytes bucket = 1;
+    optional RpbOptions options = 2;
+}
+
+message RpbListKeysResp {
+    repeated bytes keys = 1;
+    optional bool done = 2;
+}
+
+message RpbMapRedReq {
+    optional bytes input_bucket = 1; // Input bucket
+    repeated RpbMapRedInput input_keys = 2; // *OR* input_keys - not both
+    repeated RpbMapRedPhase phases = 3; // query is reserved word in erlang
+}
+message RpbMapRedResp {
+    optional int32 phase = 1;
+    optional RpbTerm data = 2;
+    optional bool done = 3;
+}
+
+message RpbContent {
+    required bytes value = 1;
+    optional RpbTerm metadata = 2;
+    optional bytes content_type = 3;
+    optional bytes charset = 4;
+    optional bytes content_encoding = 5;
+    optional bytes vtag = 6;
+    repeated RpbLink links = 7;
+    optional int32 last_mod = 8;
+    optional int32 last_mod_usecs = 9;
+    repeated RpbPair usermeta = 10;
+}
+
+message RpbPair {
+    required bytes key = 1;
+    optional bytes value = 2;
+}
+
+message RpbLink {
+    optional bytes bucket = 1;
+    optional bytes key = 2;
+    optional bytes tag = 3;
+}
+
+message RpbOptions {
+    optional int32 r = 1;
+    optional int32 w = 2;
+    optional int32 dw = 3;
+    optional int32 rw = 4;
+    optional bool return_body = 5;
+}
+
+/* Type values - no enums
+** 1 = integer
+** 2 = boolean (uses int_val. 0 == false, !0 == true)
+** 3 = string
+** 5 = object - JSON-like name/value pairs
+** 6 = array 
+*/
+message RpbTerm {
+    required int32 type = 1;
+    optional int32 int_value = 3;
+    optional bytes string_value = 4;
+    repeated RpbObjectEntry object_entries = 5;
+    repeated RpbTerm array_values = 6;
+}
+message RpbObjectEntry {
+    required bytes name = 1;
+    required RpbTerm value = 2;
+}
+
+message RpbMapRedInput {
+    required bytes bucket = 1;
+    required bytes key = 2;     
+    optional RpbTerm data = 3; //* What format should this be in? - maybe RpbTerm?
+}
+
+message RpbMapRedPhase {
+    required bytes type = 1;
+    optional bytes language = 2;
+    optional bytes source = 3;
+    optional bytes bucket = 4;
+    optional bytes key = 5;
+    optional bytes module = 6;
+    optional bytes function = 7;
+    optional bool keep = 8;
+    optional bytes tag = 9;
+    optional RpbTerm arg = 10;
+}
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.