Commits

Tino de Bruijn committed e649e5c

Adapted to work with the correct message strings. Updated tests as well.

Comments (0)

Files changed (3)

pyfirmata/mockup.py

     >>> s.close()
     """
     def __init__(self, port, baudrate, timeout=0.02):
-        pass
+        self.port = port or 'somewhere'
         
     def read(self, count=1):
         if count > 1:
         return val
             
     def write(self, value):
-        self.append(value)
+        """
+        Appends items flat to the deque. So iterables will be unpacked.
+        """
+        if hasattr(value, '__iter__'):
+            self.extend(value)
+        else:
+            self.append(value)
             
     def close(self):
         self.clear()

pyfirmata/pyfirmata.py

 import serial
-import threading
-import util
+import inspect
 import time
 from boards import BOARDS
 
     Base class for any board
     """
     firmata_version = None
-    command_handlers = {}
+    _command_handlers = {}
+    _command = None
+    _stored_data = []
+    _parsing_sysex = False
     
     def __init__(self, port, type="arduino", baudrate=57600):
         self.sp = serial.Serial(port, baudrate)
         self.taken = { 'analog' : dict(map(lambda p: (p.pin_number, False), self.analog)),
                        'digital' : dict(map(lambda p: (p.pin_number, False), self.digital)) }
         # Setup default handlers for standard incoming commands
-        self.command_handlers[ANALOG_MESSAGE] = self._handle_analog_message
-        self.command_handlers[DIGITAL_MESSAGE] =  self._handle_digital_message
-        self.command_handlers[REPORT_VERSION] = self._handle_report_version
-        
+        self.add_cmd_handler(ANALOG_MESSAGE, self._handle_analog_message)
+        self.add_cmd_handler(DIGITAL_MESSAGE, self._handle_digital_message)
+        self.add_cmd_handler(REPORT_VERSION, self._handle_report_version)
+    
+    def add_cmd_handler(self, cmd, func):
+        """ 
+        Adds a command handler for a command.
+        """
+        len_args = len(inspect.getargspec(func)[0])
+        def add_meta(f):
+            def decorator(*args, **kwargs):
+                f(*args, **kwargs)
+            decorator.bytes_needed = len_args - 1 # exclude self
+            decorator.__name__ = f.__name__
+            return decorator
+        func = add_meta(func)
+        self._command_handlers[cmd] = func
         
     def get_pin(self, pin_def):
         """
         """
         if not byte:
             return
-        byte = ord(byte)
+        data = ord(byte)
         if self._parsing_sysex:
-            if byte == END_SYSEX:
+            if data == END_SYSEX:
                 self._parsing_sysex = False
                 self._process_sysex_message(self._stored_data)
                 self._stored_data = []
             else:
-                self._stored_data.append(byte)
+                self._stored_data.append(data)
         elif not self._command:
-            if byte not in self.command_handlers:
+            # Commands can have 'channel data' like a pin nummber appended. 
+            # This is for commands smaller than 0xF0
+            if data < 0xF0:
+                #Multibyte command
+                command = data & 0xF0
+                self._stored_data.append(data & 0x0F)
+            else:
+                command = data
+            if command not in self._command_handlers:
                 # We received a byte not denoting a command with handler 
                 # while we are not processing any commands data. Nothing we
-                # can do about it so discard and we'll see what comes next.
+                # can do about it so discard everything and we'll see what 
+                # comes next.
+                self._stored_data = []
                 return
-            self._command = byte
+            self._command = command
         else:
             # This is a data command either belonging to a sysex message, or
             # to a multibyte command. Append it to the data and see if we can
             # process the command. If _process_command returns False, it
             # needs more data.
-            self._stored_data.append(byte)
-            try:
-                if self._process_command(self._command, self._stored_data):
-                    self._command = None
-                    self._stored_data = []
-            except ValueError:
+            self._stored_data.append(data)
+            if self._process_command(self._command, self._stored_data):
                 self._command = None
                 self._stored_data = []
     
-    def _process_command(command, data):
+    def _process_command(self, command, data):
         """
-        Tries to get a handler for this command from the self.cmds helper and will
-        return its return status.
+        Tries to get a handler for this command from the self.cmds helper.
+        Will return True if command is handled. This means either the handler
+        handled the data correctly, or it raised a ValueError for not getting
+        in the correct data. It will return False if there wasn't enough data
+        for the handler
         """
+        # TODO document that a handler should 
+        handler = self._command_handlers[command]
+        if len(data) < handler.bytes_needed:
+            return False
         try:
-             handle_cmd = self.command_handlers[command]
-             return handle_cmd(self, data)
-        except (KeyError, ValueError):
-            # something got corrupted
-            raise ValueError
+            handler(*data)
+        except ValueError:
+            return True
+        return True
             
     def _process_sysex_message(self, data):
         # TODO implement or make _process_command deal with it
-        pass
+        raise NotImplemented
             
     def get_firmata_version(self):
         """
         self.sp.close()
         
     # Command handlers
-    def _handle_analog_message(self, data):
-        if len(data) < 3:
-            return False
-        pin_number, lsb, msb = data
-        value = float(msb << 7 | lsb) / 1023
-        self.analog[pin_number].value = value
+    def _handle_analog_message(self, pin_nr, lsb, msb):
+        value = float((msb << 7) + lsb) / 1023
+        # Only set the value if we are actually reporting
+        if self.analog[pin_nr].reporting:
+            self.analog[pin_nr].value = value
         return True
 
-    def _handle_digital_message(self, data):
-        if len(data) < 3:
-            return False
-        pin_number, lsb, msb = data
-        value = msb << 7 | lsb
-        self.digital[pin_number].value = value
+    def _handle_digital_message(self, port_nr, lsb, msb):
+        """
+        Digital messages always go by the whole port. This means we have a
+        bitmask wich we update the port.
+        """
+        mask = (msb << 7) + lsb
+        self.digital_ports[port_nr]._update(mask)
         return True
 
-    def _handle_report_version(self, data):
-        if len(data) < 2:
-            return False
-        major, minor = data
+    def _handle_report_version(self, major, minor):
         self.firmata_version = (major, minor)
         return True
 
         self.reporting = False
         msg = chr(REPORT_DIGITAL + self.port_number + 0)
         self.board.sp.write(msg)
-        
-    def set_value(self, mask):
-        """Record the value of each of the input pins belonging to the port"""
-        
-        for pin in self.pins:
-            if pin.mode is INPUT:
-                pin.set_value((mask & (1 << pin.pin_number)) > 1)
                 
     def write(self):
         """Set the output pins of the port to the correct state"""
         msg += chr(mask % 128)
         msg += chr(mask >> 7)
         self.board.sp.write(msg)
+        
+    def _update(self, mask):
+        """
+        Update the values for the pins marked as input with the mask.
+        """
+        if self.reporting:
+            for pin in self.pins:
+                if pin.mode is INPUT:
+                    pin.value = (mask & (1 << pin.pin_number)) > 1
 
 class Pin(object):
     """ A Pin representation """
         
     def __str__(self):
         type = {ANALOG : 'Analog', DIGITAL : 'Digital'}[self.type]
-        if self.board_name:
-            return "%s pin %d on %s" % (type, self.pin_number, self.board_name)
         return "%s pin %d" % (type, self.pin_number)
 
     def _set_mode(self, mode):
         command += chr(self.pin_number)
         command += chr(mode)
         self.board.sp.write(command)
+        if mode == INPUT:
+            self.enable_reporting()
         
     def _get_mode(self):
         return self._mode
         """ Set an input pin to report values """
         if self.mode is not INPUT:
             raise IOError, "%s is not an input and can therefore not report" % self
-        self.reporting = True
-        msg = chr(REPORT_ANALOG + self.pin_number)
-        msg += chr(1)
-        self.board.sp.write(msg)
+        if self.type == ANALOG:
+            self.reporting = True
+            msg = chr(REPORT_ANALOG + self.pin_number)
+            msg += chr(1)
+            self.board.sp.write(msg)
+        else:
+            self.port.enable_reporting() # TODO This is not going to work for non-optimized boards like Mega
         
     def disable_reporting(self):
         """ Disable the reporting of an input pin """
 from pyfirmata.boards import BOARDS
 from pyfirmata.util import to_7_bits
 
-# This should be covered:
-#
-# This protocol uses the MIDI message format, but does not use the whole
-# protocol.  Most of the command mappings here will not be directly usable in
-# terms of MIDI controllers and synths.  It should co-exist with MIDI without
-# trouble and can be parsed by standard MIDI interpreters.  Just some of the
-# message data is used differently.
-# 
-# MIDI format: http://www.harmony-central.com/MIDI/Doc/table1.html
-# 
-#                              MIDI       
+# Messages todo left:
+
 # type                command  channel    first byte            second byte 
 # ---------------------------------------------------------------------------
-# analog I/O message    0xE0   pin #      LSB(bits 0-6)         MSB(bits 7-13)
-# digital I/O message   0x90   port       LSB(bits 0-6)         MSB(bits 7-13)
 # report analog pin     0xC0   pin #      disable/enable(0/1)   - n/a -
 # report digital port   0xD0   port       disable/enable(0/1)   - n/a -
 # 
 # system reset          0xFF
 #
 # SysEx-based commands (0x00-0x7F) are used for an extended command set.
-# 
-# type                command  first byte       second byte      ...
-# ----------------------------------------------------------------------------
-# string                0x71   char *string ...
-# firmware name/version 0x79   major version   minor version     char *name...
 
 
 class BoardBaseTest(unittest.TestCase):
     def setUp(self):
         # Test with the MockupSerial so no real connection is needed
         pyfirmata.pyfirmata.serial.Serial = mockup.MockupSerial
-        self.board = pyfirmata.Board('test')
-
+        self.board = pyfirmata.Board('')
+        self.board._stored_data = [] # FIXME How can it be that a fresh instance sometimes still contains data?
+        
+    def iterate(self, count):
+        for i in range(count):
+            self.board.iterate()
+            
 class TestBoardMessages(BoardBaseTest):
     # TODO Test layout of Board Mega
-    # TODO Test if messages written are correct...
 
     # First test the handlers
     def test_handle_analog_message(self):
+        self.board.analog[3].reporting = True
         self.assertEqual(self.board.analog[3].read(), None)
-        # Test it returns false with not enough params
-        self.assertFalse(self.board._handle_analog_message([3, 127]))
         # This sould set it correctly. 1023 (127, 7 in to 7 bit bytes) is the
         # max value an analog pin will send and it should result in a value 1
-        self.assertTrue(self.board._handle_analog_message([3, 127, 7]))
-        self.assertEqual(self.board.analog[3].read(), 1)
+        self.assertTrue(self.board._handle_analog_message(3, 127, 7))
+        self.assertEqual(self.board.analog[3].read(), 1.0)
         
     def test_handle_digital_message(self):
+        # A digital message sets the value for a whole port. We will set pin
+        # 5 (That is on port 0) to 1 to test if this is working.
+        self.board.digital_ports[0].reporting = True
+        self.board.digital[5]._mode = 0 # Set it to input
+        # Create the mask
+        mask = 0
+        mask |= 1 << 5 # set the bit for pin 5 to to 1
         self.assertEqual(self.board.digital[5].read(), None)
-        # Test it returns false with not enough params
-        self.assertFalse(self.board._handle_digital_message([5, 1]))
-        # This should set it correctly.
-        self.assertTrue(self.board._handle_digital_message([5, 1, 0]))
-        self.assertEqual(self.board.digital[5].read(), 1)
+        self.assertTrue(self.board._handle_digital_message(0, mask % 128, mask >> 7))
+        self.assertEqual(self.board.digital[5].read(), True)
         
-    def test_handle_firmata_version(self):
+    def test_handle_report_version(self):
         self.assertEqual(self.board.firmata_version, None)
-        self.assertFalse(self.board._handle_report_version([1]))
-        self.assertTrue(self.board._handle_report_version([2, 1]))
+        self.assertTrue(self.board._handle_report_version(2, 1))
         self.assertEqual(self.board.firmata_version, (2, 1))
         
-    # Now test the whole structure.
+    # type                command  channel    first byte            second byte 
+    # ---------------------------------------------------------------------------
+    # analog I/O message    0xE0   pin #      LSB(bits 0-6)         MSB(bits 7-13)
+    def test_incoming_analog_message(self):
+        self.assertEqual(self.board.analog[4].read(), None)
+        self.assertEqual(self.board.analog[4].reporting, False)
+        # Should do nothing as the pin isn't set to report
+        self.board.sp.write([chr(pyfirmata.ANALOG_MESSAGE + 4), chr(127), chr(7)])
+        self.iterate(3)
+        self.assertEqual(self.board.analog[4].read(), None)
+        self.board.analog[4].enable_reporting()
+        self.board.sp.clear()
+        # This should set analog port 4 to 1
+        self.board.sp.write([chr(pyfirmata.ANALOG_MESSAGE + 4), chr(127), chr(7)])
+        self.iterate(3)
+        self.assertEqual(self.board.analog[4].read(), 1.0)
+        self.board._stored_data = []
     
+    # type                command  channel    first byte            second byte 
+    # ---------------------------------------------------------------------------
+    # digital I/O message   0x90   port       LSB(bits 0-6)         MSB(bits 7-13)
+    def test_incoming_digital_message(self):
+        # A digital message sets the value for a whole port. We will set pin
+        # 2 (on port 0) to 1 to test if this is working.
+        self.board.digital[2].mode = pyfirmata.INPUT
+        self.board.sp.clear() # clear mode sent over the wire.
+        # Create the mask
+        mask = 0
+        mask |= 1 << 2 # set the bit for pin 2 to to 1
+        self.assertEqual(self.board.digital[2].read(), None)
+        self.board.sp.write([chr(pyfirmata.DIGITAL_MESSAGE + 0), chr(mask % 128), chr(mask >> 7)])
+        self.iterate(3)
+        self.assertEqual(self.board.digital[2].read(), True)
+        
+    # version report format
+    # -------------------------------------------------
+    # 0  version report header (0xF9) (MIDI Undefined)
+    # 1  major version (0-127)
+    # 2  minor version (0-127)
+    def test_incoming_report_version(self):
+        self.assertEqual(self.board.firmata_version, None)
+        self.board.sp.write([chr(pyfirmata.REPORT_VERSION), chr(2), chr(1)])
+        self.iterate(3)
+        self.assertEqual(self.board.firmata_version, (2, 1))
+        self.board._stored_data = []
         
 class TestBoardLayout(BoardBaseTest):