Commits

Sean Cribbs committed db592f6

Add MapReduce to PBC.

  • Participants
  • Parent commits 754d797

Comments (0)

Files changed (3)

File riak-client/ext/mri/riakpb-ruby-decode.cc

   extern VALUE cTCPSocket;
   extern VALUE ivar_socket;
   extern VALUE eFailedRequest;
+  extern VALUE mJSON;
 }
 
 VALUE rpb_decode_response(VALUE self){
     case GetBucketResp:
       return rpb_decode_get_bucket(str);
       break;
+    case MapRedResp:
+      return rpb_decode_mapred(str, socket);
+      break;
     default:
       break;
     }
   return list;
 }
 
-
 VALUE rpb_decode_get_bucket(VALUE string){
   RpbGetBucketResp res;
   VALUE hash;
   return hash;
 }
 
+VALUE rpb_decode_mapred(VALUE string, VALUE socket){
+  RpbMapRedResp res = RpbMapRedResp();
+  bool done = 0;
+  VALUE list = rb_ary_new(), phase, prolog, phase_i, data;
+  int i;
+  uint32_t msglen;
+  uint8_t msgcode;
+  while(!done){
+    DecodeProtobuff(res, string);
+    done = res.has_done() && res.done();
+    phase_i = res.has_phase() ? UINT2NUM(res.phase()) : Qnil;
+    data = res.has_response() ? rb_funcall(mJSON, rb_intern("decode"), 1, rb_str_new2(res.response().c_str())) : Qnil;
+    if(!done) {
+      if(rb_block_given_p()){
+        // yield phase counter and phase data
+        if(!NIL_P(data))
+          rb_yield_values(2, phase_i, data);
+      } else {
+        // add to a specific phase accumulator
+        phase = (NIL_P(phase_i)) ? list : rb_ary_entry(list, phase_i);
+        if(NIL_P(phase)){
+          phase = rb_ary_new();
+          rb_ary_store(list, NUM2LONG(phase_i), phase);
+        }
+        if(TYPE(data) == T_ARRAY)
+          rb_ary_concat(phase, data);
+        else // Based on riak_kv_pb_socket, probably won't happen
+          rb_ary_push(phase, data);
+      }
+      res.Clear(); // reuse the pbuf
+      prolog = ReadSocket(socket, 5);
+      msglen = ntohl(((uint32_t*)RSTRING_PTR(prolog))[0]) - 1;
+      msgcode = (uint8_t)(RSTRING_PTR(prolog)[4]);
+      string = ReadSocket(socket, msglen);
+      if(msgcode == ErrorResp)
+        rpb_decode_error(string);
+      else if(msgcode != MapRedResp) // TODO: throw an exception, don't exit
+        rb_fatal("Unexpected response code from mapred operation: %d", msgcode);
+    }
+  }
+  return list;
+}
+
 VALUE rpb_decode_content(RpbContent *c){
   VALUE contents = rb_hash_new(), links, link, usermeta;
   int j,k;

File riak-client/ext/mri/riakpb-ruby.cc

 // We're using google's protobufs, so we have to compile in C++. MRI
 // is in C, so we need C calling conventions in our extension
 // functions for it all to hook up.
-
-
 extern "C" {
 #include "riakpb-ruby.h"
   VALUE cTCPSocket;
   VALUE ivar_socket;
   VALUE eFailedRequest;
+  VALUE mJSON;
 
   VALUE rpb_ping(VALUE self){
     WriteProtobuff(SOCKET, PingReq, NULL);
     return rpb_decode_response(self);
   }
 
+  VALUE rpb_mapred(VALUE self, VALUE json){
+    RpbMapRedReq req = RpbMapRedReq();
+    VALUE ctype = rb_str_new2("application/json");
+    req.set_content_type((void*)RSTRING_PTR(ctype), (size_t)RSTRING_LEN(ctype));
+    req.set_request((void*)RSTRING_PTR(json), (size_t)RSTRING_LEN(json));
+    WriteProtobuff(SOCKET, MapRedReq, &req);
+    return rpb_decode_response(self);
+  }
+
   // TODO: This might should be done in Ruby, not getting any benefit from C.
   VALUE rpb_init(VALUE self, VALUE client){
     VALUE socket, host, port;
     eFailedRequest = rb_const_get(cRiakClient, rb_intern("FailedRequest"));
     cRiakClient = rb_define_class_under(cRiakClient, "Client", rb_cObject);
     mProtobufs = rb_define_module_under(cRiakClient, "Protobufs");
+    mJSON = rb_const_get(rb_const_get(rb_cObject, rb_intern("ActiveSupport")), rb_intern("JSON"));
+
     rb_define_method(mProtobufs, "initialize", RUBY_METHOD_FUNC(rpb_init), 1);
     rb_define_method(mProtobufs, "ping", RUBY_METHOD_FUNC(rpb_ping), 0);
     rb_define_method(mProtobufs, "get_client_id", RUBY_METHOD_FUNC(rpb_get_client_id), 0);
     rb_define_method(mProtobufs, "list_keys", RUBY_METHOD_FUNC(rpb_list_keys), 1);
     rb_define_method(mProtobufs, "get_bucket", RUBY_METHOD_FUNC(rpb_get_bucket), 1);
     rb_define_method(mProtobufs, "set_bucket", RUBY_METHOD_FUNC(rpb_set_bucket), 2);
+    rb_define_method(mProtobufs, "mapred", RUBY_METHOD_FUNC(rpb_mapred), 1);
   }
 }

File riak-client/ext/mri/riakpb-ruby.h

 
 enum RpbMessageCodes {
-  ErrorResp,            //
-  PingReq,              //
-  PingResp,             //
-  GetClientIdReq,       //
-  GetClientIdResp,      //
-  SetClientIdReq,       //
-  SetClientIdResp,      //
-  GetServerInfoReq,     //
-  GetServerInfoResp,    //
+  ErrorResp,
+  PingReq,
+  PingResp,
+  GetClientIdReq,
+  GetClientIdResp,
+  SetClientIdReq,
+  SetClientIdResp,
+  GetServerInfoReq,
+  GetServerInfoResp,
   GetReq,
   GetResp,
   PutReq,
   PutResp,
-  DelReq,               //
-  DelResp,              //
-  ListBucketsReq,       //
-  ListBucketsResp,      //
-  ListKeysReq,          //
-  ListKeysResp,         //
-  GetBucketReq,         //
-  GetBucketResp,        //
-  SetBucketReq,         //
-  SetBucketResp,        //
+  DelReq,
+  DelResp,
+  ListBucketsReq,
+  ListBucketsResp,
+  ListKeysReq,
+  ListKeysResp,
+  GetBucketReq,
+  GetBucketResp,
+  SetBucketReq,
+  SetBucketResp,
   MapRedReq,
   MapRedResp
 };
   VALUE rpb_list_keys(VALUE, VALUE);
   VALUE rpb_get_bucket(VALUE, VALUE);
   VALUE rpb_set_bucket(VALUE, VALUE, VALUE);
+  VALUE rpb_mapred(VALUE, VALUE);
   VALUE rpb_init(VALUE, VALUE);
 }
 
 VALUE rpb_decode_list_buckets(VALUE);
 VALUE rpb_decode_list_keys(VALUE, VALUE);
 VALUE rpb_decode_get_bucket(VALUE);
+VALUE rpb_decode_mapred(VALUE, VALUE);
 VALUE rpb_decode_content(RpbContent *);