Commits

Craig Swank committed a85ee22

added shift register switch tests

Comments (0)

Files changed (11)

gadgets/__init__.py

 
 from gadgets.devices.cooler.cooler_factory import cooler_factory
 from gadgets.devices.heater.electric_heater_factory import electric_heater_factory
-from gadgets.devices.switch.switch_factory import switch_factory, shift_register_switch_factory
+from gadgets.devices.switch import switch_factory, shift_register_switch_factory
 from gadgets.devices.motor.motor_factory import motor_factory
 from gadgets.devices.valve.valve_factory import ValveFactory
 

gadgets/devices/switch/__init__.py

-
-    
+from gadgets.devices.switch.shift_register_switch import shift_register_switch_factory
+from gadgets.devices.switch.switch_factory import switch_factory

gadgets/devices/switch/shift_register_server.py

-from gadgets import Gadget
-from gadgets.io import GPIO
-try:
-    from spi import SPI
-except ImportError:
-    print 'no spi'
-    
-
-class ShiftRegisterServer(Gadget):
-
-    def __init__(self, location, name, addresses, pins):
-        self._pins
-        self._serial = None
-        self._state = 0
-        self._spi = None
-        self._operators = {
-            True: operator.or_
-            False: operator.and_
-            }
-        super(ShiftRegisterServer, self).__init__(location, name, addresses)
-
-    @property
-    def events(self):
-        return ['shift register server']
-
-    def event_received(self, event, message):
-        operator = self._operators[message['value']]
-        self._state = operator(self._state, 1 << message['pin'])
-        self.spi.writebytes([self._state])
-        
-    @property
-    def spi(self):
-        if self._spi is None:
-            self._spi = SPI(2, 0)

gadgets/devices/switch/shift_register_switch.py

-from gadgets.devices.switch.triggers.gpio_timer import GPIOTimer
-from gadgets.devices.device import Device
-
-
-class ShiftRegisterIO(object):
-
-    def __init__(self, addresses, pin):
-        self._sockets = Sockets(addresses)
-        self._pin = pin
-
-    def on(self):
-        self._send(True)
-
-    def off(self):
-        self._send(False)
-
-    def _send(self, value):
-        self._sockets.send('shift register server', {'value': value, 'pin': self._pin})
-
-class ShiftRegisterIOFactory(object):
-    
-    def __init__(self, addresses, pin):
-        self._addresses = addresses
-        self._pin = pin
-    
-    def __call__(self):
-        return ShiftRegisterIO(self._addresses, self._pin)
-
-        
-class ShiftRegisterSwitch(Device):
-    """
-    Controls multiple TPIC6595 8 bit shift registers.  The use of
-    this chip replaces the use of the gpio pins on the board to
-    control physical devices.  It can power relatively high-voltage
-    and high-current devices such as solenoids without having to
-    use additional transistors or mosfets.
-    
-    """
-
-    _trigger_factory = GPIOTimer
-
-    _units = ['minutes', 'minute', 'seconds', 'second', 'hours', 'hour']
-
-    def _get_trigger(self, message):
-        return self._trigger_factory(self._location, self._on_event, self._off_event, message, self._addresses, target=self.off)
-
-        

gadgets/devices/switch/shift_register_switch/__init__.py

+from gadgets.devices.switch.shift_register_switch.shift_register_switch import ShiftRegisterSwitch, ShiftRegisterIOFactory
+from gadgets.devices.switch.shift_register_switch.shift_register_server import ShiftRegisterServer
+
+
+def shift_register_switch_factory(location, name, arguments, addresses, io_factory=None):
+    if shift_register_switch_factory.server is None:
+        shift_register_switch_factory.server = ShiftRegisterServer('', 'shift register server', addresses)
+        shift_register_switch_factory.server.start()
+    return ShiftRegisterSwitch(
+        location=location,
+        name=name,
+        addresses=addresses,
+        io_factory=ShiftRegisterIOFactory(addresses, arguments['channel']),
+        on=arguments.get('on'),
+        off=arguments.get('off'),
+    )
+
+shift_register_switch_factory.server = None
+
+    

gadgets/devices/switch/shift_register_switch/shift_register_server.py

+import operator
+from gadgets import Gadget
+from gadgets.io import GPIO
+try:
+    from spi import SPI
+except ImportError:
+    SPI = None
+    
+
+class ShiftRegisterServer(Gadget):
+
+    _spi_class = SPI
+
+    def __init__(self, location, name, addresses):
+        self._state = 0
+        self._spi = None
+        super(ShiftRegisterServer, self).__init__(location, name, addresses)
+
+    @property
+    def events(self):
+        return ['shift register server']
+
+    def event_received(self, event, message):
+        channel = message['channel']
+        if message['value']:
+            self._state |= 1 << channel
+        else:
+            self._state &= ~(1 << channel)
+        self.spi.writebytes([self._state])
+        
+    @property
+    def spi(self):
+        if self._spi is None:
+            self._spi = self._spi_class(2, 0)
+        return self._spi

gadgets/devices/switch/shift_register_switch/shift_register_switch.py

+from gadgets.devices.switch.triggers.gpio_timer import GPIOTimer
+from gadgets.devices.device import Device
+from gadgets import Sockets
+
+class ShiftRegisterIO(object):
+
+    def __init__(self, addresses, channel):
+        self._sockets = Sockets(addresses)
+        self._channel = channel
+        self.status = False
+        
+    def on(self):
+        self._send(True)
+
+    def off(self):
+        self._send(False)
+
+    def _send(self, value):
+        
+        self._sockets.send(
+            'shift register server',
+            {'value': value, 'channel': self._channel}
+        )
+        self.status = value
+
+class ShiftRegisterIOFactory(object):
+    
+    def __init__(self, addresses, channel):
+        self._addresses = addresses
+        self._channel = channel
+    
+    def __call__(self):
+        return ShiftRegisterIO(self._addresses, self._channel)
+
+        
+class ShiftRegisterSwitch(Device):
+    """
+    Controls multiple TPIC6595 8 bit shift registers.  The use of
+    this chip replaces the use of the gpio pins on the board to
+    control physical devices.  It can power relatively high-voltage
+    and high-current devices such as solenoids without having to
+    use additional transistors or mosfets.
+    
+    """
+
+    _trigger_factory = GPIOTimer
+
+    _units = ['minutes', 'minute', 'seconds', 'second', 'hours', 'hour']
+
+    def _get_trigger(self, message):
+        return self._trigger_factory(
+            self._location,
+            self._on_event,
+            self._off_event,
+            message,
+            self._addresses,
+            target=self.off
+        )
+
+        

gadgets/devices/switch/switch_factory.py

 from gadgets.io import GPIOFactory
 from gadgets.devices.switch.switch import Switch
-from gadgets.devices.switch.shift_register_switch import ShiftRegisterSwitch, ShiftRegisterIOFactory
-from gadgets.devices.switch.shift_register_server import ShiftRegisterServer
+
 
 def switch_factory(location, name, arguments, addresses, io_factory=None):
     if io_factory is None:
         off=arguments.get('off')
     )
 
+    
 
-def shift_register_switch_factory(location, name, arguments, addresses, io_factory=None):
-    if shift_register_switch_factory.server is None:
-        shift_register_switch_factory.server = ShiftRegisterServer('', 'shift register server', addresses)
-        shift_register_switch_factory.server.start()
-    return ShiftRegisterSwitch(
-        location=location,
-        name=name,
-        addresses=addresses,
-        io_factory=ShiftRegisterIOFactory(addresses, arguments['pin']),
-        on=arguments.get('on'),
-        off=arguments.get('off'),
-    )
-
-shift_register_switch_factory.server = None

gadgets/devices/switch/triggers/gpio_timer.py

         return seconds
 
     def wait(self):
-        print 'waiting'
         time.sleep(self._seconds)
-        print 'done waiting'
 
     def invalidate(self):
-        "can't invalidate"
+        """can't invalidate this trigger"""
         pass

gadgets/tests/test_shift_register_switch.py

 import time, threading, random
 from nose.tools import eq_
 from gadgets import Gadgets, Addresses, Sockets, Broker
-from gadgets.devices.switch.switch_factory import shift_register_switch_factory
+from gadgets.devices.switch import shift_register_switch_factory
+from gadgets.devices.switch.shift_register_switch.shift_register_server import ShiftRegisterServer
 
 path = '/private/tmp/switch'
 
+port = 0
 
-class FakeGPIO(object):
-
-    def __init__(self):
-        self.status = False
 
+def get_fake_gpio():
+    return FakeGPIO()
 
-    def on(self):
-        with open(path, 'w') as f:
-            f.write('on')
-        self.status = True
+class FakeSPI(object):
 
-    def off(self):
-        with open(path, 'w') as f:
-            f.write('off')
-        self.status = False
+    def __init__(self, *args, **kw):
+        addresses = Addresses(in_port=port, out_port=port + 1)
+        self.sockets = Sockets(addresses)
 
-def get_fake_gpio():
-    return FakeGPIO()
+    def writebytes(self, value):
+        self.sockets.send('fake spi', value)
 
+        
 class TestShiftRegisterSwitch(object):
 
     def setup(self):
+        global port
         port = random.randint(5000, 50000)
+        ShiftRegisterServer._spi_class = FakeSPI
         self.addresses = Addresses(in_port=port, out_port=port + 1)
-        self.sockets = Sockets(self.addresses, events=['update'])
+        self.sockets = Sockets(self.addresses, events=['fake spi'])
+        arguments = {
+            'channel': 2
+            }
         self.switch = shift_register_switch_factory(
             'living room',
             'light',
+            arguments,
             self.addresses,
         )
         self.gadgets = Gadgets([self.switch], self.addresses)
-        
+
+    def teardown(self):
+        self.sockets.send('shutdown')
+        time.sleep(0.5)
+        shift_register_switch_factory.server = None
+
     def test_create(self):
-        pass
+        t = threading.Thread(target=self.gadgets.start)
+        t.start()
+        time.sleep(0.5)
 
     def test_on_and_off(self):
         t = threading.Thread(target=self.gadgets.start)
         t.start()
         time.sleep(1)
         self.sockets.send('turn on living room light')
-        print self.sockets.recv()
-        with open(path, 'r') as f:
-            eq_(f.read(), 'on')
+        event, message = self.sockets.recv()
+        eq_(message, [2**2])
         self.sockets.send('turn off living room light')
-        print self.sockets.recv()
-        time.sleep(0.2)
-        with open(path, 'r') as f:
-            eq_(f.read(), 'off')
-        self.sockets.send('shutdown')
-
-    def test_timed_on_and_off(self):
-        t = threading.Thread(target=self.gadgets.start)
-        t.start()
-        time.sleep(1)
-        self.sockets.send('turn on living room light', {'units': 'seconds', 'value': 1})
-        print self.sockets.recv()
-        with open(path, 'r') as f:
-            eq_(f.read(), 'on')
-        print self.sockets.recv()
-        with open(path, 'r') as f:
-            eq_(f.read(), 'off')
-        self.sockets.send('shutdown')
+        event, message = self.sockets.recv()
+        eq_(message, [0])
+        
+    
 
-    def test_on_and_off_with_time_and_rcl(self):
+    def test_on_and_off_with_time(self):
         t = threading.Thread(target=self.gadgets.start)
         t.start()
         time.sleep(1)
         self.sockets.send('turn on living room light for 0.5 seconds')
-        print self.sockets.recv()
-        with open(path, 'r') as f:
-            eq_(f.read(), 'on')
-        print self.sockets.recv()
-        with open(path, 'r') as f:
-            eq_(f.read(), 'off')
-        self.sockets.send('shutdown')
+        time.sleep(1)
+        event, message = self.sockets.recv()
+        eq_(message, [2**2])
+        event, message = self.sockets.recv()
+        eq_(message, [0])
+
+    
+
+    
 
-    def test_switch_factory(self):
-        switch = switch_factory('basement', 'fan', {'pin': None}, self.addresses)
-        assert isinstance(switch, Switch)
-        
 
       install_requires=[
           # -*- Extra requirements: -*-
           'pyzmq',
-          'mmap',
       ],
       entry_points="""
       # -*- Entry points: -*-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.