Commits

Craig Swank  committed 7e4006a

made request socket respond to status and events

  • Participants
  • Parent commits a2fe497

Comments (0)

Files changed (20)

File gadgets/coordinator.py

     """
 
     def __init__(self, addresses, name):
-        addresses.bind_to_request = True
         self._addresses = addresses
         self._name = name
         self._ids = []
     @property
     def sockets(self):
         if self._sockets is None:
-            self._sockets = Sockets(self._addresses, events=self._events)
+            self._sockets = Sockets(self._addresses, events=self._events, bind_to_request=True)
         return self._sockets
 
     def _recv(self):
         event, message, socket = self.sockets.recv_all()
-        print socket
+        return_value = False
         if socket == 'subscriber':
-            return_value = False
             if event in self._event_handlers:
                 f = self._event_handlers[event]
                 return_value = f(message)
-            return return_value
         elif socket == 'request': #as of now, request is only for getting the status of the system
-            self.sockets.respond('status', self._status)
+            self._handle_request(event, message)
+            
+        return return_value
+
+    def _handle_request(self, event, message):
+        if event == 'status':
+            response = self._state
+        elif event == 'events':
+            response = self._external_events
+        else:
+            response = {'error': 'you can only request "events", or "status"'}
+        self.sockets.respond(event, response)
 
     def _run_method(self, message):
         """

File gadgets/sockets.py

     as one system.
     """
 
-    def __init__(self, host='localhost', in_port=6111, out_port=6112, req_port=6113, bind_to_request=False):
+    def __init__(self, host='localhost', in_port=6111, out_port=6112, req_port=6113):
         """
         host: the host name of the master gadgets instance
               (the one that is running with localhost as the
         out_port: the port number that Socket uses for its
                   publisher socket
         """
-        self.bind_to_request = bind_to_request
         self.out_address = 'tcp://{0}:{1}'.format(host, out_port)
         self.in_bind_address = 'tcp://*:{0}'.format(out_port)
         self.in_address = 'tcp://{0}:{1}'.format(host, in_port)
     what events will be received.
     """
     
-    def __init__(self, addresses=None, events=[]):
+    def __init__(self, addresses=None, events=[], bind_to_request=False):
         """
         addresses: an instance of gadgets.address:Address
         events: a list of events to subscribe to.  Only 
                       you will receive messages like
                       'update temperature'
         """
+        self.bind_to_request = bind_to_request
         self._poller = None
         self.context = zmq.Context.instance()
         if addresses is None:
             self.subscriber.setsockopt(zmq.SUBSCRIBE, event)
         self.publisher = self.context.socket(zmq.PUB)
         self.publisher.connect(addresses.out_address)
-        if addresses.bind_to_request:
-            self.request = self.context.socket(zmq.REQ)
-            self.request.bind(addresses.req_bind_address)
+        
+        if bind_to_request:
+            self.req = self.context.socket(zmq.REP)
+            self.req.bind(addresses.req_bind_address)
         else:
-            self.request = self.context.socket(zmq.REQ)
-            self.request.connect(addresses.req_address)
+            self.req = self.context.socket(zmq.REQ)
+            self.req.connect(addresses.req_address)
         time.sleep(0.2)
         
     def send(self, event, message={}):
         return event, json.loads(message)
     
     def request(self, event, message={}):
-        self.request.send_multipart([event, json.dumps(message, ensure_ascii=True)])
-        parts = self.request.recv_multipart()
+        self.req.send_multipart([event, json.dumps(message, ensure_ascii=True)])
+        parts = self.req.recv_multipart()
         if len(parts) != 2:
             raise Exception(str(parts))
         event, message = parts
         return event, json.loads(message)
 
     def respond(self, event, message={}):
-        self.request.send_multipart([event, json.dumps(message, ensure_ascii=True)])
+        self.req.send_multipart([event, json.dumps(message, ensure_ascii=True)])
 
     def recv_all(self):
         socks = dict(self.poller.poll())
         if self.subscriber in socks and socks[self.subscriber] == zmq.POLLIN:
             event, message = self.recv()
             return event, message, 'subscriber'
-        if self.request in socks and socks[self.request] == zmq.POLLIN:
-            parts = self.request.recv_multipart()
+        if self.req in socks and socks[self.req] == zmq.POLLIN:
+            parts = self.req.recv_multipart()
             if len(parts) != 2:
                 raise Exception(str(parts))
             event, message = parts
             return event, message, 'request'
+        return None, None, None
 
     @property
     def poller(self):
         if self._poller is None:
             poller = zmq.Poller()
-            poller.register(self.publisher, zmq.POLLIN)
-            poller.register(self.request, zmq.POLLIN)
+            poller.register(self.subscriber, zmq.POLLIN)
+            poller.register(self.req, zmq.POLLIN)
             self._poller = poller
         return self._poller
         
     def close(self):
         self.publisher.close()
         self.subscriber.close()
-        self.request.close()
+        self.req.close()
 

File gadgets/tests/test_cooler.py

 
 
 path = '/tmp/cooler'
-port = random.randint(3000, 50000)
+
+port = 0
 
 
 class FakeGPIO(object):
 
     def __init__(self):
-        self.addresses = Addresses(in_port=port, out_port=port+1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses)
         self.status = False
 
 class TestCooler(object):
 
     def setup(self):
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        global port
+        port = random.randint(3000, 50000)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['test update'])
         self.cooler = cooler_factory('tank', 'cooler', {}, self.addresses, io_factory=get_fake_gpio)
         self.gadgets = Gadgets([self.cooler], self.addresses)
 
     def teardown(self):
+        print 'sending shutdown'
         self.sockets.send('shutdown')
         time.sleep(0.2)
         self.sockets.close()

File gadgets/tests/test_coordinator.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.coordinator = Coordinator(self.addresses, 'testsys')
 
     def test_create(self):
         time.sleep(0.2)
         eq_(self.coordinator._ids, ['garage opener'])
         self.coordinator.sockets.send('shutdown')
+
+    def test_request_socket(self):
+        sockets = Sockets(self.addresses)
+        broker = Broker(self.addresses)
+        broker.start()
+        time.sleep(0.5)
+        self.coordinator.start()
+        time.sleep(1)
         
+        status = sockets.request('status')
+        eq_(status, ('status', {u'locations': {}, u'errors': [], u'method': {}, u'name': u'testsys'}))
+        sockets.send('shutdown')
+
+        while self.coordinator.is_alive():
+            time.sleep(0.1)
+        sockets.close()
+
+    def test_request_socket_events(self):
+        sockets = Sockets(self.addresses)
+        broker = Broker(self.addresses)
+        broker.start()
+        time.sleep(0.5)
+        self.coordinator.start()
+        time.sleep(1)
+        
+        status = sockets.request('events')
+        eq_(status, ('events', {}))
+        sockets.send('shutdown')
+
+        while self.coordinator.is_alive():
+            time.sleep(0.1)
+        sockets.close()
     

File gadgets/tests/test_device.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['update'])
         self.device = Device(
             'living room',

File gadgets/tests/test_float_trigger.py

 import time
+import random
 from gadgets.devices.valve.triggers import FloatTrigger
 from gadgets import Addresses
 
 class TestFloatTrigger(object):
 
     def setup(self):
-        self.addresses = Addresses()
+        port = random.randint(5000, 50000)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         FloatTrigger._poller_class = FakePoller
         self.trigger = FloatTrigger(
             'tank',

File gadgets/tests/test_gadgets.py

-import time, threading, random, uuid
+import time, threading, random, uuid, platform
 from nose.tools import eq_, raises
 from gadgets import Gadgets, Addresses, Sockets, Broker
 from gadgets.devices.device import Device
 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.gadgets = Gadgets([], self.addresses)
 
     def test_create(self):
         event, message = sockets.recv()
         sockets.send('shutdown')
         eq_(event, uid + ' status')
-        eq_(message, {u'name': u'pjoe', u'errors': [], u'locations': {u'back yard': {u'sprinklers': {u'value': True}}}, u'method': {}})
+        name = platform.node()
+        eq_(message, {u'name': name, u'errors': [], u'locations': {u'back yard': {u'sprinklers': {u'value': True}}}, u'method': {}})
 

File gadgets/tests/test_gadgets_factory.py

-import threading, time
+import threading, time, random
 from gadgets import Addresses, GadgetsFactory, Sockets
 from gadgets.devices import Switch, Valve
 from gadgets.pins.beaglebone import pins
 class TestGadgetsFactory(object):
 
     def setup(self):
-        self.addresses = Addresses()
+        port = random.randint(5000, 50000)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.factory = GadgetsFactory(self.addresses)
 
     def test_create(self):
         assert isinstance(devices[0], Switch)
 
     def test_create_float_valve(self):
+        sockets = Sockets(self.addresses)
         args = {
             'locations': {
                 'fish tank': {
         t.start()
         time.sleep(1)
         eq_(dict(gadgets.coordinator._external_events), {u'fish tank': {u'valve': {'on': u'fill fish tank', 'off': u'stop filling fish tank'}}})
-        sockets = Sockets()
         sockets.send('shutdown')
         assert isinstance(devices[0], Valve)
+        while gadgets.coordinator.is_alive():
+            time.sleep(0.1)
         sockets.close()

File gadgets/tests/test_gravity_trigger.py

     def setup(self):
         self._off = False
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses)
         self.gadgets = Gadgets([], self.addresses)
         

File gadgets/tests/test_heater.py

 class FakeGPIO(object):
 
     def __init__(self):
-        self.addresses = Addresses(in_port=port, out_port=port+1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses)
         self.status = False
         self.closed = False
 class TestHeater(object):
 
     def setup(self):
-        self.addresses = Addresses(in_port=port, out_port=port+1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['test pwm off'])
         
         self.heater = ElectricHeater(

File gadgets/tests/test_input.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         pin = None
         self.sockets = Sockets(self.addresses, events=['update'])
         self.input = input_factory('left', 'input', {'pin':None}, self.addresses)

File gadgets/tests/test_input_adc.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         pin = None
         self.sockets = Sockets(self.addresses, events=['update'])
         self.input = input_factory('left', 'input', {'pin':None, 'input_type': 'adc'}, self.addresses)

File gadgets/tests/test_method.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['turn', 'heat', 'drain'])
         self.gadgets = Gadgets([], self.addresses)
 

File gadgets/tests/test_shift_register_server.py

 class FakeSPI(object):
 
     def __init__(self, *args, **kw):
-        addresses = Addresses(in_port=port, out_port=port + 1)
-        self.sockets = Sockets(addresses)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
+        self.sockets = Sockets(self.addresses)
 
     def writebytes(self, value):
         self.sockets.send('fake spi', value)
         self.port = port
         self.channel = 2
         ShiftRegisterServer._SPI_Class = FakeSPI
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['fake spi'])
         self.server = ShiftRegisterServer(
             'null',

File gadgets/tests/test_shift_register_switch.py

 class FakeSPI(object):
 
     def __init__(self, *args, **kw):
-        addresses = Addresses(in_port=port, out_port=port + 1)
-        self.sockets = Sockets(addresses)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
+        self.sockets = Sockets(self.addresses)
 
     def writebytes(self, value):
         self.sockets.send('fake spi', value)
         self.port = port
         self.channel = 2
         ShiftRegisterServer._SPI_Class = FakeSPI
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['fake spi'])
 
     def make_gadgets(self):

File gadgets/tests/test_sockets.py

 class TestSockets(object):
 
     def test_broker(self):
-        p = random.randint(3000, 50000)
-        addresses = Addresses(in_port=p, out_port=p+1)
+        port = random.randint(3000, 50000)
+        addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         broker = Broker(addresses)
         sockets = Sockets(addresses)
         broker.start()

File gadgets/tests/test_switch.py

 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = Sockets(self.addresses, events=['update'])
         self.switch = Switch(
             'living room',

File gadgets/tests/test_temperature_trigger.py

 import time
+import random
 from nose.tools import eq_
 from gadgets.devices.heater.triggers.temperature import TemperatureTrigger
 from gadgets import Addresses, Sockets, Broker
 class TestTemperatureTrigger(object):
 
     def setup(self):
-        self.addresses = Addresses()
+        port = random.randint(5000, 50000)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         comparitor = lambda x, y: x >= y
         self.trigger = TemperatureTrigger(
             'tank',

File gadgets/tests/test_thermometer.py

-import time, threading, random, uuid, tempfile, os
+import time, threading, random, uuid, tempfile, os, platform
 from nose.tools import eq_
 from gadgets import Addresses, get_gadgets, Sockets
 from gadgets.sensors import Thermometer
 
     def setup(self):
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.sockets = None
         self.thermometer = Thermometer('living room', 'temperature', self.addresses, uid='x')
 
         self.sockets.send('status', {'id': 'test thermometer'})
         event, message = self.sockets.recv()
         eq_(event, 'test thermometer status')
+        name = platform.node()
         expected = {
+            u'name': name,
             u'errors': [],
-            u'name': u'pjoe',
+            u'name': name,
             u'sender': 'living room temperature',
             u'locations': {
                 u'living room': {

File gadgets/tests/test_valve.py

     def setup(self):
         self._off = False
         port = random.randint(5000, 50000)
-        self.addresses = Addresses(in_port=port, out_port=port + 1)
+        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
         self.uid = str(uuid.uuid1())
         self.sockets = Sockets(self.addresses, events=[self.uid])