Commits

Anonymous committed 9030e59 Merge

merge with bitbucket

  • Participants
  • Parent commits 8dbfea8, b7edc6c

Comments (0)

Files changed (4)

 Kirill A. Korinskiy
 Kevin Smith
 Jonathan Lee
+Sean Cribbs

File client_lib/jiak.rb

 #     Version 2.0 (the "License"); you may not use this file
 #     except in compliance with the License.  You may obtain
 #     a copy of the License at
-   
+
 #       http://www.apache.org/licenses/LICENSE-2.0
-   
+
 #     Unless required by applicable law or agreed to in writing,
 #     software distributed under the License is distributed on an
 #     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 #     KIND, either express or implied.  See the License for the
 #     specific language governing permissions and limitations
-#     under the License.    
+#     under the License.
 
 # A Ruby interface for speaking to Riak.
 # Example usage code can be found at the end of this file.
 # This library requires that you have a library providing 'json'
 # installed ("gem install json" will get you one).
+#
+# 
+
 require 'net/http'
 require 'rubygems'
 require 'json'
 
-class JiakClient
-  def initialize(ip, port, jiakPrefix='/jiak/', options={})
-    @ip = ip
-    @port = port
-    @prefix = jiakPrefix
-    @opts = options
+module Riak
 
-    if (@opts['clientId'])
-      if (@opts['clientId'].kind_of? Integer &&
-          @opts['clientId'] > 0 &&
-          @opts['clientId'] < 4294967296)
-        @opts['clientId'] = base64(@opts['clientId'])
+  class ClientException < ::Exception; end
+
+  class Client
+    MAX_CLIENT_ID = 4294967296
+    
+    def initialize(ip, port, jiakPrefix='/jiak/', options={})
+      @ip, @port, @prefix, @opts = ip, port, jiakPrefix, options
+
+      clientId = @opts['clientId']
+      if clientId
+        @opts['clientId'] = base64(clientId) if (0..MAX_CLIENT_ID).include?(clientId)
+      else
+        @opts['clientId'] = base64(rand(MAX_CLIENT_ID))
       end
-    else
-      @opts['clientId'] = base64(rand(4294967296))
-    end
-  end
-
-  # Set the schema for 'bucket'.  The schema parameter
-  # must be a hash with at least an 'allowed_fields' field.
-  # Other valid fields are 'requried_fields', 'read_mask',
-  # and 'write_mask'
-  def set_bucket_schema(bucket, schema)
-    if (!schema['required_fields'])
-      schema['required_fields'] = []
-    end
-    if (!schema['read_mask'])
-      schema['read_mask'] = schema['allowed_fields']
-    end
-    if (!schema['write_mask'])
-      schema['write_mask'] = schema['read_mask']
     end
 
-    do_req(set_data(Net::HTTP::Put.new(path(bucket)),
-                    {'schema'=>schema}),
-           '204')
-  end
+    # Set the schema for 'bucket'.  The schema parameter
+    # must be a hash with at least an 'allowed_fields' field.
+    # Other valid fields are 'requried_fields', 'read_mask',
+    # and 'write_mask'
+    def set_bucket_schema(bucket, schema)
+      schema['required_fields'] ||= []
+      schema['read_mask']       ||= schema['required_fields']
+      schema['write_mask']      ||= schema['read_mask']
 
-  # Get the schema and key list for 'bucket'
-  def list_bucket(bucket)
-    do_req(Net::HTTP::Get.new(path(bucket)), '200')
-  end
-  
-  # Get the object stored in 'bucket' at 'key'
-  def fetch(bucket, key, r=nil)
-    do_req(Net::HTTP::Get.new(path(bucket, key,
-                                   {'r'=>(r||@opts['r'])})),
-           '200')
-  end
-
-  # Store 'object' in Riak.  If the object has not defined
-  # its 'key' field, a key will be chosen for it by the server.
-  def store(object, w=nil, dw=nil, r=nil)
-    q = {
-      'returnbody'=>'true',
-      'w'=>(w||@opts['w']),
-      'dw'=>(dw||@opts['dw']),
-      'r'=>(r||@opts['r'])
-    }
-    if (object['key'])
-      req = Net::HTTP::Put.new(path(object['bucket'], object['key'], q),
-                               initheader={"X-Riak-ClientId" => @opts['clientId']})
-      code = '200'
-    else
-      req = Net::HTTP::Post.new(path(object['bucket'], nil, q),
-                                initheader={"X-Riak-ClientId" => @opts['clientId']})
-      code = '201'
+      do_req(set_data(Net::HTTP::Put.new(path(bucket)),
+                      {'schema'=>schema}),
+             '204')
     end
 
-    do_req(set_data(req, object), code)
-  end
-
-  # Delete the data stored in 'bucket' at 'key'
-  def delete(bucket, key, rw=nil)
-    do_req(Net::HTTP::Delete.new(path(bucket, key,
-                                      {'rw'=>(rw||@opts['rw'])}),
-                                 initheader={"X-Riak-ClientId" => @opts['clientId']}),
-           '204')
-  end
-
-  # Follow links from the object stored in 'bucket' at 'key'
-  # to other objects.  The 'spec' parameter should be an array
-  # of hashes, each hash optinally defining 'bucket', 'tag',
-  # and 'acc' fields.  If a field is not defined in a spec hash,
-  # the wildcard '_' will be used instead.
-  def walk(bucket, key, spec)
-    do_req(Net::HTTP::Get.new(path(bucket, key)+convert_walk_spec(spec)),
-           '200')
-  end
-
-  def convert_walk_spec(spec)
-    acc = ''
-    spec.each do |step|
-      acc += URI.encode(step['bucket']||'_')+','+
-        URI.encode(step['tag']||'_')+','+
-        (step['acc']||'_')+'/'
-    end
-    acc
-  end
-
-  def path(bucket, key=nil, reqOpts={})
-    p = @prefix + URI.encode(bucket) + '/'
-    if (key)
-      p += URI.encode(key) + '/'
+    # Get the schema and key list for 'bucket'
+    def list_bucket(bucket)
+      do_req(Net::HTTP::Get.new(path(bucket)), '200')
     end
 
-    q = [];
-    reqOpts.each do |n,v|
-      if (v): q.push "#{n}=#{v}" end
+    # Get the object stored in 'bucket' at 'key'
+    def fetch(bucket, key, r=nil)
+      do_req(Net::HTTP::Get.new(path(bucket, key,
+                                     {'r'=>(r||@opts['r'])})),
+             '200')
     end
-    if (q.length > 0): p += '?'+(q.join('&')) end
 
-    p
+    # Store 'object' in Riak.  If the object has not defined
+    # its 'key' field, a key will be chosen for it by the server.
+    def store(object, w=nil, dw=nil, r=nil)
+      q = {
+        'returnbody'=>'true',
+        'w'=>(w||@opts['w']),
+        'dw'=>(dw||@opts['dw']),
+        'r'=>(r||@opts['r'])
+      }
+      if (object['key'])
+        req = Net::HTTP::Put.new(path(object['bucket'], object['key'], q),
+                                 initheader={"X-Riak-ClientId" => @opts['clientId']})
+        code = '200'
+      else
+        req = Net::HTTP::Post.new(path(object['bucket'], nil, q),
+                                  initheader={"X-Riak-ClientId" => @opts['clientId']})
+        code = '201'
+      end
+
+      do_req(set_data(req, object), code)
+    end
+
+    # Delete the data stored in 'bucket' at 'key'
+    def delete(bucket, key, rw=nil)
+      do_req(Net::HTTP::Delete.new(path(bucket, key,
+                                        {'rw'=>(rw||@opts['rw'])}),
+                                   initheader={"X-Riak-ClientId" => @opts['clientId']}),
+             '204')
+    end
+
+    # Follow links from the object stored in 'bucket' at 'key'
+    # to other objects.  The 'spec' parameter should be an array
+    # of hashes, each hash optinally defining 'bucket', 'tag',
+    # and 'acc' fields.  If a field is not defined in a spec hash,
+    # the wildcard '_' will be used instead.
+    def walk(bucket, key, spec)
+      do_req(Net::HTTP::Get.new(path(bucket, key)+convert_walk_spec(spec)),
+             '200')
+    end
+
+    private
+      def convert_walk_spec(spec)
+        spec.map do |step|
+          URI.encode [(step['bucket']||'_'),(step['tag']||'_'),(step['acc']||'_')].join(",")
+        end.join("/")
+      end
+
+      def path(bucket, key=nil, reqOpts={})
+        path = URI.encode(@prefix + [bucket, key].compact.join("/"))
+        q = URI.encode(reqOpts.map {|k,v| !v.nil? && "#{k}=#{v}" }.compact.join("&"))
+        q.empty? ? path : [path, q].join("?")
+      end
+
+      def set_data(req, data)
+        req.content_type='application/json'
+        req.body=JSON.generate(data)
+        req
+      end
+
+      def do_req(req, expect)
+        res = Net::HTTP.start(@ip, @port) {|http|
+          http.request(req)
+        }
+        if (res.code == expect)
+          res.body ? JSON.parse(res.body) : true
+        else
+          raise ClientException.new(res.code+' '+res.message+' '+res.body)
+        end
+      end
+
+      def base64(n)
+        base64digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
+        "%c%c%c%c%c%c==" %
+          [base64digits[(n >> 26)],
+           base64digits[((n >> 20)&63)],
+           base64digits[((n >> 14)&63)],
+           base64digits[((n >> 8)&63)],
+           base64digits[((n >> 2)&63)],
+           base64digits[((n << 4)&63)]]
+      end
   end
-
-  def set_data(req, data)
-    req.content_type='application/json'
-    req.body=JSON.generate(data)
-    req
-  end
-
-  def do_req(req, expect)
-    res = Net::HTTP.start(@ip, @port) {|http|
-      http.request(req)
-    }
-    if (res.code == expect)
-      res.body ? JSON.parse(res.body) : true
-    else
-      raise JiakException.new(res.code+' '+res.message+' '+res.body)
-    end
-  end
-
-  def base64(n)
-    base64digits = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
-    "%c%c%c%c%c%c==" %
-      [base64digits[(n >> 26)],
-       base64digits[((n >> 20)&63)],
-       base64digits[((n >> 14)&63)],
-       base64digits[((n >> 8)&63)],
-       base64digits[((n >> 2)&63)],
-       base64digits[((n << 4)&63)]]
-  end
-  private:convert_walk_spec
-  private:path
-  private:set_data
-  private:do_req
-  private:base64
 end
 
-class JiakException<Exception
- end
-
 # Example usage
 if __FILE__ == $0
-  jc = JiakClient.new("127.0.0.1", 8000)
+  jc = Riak::Client.new("127.0.0.1", 8098)
 
   puts "Creating bucket foo..."
   jc.set_bucket_schema('foo', {'allowed_fields'=>['bar','baz']})

File deps/webmachine/src/wmtrace_resource.erl

 -record(ctx, {trace_dir, trace}).
 
 -define(MAP_EXTERNAL, "static/map.png").
--define(MAP_INTERNAL, "deps/webmachine/docs/http-headers-status-v3.png").
+-define(MAP_INTERNAL, "docs/http-headers-status-v3.png").
 -define(SCRIPT_EXTERNAL, "static/wmtrace.js").
--define(SCRIPT_INTERNAL, "deps/webmachine/trace/wmtrace.js").
+-define(SCRIPT_INTERNAL, "trace/wmtrace.js").
 -define(STYLE_EXTERNAL, "static/wmtrace.css").
--define(STYLE_INTERNAL, "deps/webmachine/trace/wmtrace.css").
+-define(STYLE_INTERNAL, "trace/wmtrace.css").
 
 %%
 %% Dispatch Modifiers
                      Ctx}
             end;
         ?MAP_EXTERNAL ->
-            {filelib:is_file(?MAP_INTERNAL), RD, Ctx};
+            {filelib:is_file(wm_path(?MAP_INTERNAL)), RD, Ctx};
         ?SCRIPT_EXTERNAL ->
-            {filelib:is_file(?SCRIPT_INTERNAL), RD, Ctx};
+            {filelib:is_file(wm_path(?SCRIPT_INTERNAL)), RD, Ctx};
         ?STYLE_EXTERNAL ->
-            {filelib:is_file(?STYLE_INTERNAL), RD, Ctx};
+            {filelib:is_file(wm_path(?STYLE_INTERNAL)), RD, Ctx};
         TraceName ->
             TracePath = filename:join([Ctx#ctx.trace_dir, TraceName]),
             {filelib:is_file(TracePath), RD, Ctx#ctx{trace=TracePath}}
     end.
 
+wm_path(File) ->
+    filename:join(code:lib_dir(webmachine), File).
+
 content_types_provided(RD, Ctx) ->
     case wrq:disp_path(RD) of
         ?MAP_EXTERNAL ->
          ]).
 
 produce_javascript(RD, Ctx) ->
-    {ok, Script} = file:read_file(?SCRIPT_INTERNAL),
+    {ok, Script} = file:read_file(wm_path(?SCRIPT_INTERNAL)),
     {Script, RD, Ctx}.
 
 produce_map(RD, Ctx) ->
-    {ok, Map} = file:read_file(?MAP_INTERNAL),
+    {ok, Map} = file:read_file(wm_path(?MAP_INTERNAL)),
     {Map, RD, Ctx}.
 
 produce_css(RD, Ctx) ->
-    {ok, Script} = file:read_file(?STYLE_INTERNAL),
+    {ok, Script} = file:read_file(wm_path(?STYLE_INTERNAL)),
     {Script, RD, Ctx}.
 
 %%

File src/riak_vnode.erl

 
 %% @private
 % upon receipt of a handoff datum, there is no client FSM
-do_diffobj_put(BKey, DiffObj, 
+do_diffobj_put(BKey={Bucket,_}, DiffObj, 
        _StateData=#state{mod=Mod,modstate=ModState}) ->
     ReqID = erlang:phash2(erlang:now()),
     case syntactic_put_merge(Mod, ModState, BKey, DiffObj, ReqID) of
         {newobj, NewObj} ->
-            Val = term_to_binary(NewObj),
+            AMObj = enforce_allow_mult(NewObj, riak_bucket:get_bucket(Bucket)),
+            Val = term_to_binary(AMObj),
             Mod:put(ModState, BKey, Val),
             riak_stat:update(vnode_put);
         _ -> nop
             gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
         {newobj, NewObj} ->
             VC = riak_object:vclock(NewObj),
-            ObjToStore = riak_object:set_vclock(NewObj,
+            AMObj = enforce_allow_mult(NewObj, BProps),
+            ObjToStore = riak_object:set_vclock(AMObj,
                                            vclock:prune(VC,PruneTime,BProps)),
             Val = term_to_binary(ObjToStore),
             case Mod:put(ModState, BKey, Val) of
     end.
 
 %% @private
+%% enforce allow_mult bucket property so that no backend ever stores
+%% an object with multiple contents if allow_mult=false for that bucket
+enforce_allow_mult(Obj, BProps) ->
+    case proplists:get_value(allow_mult, BProps) of
+        true -> Obj;
+        _ ->
+            case riak_object:get_contents(Obj) of
+                [_] -> Obj;
+                Mult ->
+                    {MD, V} = select_newest_content(Mult),
+                    riak_object:set_contents(Obj, [{MD, V}])
+            end
+    end.
+
+%% @private
+%% choose the latest content to store for the allow_mult=false case
+select_newest_content(Mult) ->
+    hd(lists:sort(
+         fun({MD0, _}, {MD1, _}) ->
+                 riak_util:compare_dates(
+                   dict:fetch(<<"X-Riak-Last-Modified">>, MD0),
+                   dict:fetch(<<"X-Riak-Last-Modified">>, MD1))
+         end,
+         Mult)).
+
+%% @private
 do_map(ClientPid,{map,FunTerm,Arg,_Acc},
        BKey,KeyData,Cache,Mod,ModState,VNode) ->
     CacheVal = case FunTerm of