Commits

Kaspar Schiess committed 85748f4

Sample for both normal and loopback device

  • Participants
  • Parent commits cbfa1bc

Comments (0)

Files changed (5)

   * Adds SURVEYOR/RESPONDENT socket types. (SurveySocket, RespondSocket)
   * Adds PUSH/PULL socket types. (PushSocket, PullSocket)
   * Adds BUS socket type. (BusSocket)
+  * There's an experimental API for devices (.run_loopback, .run_device). 
+    Devices block a Ruby thread until .terminate is called.
   * Exceptions raised are now subclasses of NanoMsg::Errno
   * Added NanoMsg.terminate, calls nn_term. 
 

File examples/bus_device.rb

 
 a = 'tcp://127.0.0.1:5432'
 b = 'tcp://127.0.0.1:5433'
+c = 'tcp://127.0.0.1:5434'
 
-one, two, three = BusSocket.new, BusSocket.new, 
-  BusSocket.new(AF_SP_RAW)
+one, two, three = 3.times.map { BusSocket.new }
+four, five, six = 3.times.map { BusSocket.new(AF_SP_RAW) }
 
 one.bind a
 two.bind b
+three.bind c
 
-three.connect a
-three.connect b
+# one -> four -> four -> two
+four.connect a
+four.connect b
 Thread.start do
-  Device.new(a, b)
+  NanoMsg.run_loopback(four)
 end.abort_on_exception = true
 
-sleep 0.1
+# one -> five -> six -> three
+five.connect a
+six.connect c
+Thread.start do
+  NanoMsg.run_device(five, six)
+end.abort_on_exception = true
+
+sleep 0.2
 one.send 'A'
+
 # Gets sent to three, which forwards it to two. 
-puts two.recv
+puts two.recv
+
+# Gets sent to five, which forwards it through six to three.
+puts three.recv
+
+NanoMsg.terminate
   return Qnil; 
 }
 
+struct device_op {
+  int sa, sb; 
+  int err; 
+};
+
+static VALUE
+nanomsg_run_device_no_gvl(void* data)
+{
+  struct device_op *pop = (struct device_op*) data;
+
+  pop->err = nn_device(pop->sa, pop->sb);
+
+  return Qnil; 
+}
+
+static VALUE
+nanomsg_run_device(VALUE self, VALUE a, VALUE b)
+{
+  struct device_op dop; 
+
+  dop.sa = sock_get(a);
+  dop.sb = sock_get(b);
+
+  rb_thread_blocking_region(nanomsg_run_device_no_gvl, &dop, NULL, NULL);
+  if (dop.err < 0)
+    RAISE_SOCK_ERROR;
+
+  return Qnil; 
+}
+
+static VALUE
+nanomsg_run_loopback(VALUE self, VALUE a)
+{
+  struct device_op dop; 
+
+  dop.sa = sock_get(a);
+  dop.sb = -1;          // invalid socket, see documentation
+
+  rb_thread_blocking_region(nanomsg_run_device_no_gvl, &dop, NULL, NULL);
+  if (dop.err < 0)
+    RAISE_SOCK_ERROR;
+
+  return Qnil; 
+}
+
 void
 Init_nanomsg(void)
 {
   ceSocketError = rb_define_class_under(mNanoMsg, "SocketError", rb_eIOError);
 
   rb_define_singleton_method(mNanoMsg, "terminate", nanomsg_terminate, 0);
+  rb_define_singleton_method(mNanoMsg, "run_device", nanomsg_run_device, 2);
+  rb_define_singleton_method(mNanoMsg, "run_loopback", nanomsg_run_loopback, 1);
 
   rb_define_alloc_func(cSocket, sock_alloc);
   rb_define_method(cSocket, "bind", sock_bind, 1);

File spec/lib/nanomsg/device_spec.rb

 require 'spec_helper'
 
+require 'timeout'
+
 describe 'Devices' do
   describe "connecting two BUSes" do
-    let(:a) { NanoMsg::BusSocket.new(NanoMsg::AF_SP_RAW).bind('inproc://a') }
-    let(:b) { NanoMsg::BusSocket.new(NanoMsg::AF_SP_RAW).bind('inproc://b') }
-    let(:c) { NanoMsg::BusSocket.new.connect('inproc://b') }
+    let(:a) { NanoMsg::BusSocket.new(NanoMsg::AF_SP_RAW) }
+    let(:b) { NanoMsg::BusSocket.new(NanoMsg::AF_SP_RAW) }
+    let(:c) { NanoMsg::BusSocket.new }
+
+    before(:each) do
+      a
+      b.bind('inproc://b')
+      c.connect('inproc://b')
+    end
 
     after(:each) do
       [a, b, c].each(&:close)
         NanoMsg.run_device(a, b)
       end.abort_on_exception = true
     }
-    after(:each) do
-      thread.kill
-    end
-
+    
     it "forwards messages" do
+      sleep 0.01
       c.send 'test'
       timeout(1) do
         a.recv.should == 'test'

File spec/spec_helper.rb

 
 
 $:.unshift File.dirname(__FILE__) + "/../ext"
-require 'nanomsg'
+require 'nanomsg'
+
+RSpec.configure do |rspec|
+  rspec.after(:all) {
+    NanoMsg.terminate
+  }
+end