Source

riak / src / riak_client.erl

Diff from to

File src/riak_client.erl

 %%      See the map/reduce documentation for explanation of behavior.
 mapred(Inputs,Query,Timeout)
   when is_list(Inputs), is_list(Query), is_integer(Timeout) ->
-    gen_server2:call({riak_api,Node}, {mapred,Inputs,Query,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_mapreduce_fsm, start, [Inputs,Query,Timeout,Me]),
+    receive
+        {error, Err} -> {error, Err};
+        {ok, Res} -> {ok, Res}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec get(riak_object:bucket(), riak_object:key(), R :: integer()) ->
 %%       {ok, riak_object:riak_object()} |
 %%       {error, Err :: term()}
 %% @doc Fetch the object at Bucket/Key.  Return a value as soon as R
 %%      nodes have responded with a value or error, or TimeoutMillisecs passes.
-get(Bucket, Key, R, Timeout) when is_atom(Bucket),
-                                  (is_list(Key) orelse is_binary(Key)),
+get(Bucket, Key, R, Timeout) when is_atom(Bucket), is_binary(Key),
                                   is_integer(R), is_integer(Timeout) ->
-    gen_server2:call({riak_api,Node}, {get,Bucket,Key,R,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_get_fsm, start, [Bucket,Key,R,Timeout,Me]),
+    receive
+        {error, Err} -> {error, Err};
+        {ok, RObj} -> {ok, RObj}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec put(RObj :: riak_object:riak_object(), W :: integer()) ->
 %%        ok |
 %%      TimeoutMillisecs passes.
 put(RObj, W, DW, Timeout) ->
     R0 = riak_object:increment_vclock(RObj, ClientId),
-    gen_server2:call({riak_api,Node}, {put,R0,W,DW,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_put_fsm, start, [R0,W,DW,Timeout,Me]),
+    receive
+        ok -> ok
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec delete(riak_object:bucket(), riak_object:key(), RW :: integer()) ->
 %%        ok |
 %% @doc Delete the object at Bucket/Key.  Return a value as soon as RW
 %%      nodes have responded with a value or error, or TimeoutMillisecs passes.
 delete(Bucket,Key,RW,Timeout) ->
-    gen_server2:call({riak_api,Node}, {delete,Bucket,Key,RW,Timeout}, Timeout).
-
+    Me = self(),
+    spawn(Node, riak_delete, delete, [Bucket,Key,RW,Timeout,Me]),
+    receive
+        ok -> ok
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec list_keys(riak_object:bucket()) ->
 %%       {ok, [Key :: riak_object:key()]} |
 %%      Key lists are updated asynchronously, so this may be slightly
 %%      out of date if called immediately after a put or delete.
 list_keys(Bucket, Timeout) -> 
-    gen_server2:call({riak_api,Node}, {list_keys,Bucket,Timeout}, Timeout).
+    Me = self(),
+    spawn(Node, riak_keys_fsm, start, [Bucket,Timeout,Me]),
+    receive
+        {ok, Reply} -> {ok, Reply}
+    after Timeout ->
+            {error, timeout}
+    end.
 
 %% @spec set_bucket(riak_object:bucket(), [BucketProp :: {atom(),term()}]) -> ok
 %% @doc Set the given properties for Bucket.
 %%      to ensure expected per-bucket behavior.
 %% See riak_bucket for expected useful properties.
 set_bucket(BucketName,BucketProps) ->
-    gen_server2:call({riak_api,Node}, {set_bucket,BucketName,BucketProps}).
+    rpc:call(Node,riak_bucket,set_bucket,[BucketName,BucketProps]).
 %% @spec get_bucket(riak_object:bucket()) -> [BucketProp :: {atom(),term()}]
 %% @doc Get all properties for Bucket.
 %% See riak_bucket for expected useful properties.
 get_bucket(BucketName) ->
-    gen_server2:call({riak_api,Node}, {get_bucket,BucketName}).
-
+    rpc:call(Node,riak_bucket,get_bucket,[BucketName]).
 %% @spec reload_all(Module :: atom()) -> term()
 %% @doc Force all Riak nodes to reload Module.
 %%      This is used when loading new modules for map/reduce functionality.
-reload_all(Module) -> gen_server2:call({riak_api,Node}, {reload_all, Module}).
+reload_all(Module) -> rpc:call(Node,riak_util,reload_all,[Module]).
 
 %% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
 %% @doc Cause all partitions owned by ExitingNode to be taken over
 %%      by other nodes.
 remove_from_cluster(ExitingNode) ->
-    gen_server2:call({riak_api,Node}, {remove_from_cluster,ExitingNode}).
+    rpc:call(Node,riak_ring_gossiper,remove_from_cluster,[ExitingNode]).
 
 %% @spec send_event(EventName::atom(), EventDetail::term()) -> ok
 %% @doc  Send a client-generated event to the Riak eventer.
 send_event(EventName, EventDetail) ->
-    gen_server2:cast({riak_api,Node},
-                     {send_event,ClientId,EventName,EventDetail}).
+    rpc:call(Node,riak_eventer,notify,
+             [client_event, EventName, {ClientId, EventDetail}]).