Source

gadgets / gadgets / tests / test_shift_register_server.py

import time, threading, random
from nose.tools import eq_
from gadgets import Gadgets, Addresses, Sockets, Broker, get_gadgets
from gadgets.devices.switch import shift_register_switch_factory
from gadgets.devices.switch.shift_register.server import ShiftRegisterServer

port = 0

def get_fake_gpio():
    return FakeGPIO()

class FakeSPI(object):

    def __init__(self, *args, **kw):
        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
        self.sockets = Sockets(self.addresses)

    def writebytes(self, value):
        self.sockets.send('fake spi', value)
        
class TestShiftRegisterServer(object):

    def setup(self):
        global port
        port = random.randint(5000, 50000)
        self.port = port
        self.channel = 2
        ShiftRegisterServer._SPI_Class = FakeSPI
        self.addresses = Addresses(in_port=port, out_port=port+1, req_port=port+2)
        self.sockets = Sockets(self.addresses, events=['fake spi'])
        self.server = ShiftRegisterServer(
            'null',
            'shift register server',
            self.addresses,
            {}
            )
        self.server.devices = 1

    def teardown(self):
        if self.server.is_alive():
            self.sockets.send('shutdown')
            time.sleep(0.5)
            self.sockets.close()

    def test_create(self):
        pass

    def test_toggle_bit_on(self):
        eq_(self.server._state, 0)
        self.server._toggle_bit(True, 2)
        eq_(self.server._state, 2 ** 2)

    def test_toggle_bit_off(self):
        self.server._state = 0b1111111
        self.server._toggle_bit(False, 2)
        eq_(self.server._state, 0b1111011)

    def test_get_bytes(self):
        self.server._highest_channel = 2
        self.server._state = 0b11
        values = self.server._get_bytes()
        eq_(values, [0b11])

    def test_get_no_bytes(self):
        self.server._highest_channel = 2
        values = self.server._get_bytes()
        eq_(values, [0])

    def test_get_no_bytes_again(self):
        self.server._highest_channel = 7
        values = self.server._get_bytes()
        eq_(values, [0])

    def test_get_no_bytes_once_again(self):
        self.server._highest_channel = 4
        values = self.server._get_bytes()
        eq_(values, [0])

    def test_get_no_bytes_with_multple_devices(self):
        self.server._highest_channel = 15
        values = self.server._get_bytes()
        eq_(values, [0, 0])

    def test_get_no_bytes_with_multple_devices_again(self):
        self.server._highest_channel = 9
        values = self.server._get_bytes()
        eq_(values, [0, 0])

    def test_get_no_bytes_with_multple_devices_once_more(self):
        self.server._highest_channel = 13
        values = self.server._get_bytes()
        eq_(values, [0, 0])

    def test_get_more_bytes(self):
        self.server._highest_channel = 11
        self.server._state = 0b11100000011
        values = self.server._get_bytes()
        eq_(values, [0b111, 0b11])

    def test_run(self):
        b = Broker(self.addresses)
        b.start()
        time.sleep(0.2)
        self.server.start()
        time.sleep(0.3)
        self.sockets.send('shift register server', {'value': True, 'channel': 3})
        event, message = self.sockets.recv()
        eq_(event, 'fake spi')
        eq_(message, [1 << 3])
        self.sockets.send('shift register server', {'value': False, 'channel': 3})
        event, message = self.sockets.recv()
        eq_(event, 'fake spi')
        eq_(message, [0])

    def test_run_with_other_state(self):
        self.server._state = 0b10101
        b = Broker(self.addresses)
        b.start()
        time.sleep(0.2)
        self.server.start()
        time.sleep(0.3)
        self.sockets.send('shift register server', {'value': True, 'channel': 3})
        event, message = self.sockets.recv()
        eq_(event, 'fake spi')
        eq_(message, [0b11101])
        self.sockets.send('shift register server', {'value': False, 'channel': 3})
        event, message = self.sockets.recv()
        eq_(event, 'fake spi')
        eq_(message, [0b10101])