Commits

Tino de Bruijn committed 971a828

Added sysex handling, report_firmware, and tests for enabling/disabling reporting

  • Participants
  • Parent commits e649e5c

Comments (0)

Files changed (2)

pyfirmata/pyfirmata.py

 END_SYSEX = 0xF7            # end a MIDI SysEx msg
 REPORT_VERSION = 0xF9       # report firmware version
 SYSTEM_RESET = 0xFF         # reset from MIDI
+QUERY_FIRMWARE = 0x79       # query the firmware name
+
+# extended command set using sysex (0-127/0x00-0x7F)
+# 0x00-0x0F reserved for user-defined commands */
+SERVO_CONFIG = 0x70         # set max angle, minPulse, maxPulse, freq
+STRING_DATA = 0x71          # a string message with 14-bits per char
+SHIFT_DATA = 0x75           # a bitstream to/from a shift register
+I2C_REQUEST = 0x76          # send an I2C read/write request
+I2C_REPLY = 0x77            # a reply to an I2C read request
+I2C_CONFIG = 0x78           # config I2C settings such as delay times and power pins
+REPORT_FIRMWARE = 0x79      # report name and version of the firmware
+SAMPLING_INTERVAL = 0x7A    # set the poll rate of the main loop
+SYSEX_NON_REALTIME = 0x7E   # MIDI Reserved for non-realtime messages
+SYSEX_REALTIME = 0x7F       # MIDI Reserved for realtime messages
+
 
 # Pin modes.
 # except from UNAVAILABLE taken from Firmata.h
     Base class for any board
     """
     firmata_version = None
+    firmware = None
     _command_handlers = {}
     _command = None
     _stored_data = []
         self.add_cmd_handler(ANALOG_MESSAGE, self._handle_analog_message)
         self.add_cmd_handler(DIGITAL_MESSAGE, self._handle_digital_message)
         self.add_cmd_handler(REPORT_VERSION, self._handle_report_version)
+        self.add_cmd_handler(REPORT_FIRMWARE, self._handle_report_firmware)
     
     def add_cmd_handler(self, cmd, func):
         """ 
         if self._parsing_sysex:
             if data == END_SYSEX:
                 self._parsing_sysex = False
-                self._process_sysex_message(self._stored_data)
+                self._process_command(self._stored_data[0], self._stored_data[1:])
                 self._stored_data = []
             else:
                 self._stored_data.append(data)
         elif not self._command:
-            # Commands can have 'channel data' like a pin nummber appended. 
-            # This is for commands smaller than 0xF0
-            if data < 0xF0:
-                #Multibyte command
+            if data == START_SYSEX:
+                self._parsing_sysex = True
+                return
+            elif data < 0xF0:
+                # Commands can have 'channel data' like a pin nummber appended. 
+                # This is for commands smaller than 0xF0
                 command = data & 0xF0
                 self._stored_data.append(data & 0x0F)
             else:
             return True
         return True
             
-    def _process_sysex_message(self, data):
-        # TODO implement or make _process_command deal with it
-        raise NotImplemented
-            
     def get_firmata_version(self):
         """
         Returns a version tuple (major, mino) for the firmata firmware on the
     def _handle_report_version(self, major, minor):
         self.firmata_version = (major, minor)
         return True
+        
+    def _handle_report_firmware(self, *data):
+        major = data[0]
+        minor = data[1]
+        self.firmata_version = (major, minor)
+        self.firmware = ''.join([chr(x) for x in data[2:]]) # TODO this is more complicated, values is send as 7 bit bytes
+        return True
 
 class Port(object):
     """ An 8-bit port on the board """
     def enable_reporting(self):
         """ Enable reporting of values for the whole port """
         self.reporting = True
-        msg = chr(REPORT_DIGITAL + self.port_number + 1)
+        msg = chr(REPORT_DIGITAL + self.port_number)
+        msg += chr(1)
         self.board.sp.write(msg)
         
     def disable_reporting(self):
         """ Disable the reporting of the port """
         self.reporting = False
-        msg = chr(REPORT_DIGITAL + self.port_number + 0)
+        msg = chr(REPORT_DIGITAL + self.port_number)
+        msg += chr(0)
         self.board.sp.write(msg)
                 
     def write(self):
         
     def disable_reporting(self):
         """ Disable the reporting of an input pin """
-        self.reporting = False
-        msg = chr(REPORT_ANALOG + self.pin_number)
-        msg += chr(0)
-        self.board.sp.write(msg)
+        if self.type == ANALOG:
+            self.reporting = False
+            msg = chr(REPORT_ANALOG + self.pin_number)
+            msg += chr(0)
+            self.board.sp.write(msg)
+        else:
+            self.port.disable_reporting() # TODO This is not going to work for non-optimized boards like Mega
     
     def read(self):
         """
 
 # type                command  channel    first byte            second byte 
 # ---------------------------------------------------------------------------
-# report analog pin     0xC0   pin #      disable/enable(0/1)   - n/a -
-# report digital port   0xD0   port       disable/enable(0/1)   - n/a -
-# 
 # sysex start           0xF0   
 # set pin mode(I/O)     0xF4              pin # (0-127)         pin state(0=in)
 # sysex end             0xF7   
             
 class TestBoardMessages(BoardBaseTest):
     # TODO Test layout of Board Mega
+    def assert_serial(self, *list_of_chrs):
+        res = self.board.sp.read()
+        serial_msg = res
+        while res:
+            res = self.board.sp.read()
+            serial_msg += res
+        self.assertEqual(''.join(list_of_chrs), serial_msg)
 
     # First test the handlers
     def test_handle_analog_message(self):
         self.assertTrue(self.board._handle_report_version(2, 1))
         self.assertEqual(self.board.firmata_version, (2, 1))
         
+    def test_handle_report_firmware(self):
+        self.assertEqual(self.board.firmware, None)
+        data = [2, 1] + [ord(x) for x in 'Firmware_name']
+        self.assertTrue(self.board._handle_report_firmware(*data))
+        self.assertEqual(self.board.firmware, 'Firmware_name')
+        self.assertEqual(self.board.firmata_version, (2, 1))
+        
     # type                command  channel    first byte            second byte 
     # ---------------------------------------------------------------------------
     # analog I/O message    0xE0   pin #      LSB(bits 0-6)         MSB(bits 7-13)
         self.board.sp.write([chr(pyfirmata.REPORT_VERSION), chr(2), chr(1)])
         self.iterate(3)
         self.assertEqual(self.board.firmata_version, (2, 1))
-        self.board._stored_data = []
+    
+    # Receive Firmware Name and Version (after query)
+    # 0  START_SYSEX (0xF0)
+    # 1  queryFirmware (0x79)
+    # 2  major version (0-127)
+    # 3  minor version (0-127)
+    # 4  first 7-bits of firmware name
+    # 5  second 7-bits of firmware name
+    # x  ...for as many bytes as it needs)
+    # 6  END_SYSEX (0xF7)
+    def test_incoming_report_firmware(self):
+        self.assertEqual(self.board.firmware, None)
+        self.assertEqual(self.board.firmata_version, None)
+        msg = [chr(pyfirmata.START_SYSEX), 
+               chr(pyfirmata.REPORT_FIRMWARE), 
+               chr(2), 
+               chr(1)] + [i for i in 'Firmware_name'] + \
+              [chr(pyfirmata.END_SYSEX)]
+        self.board.sp.write(msg)
+        self.iterate(25)
+        self.assertEqual(self.board.firmware, 'Firmware_name')
+        self.assertEqual(self.board.firmata_version, (2, 1))
+        
+    # type                command  channel    first byte            second byte 
+    # ---------------------------------------------------------------------------
+    # report analog pin     0xC0   pin #      disable/enable(0/1)   - n/a -
+    def test_report_analog(self):
+        self.board.analog[1].enable_reporting()
+        self.assert_serial(chr(0xC0 + 1), chr(1))
+        self.assertTrue(self.board.analog[1].reporting)
+        self.board.analog[1].disable_reporting()
+        self.assert_serial(chr(0xC0 + 1), chr(0))
+        self.assertFalse(self.board.analog[1].reporting)
+        
+    # type                command  channel    first byte            second byte 
+    # ---------------------------------------------------------------------------
+    # report digital port   0xD0   port       disable/enable(0/1)   - n/a -
+    def test_report_digital(self):
+        # This should enable reporting of whole port 1
+        self.board.digital[8]._mode = pyfirmata.INPUT # Outputs can't report
+        self.board.digital[8].enable_reporting()
+        self.assert_serial(chr(0xD0 + 1), chr(1))
+        self.assertTrue(self.board.digital_ports[1].reporting)
+        self.board.digital[8].disable_reporting()
+        self.assert_serial(chr(0xD0 + 1), chr(0))
+        
+    # Generic Sysex Message
+    # 0     START_SYSEX (0xF0)
+    # 1     sysex command (0x00-0x7F)
+    # x     between 0 and MAX_DATA_BYTES 7-bit bytes of arbitrary data
+    # last  END_SYSEX (0xF7)
+    def test_sysex_message(self):
+        # 0x7 is queryFirmware, but that doesn't matter for now
+        self.board.send_sysex(0x7, [1, 2, 3])
+        sysex = (chr(0xF0), chr(0x7), chr(1), chr(2), chr(3), chr(0xF7))
+        self.assert_serial(*sysex)
+        
         
 class TestBoardLayout(BoardBaseTest):