Commits

Kaspar Schiess committed e538642

Improves #recv blocking, Adds setsockopt

Comments (0)

Files changed (2)

examples/aborting.rb

+$:.unshift File.dirname(__FILE__) + "/../lib"
+require 'nanomsg'
+include NanoMsg
+
+class AbortBehaviourChecker
+  attr_reader :t
+  attr_reader :q, :p
+
+  def initialize
+    @q, @p = PairSocket.new, PairSocket.new
+    @adr = 'tcp://127.0.0.1:4000'
+
+    @q.bind(@adr)
+    @p.connect(@adr)
+
+    @q.setsockopt(NanoMsg::NN_SOL_SOCKET, NanoMsg::NN_RCVTIMEO, 5000)
+  end
+
+  def run_recv_thread
+    @t = Thread.start { 
+      begin 
+        @q.recv 
+      rescue NanoMsg::Errno::EAGAIN
+        puts "EAGAIN"
+        # Gets raised when the read operation times out after 5 seconds
+      end
+    }
+    @t.abort_on_exception = false
+  end
+
+  def kill
+    @t.kill
+  end
+  def join
+    s = Time.now
+    @t.join
+    Time.now - s
+  end
+
+  def send msg
+    @p.send msg
+  end
+  def recv 
+    @q.recv
+  end
+end 
+
+
+# ------------------------------------------------- NN_RCVTIMEO behaviour test
+
+abc = AbortBehaviourChecker.new
+s = Time.now
+
+abc.run_recv_thread
+
+# During these first 10 seconds, CPU usage needs to stay low, since we're 
+# just waiting for a message to come in. 
+loop do
+  p abc.t
+  sleep 1
+  break if Time.now - s > 7
+end
+
+# -------------------------------------------------------- kill behaviour test
+
+# Resets the EAGAIN behaviour
+abc.send 'test'
+p abc.recv
+
+abc.run_recv_thread
+abc.kill
+
+puts "Joining..."
+t = abc.join.round
+puts "Join took #{t} seconds."
   return psock->socket;
 }
 
+static int
+nn_get_rcv_fd(int sock)
+{
+  int fd; 
+  size_t size = sizeof(fd);
+  int err; 
+
+  err = nn_getsockopt(sock, NN_SOL_SOCKET, NN_RCVFD, &fd, &size);
+  if (err < 0)
+    return err; 
+
+  return fd; 
+}
+
 static VALUE
 sock_alloc(VALUE klass) 
 {
   char* buffer; 
   long len; 
   int abort; 
+  int fd; 
 };
 
 static VALUE
 }
 
 static VALUE
-sock_recv_no_gvl(void* data)
+sock_recv_blocking(void* data)
 {
   struct ioop *pio = data; 
 
-  // TODO This is buggy. I cannot make the difference between 
-  // 'socket gone away' (=EAGAIN) and 'socket busy' (=EAGAIN). So I err on the
-  // side of 'socket busy' and do not raise the error. As a consequence, we'll
-  // get stuck in an endless loop when the socket is just not answering. 
+  pio->return_code = nn_recv(pio->sock, &pio->buffer, NN_MSG, 0 /* flags */);
 
-  while (pio->nn_errno == EAGAIN && !pio->abort) {
-    pio->return_code = nn_recv(pio->sock, &pio->buffer, NN_MSG, NN_DONTWAIT /* flags */);
-
-    if (pio->return_code < 0)
-      pio->nn_errno = nn_errno(); 
-    else
-      break;
-  }
+  if (pio->return_code < 0)
+    pio->nn_errno = nn_errno(); 
 
   return Qnil; 
 }
 
-static void
-sock_recv_abort(void* data) 
-{
-  struct ioop *pio = data; 
-
-  pio->abort = Qtrue; 
-}
-
 static VALUE
 sock_recv(VALUE socket)
 {
   io.abort  = Qfalse; 
   io.nn_errno = EAGAIN;
 
-  rb_thread_blocking_region(sock_recv_no_gvl, &io, sock_recv_abort, &io); 
+  io.fd = nn_get_rcv_fd(io.sock);
+  if (io.fd < 0)
+    RAISE_SOCK_ERROR;
 
-  if (io.abort) 
-    return Qnil; 
+  rb_thread_blocking_region(sock_recv_blocking, &io, RUBY_UBF_IO, 0);
 
   if (io.return_code < 0)
     sock_raise_error(io.nn_errno);
   return Qnil;
 }
 
+static VALUE
+sock_setsockopt(VALUE self, VALUE level, VALUE option, VALUE val)
+{
+  int sock = sock_get(self);
+  int level_arg = FIX2INT(level); 
+  int option_arg = FIX2INT(option);
+  int err; 
+  int i; 
+  void* v; 
+  size_t vlen = 0;
+
+  switch (TYPE(val)) {
+    case T_FIXNUM:
+      i = FIX2INT(val);
+      goto numval;
+    case T_FALSE:
+      i = 0;
+      goto numval;
+    case T_TRUE:
+      i = 1;
+    numval: 
+      v = (void*)&i; vlen = sizeof(i);
+      break;
+    
+    default:
+      StringValue(val);
+      v = RSTRING_PTR(val);
+      vlen = RSTRING_LEN(val);
+      break;
+  }
+
+  err = nn_setsockopt(sock, level_arg, option_arg, v, vlen);
+  if (err < 0)
+    RAISE_SOCK_ERROR; 
+
+  return self; 
+}
+
 #define SOCK_INIT_FUNC(name, type) \
 static VALUE \
 name(int argc, VALUE *argv, VALUE self) \
   rb_define_method(cSocket, "send", sock_send, 1);
   rb_define_method(cSocket, "recv", sock_recv, 0);
   rb_define_method(cSocket, "close", sock_close, 0);
+  rb_define_method(cSocket, "setsockopt", sock_setsockopt, 3);
 
   rb_define_method(cPairSocket, "initialize", pair_sock_init, -1);