Sean Cribbs avatar Sean Cribbs committed 754d797

Add put request, factor out decoding content.

Comments (0)

Files changed (4)

 *.so
 *.bundle
 Makefile
+mkmf.log

riak-client/ext/mri/riakpb-ruby-decode.cc

   uint8_t msgcode;
   socket = SOCKET;
   str = ReadSocket(socket, 5);
+  // At this point, a bad request might have killed the socket. We
+  // need to raise an error and potentially reopen the socket.
+  // if(NIL_P(str)) return Qnil;
   msglen = ntohl(((uint32_t*)RSTRING_PTR(str))[0]) - 1;
   msgcode = (uint8_t)(RSTRING_PTR(str)[4]);
   if(msglen == 0){
     case GetResp:
       return rpb_decode_get(str);
       break;
+    case PutResp:
+      return rpb_decode_put(str);
+      break;
     case ListBucketsResp:
       return rpb_decode_list_buckets(str);
       break;
 
 VALUE rpb_decode_get(VALUE string){
   RpbGetResp res = RpbGetResp();
-  VALUE obj, values, contents, links, link, usermeta;
-  int i,j,k;
+  VALUE obj, values, contents;
+  int i;
   DecodeProtobuff(res, string);
 
   obj = rb_hash_new();
   values = rb_ary_new2(res.content_size());
   rb_hash_aset(obj, rb_str_new2("values"), values);
   for(i = 0; i < res.content_size(); i++){
-    contents = rb_hash_new();
-    rb_ary_push(values, contents);
-    rb_hash_aset(contents, rb_str_new2("data"), rb_str_new2(res.content(i).value().c_str()));
-    if(res.content(i).has_content_type()){
-      rb_hash_aset(contents, rb_str_new2("content-type"), rb_str_new2(res.content(i).content_type().c_str()));
-    }
-    if(res.content(i).has_charset()){
-      rb_hash_aset(contents, rb_str_new2("charset"), rb_str_new2(res.content(i).charset().c_str()));
-    }
-    if(res.content(i).has_content_encoding()){
-      rb_hash_aset(contents, rb_str_new2("encoding"), rb_str_new2(res.content(i).content_encoding().c_str()));
-    }
-    if(res.content(i).has_vtag()){
-      rb_hash_aset(contents, rb_str_new2("vtag"), rb_str_new2(res.content(i).vtag().c_str()));
-    }
-    if(res.content(i).has_last_mod()){
-      rb_hash_aset(contents, rb_str_new2("last-modified"), UINT2NUM(res.content(i).last_mod()));
-    }
-    if(res.content(i).has_last_mod_usecs()){
-      rb_hash_aset(contents, rb_str_new2("last-modified-usecs"), UINT2NUM(res.content(i).last_mod_usecs()));
-    }
-    links = rb_ary_new2(res.content(i).links_size());
-    rb_hash_aset(contents, rb_str_new2("links"), links);
-    for(j = 0; j < res.content(i).links_size(); j++){
-      link = rb_hash_new();
-      rb_ary_push(links, link);
-      if(res.content(i).links(j).has_bucket()){
-        rb_hash_aset(link, rb_str_new2("bucket"), rb_str_new2(res.content(i).links(j).bucket().c_str()));
-      }
-      if(res.content(i).links(j).has_key()){
-        rb_hash_aset(link, rb_str_new2("key"), rb_str_new2(res.content(i).links(j).key().c_str()));
-      }
-      if(res.content(i).links(j).has_tag()){
-        rb_hash_aset(link, rb_str_new2("tag"), rb_str_new2(res.content(i).links(j).tag().c_str()));
-      }
-    }
-    usermeta = rb_hash_new();
-    rb_hash_aset(contents, rb_str_new2("meta"), usermeta);
-    for(k = 0; k < res.content(i).usermeta_size(); k++){
-      if(res.content(i).usermeta(k).has_value())
-        rb_hash_aset(usermeta, rb_str_new2(res.content(i).usermeta(k).key().c_str()), rb_str_new2(res.content(i).usermeta(k).value().c_str()));
-      else
-        rb_hash_aset(usermeta, rb_str_new2(res.content(i).usermeta(k).key().c_str()), Qnil);
-    }
+    rb_ary_push(values, rpb_decode_content(res.mutable_content(i)));
+  }
+  return obj;
+}
+
+VALUE rpb_decode_put(VALUE string) {
+  RpbPutResp res = RpbPutResp();
+  VALUE obj, values;
+  int i;
+  DecodeProtobuff(res, string);
+
+  obj = rb_hash_new();
+  if(res.has_vclock()){
+    rb_hash_aset(obj, rb_str_new2("vclock"), rb_str_new2(res.vclock().c_str()));
+  }
+  values = rb_ary_new2(res.content_size());
+  rb_hash_aset(obj, rb_str_new2("values"), values);
+  for(i = 0; i < res.content_size(); i++){
+    rb_ary_push(values, rpb_decode_content(res.mutable_content(i)));
   }
   return obj;
 }
   }
   return hash;
 }
+
+VALUE rpb_decode_content(RpbContent *c){
+  VALUE contents = rb_hash_new(), links, link, usermeta;
+  int j,k;
+  rb_hash_aset(contents, rb_str_new2("raw_data"), rb_str_new2(c->value().c_str()));
+  if(c->has_content_type()){
+    rb_hash_aset(contents, rb_str_new2("content-type"), rb_str_new2(c->content_type().c_str()));
+  }
+  if(c->has_charset()){
+    rb_hash_aset(contents, rb_str_new2("charset"), rb_str_new2(c->charset().c_str()));
+  }
+  if(c->has_content_encoding()){
+    rb_hash_aset(contents, rb_str_new2("encoding"), rb_str_new2(c->content_encoding().c_str()));
+  }
+  if(c->has_vtag()){
+    rb_hash_aset(contents, rb_str_new2("vtag"), rb_str_new2(c->vtag().c_str()));
+  }
+  if(c->has_last_mod()){
+    rb_hash_aset(contents, rb_str_new2("last-modified"), UINT2NUM(c->last_mod()));
+  }
+  if(c->has_last_mod_usecs()){
+    rb_hash_aset(contents, rb_str_new2("last-modified-usecs"), UINT2NUM(c->last_mod_usecs()));
+  }
+  links = rb_ary_new2(c->links_size());
+  rb_hash_aset(contents, rb_str_new2("links"), links);
+  for(j = 0; j < c->links_size(); j++){
+    link = rb_hash_new();
+    rb_ary_push(links, link);
+    if(c->links(j).has_bucket()){
+      rb_hash_aset(link, rb_str_new2("bucket"), rb_str_new2(c->links(j).bucket().c_str()));
+    }
+    if(c->links(j).has_key()){
+      rb_hash_aset(link, rb_str_new2("key"), rb_str_new2(c->links(j).key().c_str()));
+    }
+    if(c->links(j).has_tag()){
+      rb_hash_aset(link, rb_str_new2("tag"), rb_str_new2(c->links(j).tag().c_str()));
+    }
+  }
+  usermeta = rb_hash_new();
+  rb_hash_aset(contents, rb_str_new2("meta"), usermeta);
+  for(k = 0; k < c->usermeta_size(); k++){
+    if(c->usermeta(k).has_value())
+      rb_hash_aset(usermeta, rb_str_new2(c->usermeta(k).key().c_str()), rb_str_new2(c->usermeta(k).value().c_str()));
+    else
+      rb_hash_aset(usermeta, rb_str_new2(c->usermeta(k).key().c_str()), Qnil);
+  }
+  return contents;
+}

riak-client/ext/mri/riakpb-ruby.cc

     return rpb_decode_response(self);
   }
 
+  VALUE rpb_put(int argc, VALUE *argv, VALUE self){
+    VALUE robject, w, dw, returnbody; // inputs
+    VALUE data, links, link, keys, metak, metav,
+      vclock, ctype, meta, bucket, key, tag; // internal uses
+    RpbPutReq *req = new RpbPutReq();
+    RpbContent *content = req->mutable_content();
+    RpbPair *pair;
+    RpbLink *pblink;
+    rb_scan_args(argc, argv, "13", &robject, &returnbody, &w, &dw);
+
+    // Set bucket, key, w, dw, returnbody
+    bucket = rb_funcall(rb_funcall(robject, rb_intern("bucket"), 0), rb_intern("name"), 0);
+    key = rb_funcall(robject, rb_intern("key"), 0);
+    req->set_bucket((void*)RSTRING_PTR(bucket), (size_t)RSTRING_LEN(bucket));
+    req->set_key((void*)RSTRING_PTR(key), (size_t)RSTRING_LEN(key));
+    if(!NIL_P(w))
+      req->set_w(QuorumValue(w));
+    if(!NIL_P(dw))
+      req->set_dw(QuorumValue(dw));
+    if(RTEST(returnbody))
+      req->set_return_body(1);
+
+    // Set vclock if present. For now Base64-decode it, until we do
+    // it in the client backend.
+    vclock = rb_funcall(robject, rb_intern("vclock"), 0);
+    vclock = rb_funcall(rb_const_get(rb_cObject, rb_intern("Base64")), rb_intern("decode64"), 1, vclock);
+    if(!NIL_P(vclock))
+      req->set_vclock((void*)RSTRING_PTR(vclock), (size_t)RSTRING_LEN(vclock));
+
+    // Set the data
+    data = rb_funcall(robject, rb_intern("raw_data"), 0);
+    content->set_value((void*)RSTRING_PTR(data), (size_t)RSTRING_LEN(data));
+
+    // Set content type if present
+    ctype = rb_funcall(robject, rb_intern("content_type"), 0);
+    if(!NIL_P(ctype))
+      content->set_content_type((void*)RSTRING_PTR(ctype), (size_t)RSTRING_LEN(ctype));
+
+    // Set user meta
+    meta = rb_funcall(robject, rb_intern("meta"), 0);
+    if(!NIL_P(meta)){
+      keys = rb_funcall(meta, rb_intern("keys"), 0);
+      while(!NIL_P(metak = rb_ary_shift(keys))){
+        pair = content->add_usermeta();
+        metav = rb_funcall(rb_hash_aref(meta, metak), rb_intern("to_s"), 0);
+        pair->set_key((void*)RSTRING_PTR(metak), (size_t)RSTRING_LEN(metak));
+        pair->set_value((void*)RSTRING_PTR(metav), (size_t)RSTRING_LEN(metav));
+      }
+    }
+
+    // Set links
+    links = rb_funcall(rb_funcall(robject, rb_intern("links"), 0), rb_intern("to_a"), 0);
+    if(!NIL_P(links) && !RTEST(rb_funcall(links, rb_intern("empty?"), 0))){
+      while(!NIL_P(link = rb_ary_shift(links))){
+        if(NIL_P(rb_funcall(link, rb_intern("key"), 0)))
+          continue;
+        pblink = content->add_links();
+        bucket = rb_funcall(link, rb_intern("bucket"), 0);
+        key = rb_funcall(link, rb_intern("key"), 0);
+        tag = rb_funcall(link, rb_intern("tag"), 0);
+        pblink->set_bucket((void*)RSTRING_PTR(bucket), (size_t)RSTRING_LEN(bucket));
+        pblink->set_key((void*)RSTRING_PTR(key), (size_t)RSTRING_LEN(key));
+        pblink->set_tag((void*)RSTRING_PTR(tag), (size_t)RSTRING_LEN(tag));
+      }
+    }
+
+    WriteProtobuff(SOCKET, PutReq, req);
+    return rpb_decode_response(self);
+  }
+
   VALUE rpb_delete(int argc, VALUE *argv, VALUE self){
     RpbDelReq req = RpbDelReq();
     VALUE bucket, key, rw;
     rb_alias(mProtobufs, rb_intern("client_id="), rb_intern("set_client_id"));
     rb_define_method(mProtobufs, "get_server_info", RUBY_METHOD_FUNC(rpb_get_server_info), 0);
     rb_define_method(mProtobufs, "get", RUBY_METHOD_FUNC(rpb_get), -1);
+    rb_define_method(mProtobufs, "put", RUBY_METHOD_FUNC(rpb_put), -1);
     rb_define_method(mProtobufs, "delete", RUBY_METHOD_FUNC(rpb_delete), -1);
     rb_define_method(mProtobufs, "list_buckets", RUBY_METHOD_FUNC(rpb_list_buckets), 0);
     rb_define_method(mProtobufs, "list_keys", RUBY_METHOD_FUNC(rpb_list_keys), 1);

riak-client/ext/mri/riakpb-ruby.h

   VALUE rpb_set_client_id(VALUE, VALUE);
   VALUE rpb_get_server_info(VALUE);
   VALUE rpb_get(int, VALUE*, VALUE);
+  VALUE rpb_put(int, VALUE*, VALUE);
   VALUE rpb_delete(int, VALUE*, VALUE);
   VALUE rpb_list_buckets(VALUE);
   VALUE rpb_list_keys(VALUE, VALUE);
 VALUE rpb_decode_get_client_id(VALUE);
 VALUE rpb_decode_get_server_info(VALUE);
 VALUE rpb_decode_get(VALUE);
+VALUE rpb_decode_put(VALUE);
 VALUE rpb_decode_list_buckets(VALUE);
 VALUE rpb_decode_list_keys(VALUE, VALUE);
 VALUE rpb_decode_get_bucket(VALUE);
+VALUE rpb_decode_content(RpbContent *);
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.