Tino de Bruijn avatar Tino de Bruijn committed 9dae152

Finished implementering servo handling as defined in Firmata 2.2

Comments (0)

Files changed (2)

pyfirmata/pyfirmata.py

 import serial
 import inspect
 import time
-from util import two_byte_iter_to_str
+import itertools
+from util import two_byte_iter_to_str, to_two_bytes
 
 # Message command bytes - straight from Firmata.h
 DIGITAL_MESSAGE = 0x90      # send data for a digital pin
         Sends a SysEx msg.
         
         :arg sysex_cmd: A sysex command byte
-        :arg data: A list of 7-bit bytes of arbitrary data
+        :arg data: A list of 7-bit bytes of arbitrary data (bytes may be 
+            already converted to chr's)
         """
         self.sp.write(chr(START_SYSEX))
         self.sp.write(chr(sysex_cmd))
         for byte in data:
             try:
                 byte = chr(byte)
+            except TypeError:
+                pass # byte is already a chr
             except ValueError:
                 raise ValueError('Sysex data can be 7-bit bytes only. '
                     'Consider using utils.to_two_bytes for bigger bytes.')
         """
         return self.firmata_version
         
+    def servo_config(self, pin, min_pulse=544, max_pulse=2400, angle=0):
+        """
+        Configure a pin as servo with min_pulse, max_pulse and first angle.
+        ``min_pulse`` and ``max_pulse`` default to the arduino defaults.
+        """
+        if pin > len(self.digital) or self.digital[pin].mode == UNAVAILABLE:
+            raise IOError("Pin %s is not a valid servo pin")
+        data = itertools.chain([pin], to_two_bytes(min_pulse),
+                                        to_two_bytes(max_pulse))
+        self.send_sysex(SERVO_CONFIG, data)
+        
+        # set pin._mode to SERVO so that it sends analog messages
+        # don't set pin.mode as that calls this method
+        self.digital[pin]._mode = SERVO
+        self.digital[pin].write(angle)
+        
     def exit(self):
         """ Call this to exit cleanly. """
         # First detach all servo's, otherwise it somehow doesn't want to close...
         return "%s pin %d" % (type, self.pin_number)
 
     def _set_mode(self, mode):
-        """
-        Set the mode of operation for the pin
-        :arg mode: Can be one of the pin modes: INPUT, OUTPUT, ANALOG, PWM or
-            SERVO
-        
-        """
         if mode is UNAVAILABLE:
             self._mode = UNAVAILABLE
             return
+        if self._mode is UNAVAILABLE:
+            raise IOError("%s can not be used through Firmata" % self)
         if mode is PWM and not self.PWM_CAPABLE:
-            raise IOError, "%s does not have PWM capabilities" % self
-        if self._mode is UNAVAILABLE:
-            raise IOError, "%s can not be used through Firmata" % self
-        if mode == SERVO and self.type != DIGITAL:
-            raise IOError, "Only digital pins can dirve servos! %s is not \
-                digital" % self
+            raise IOError("%s does not have PWM capabilities" % self)
+        if mode == SERVO:
+            if self.type != DIGITAL:
+                raise IOError("Only digital pins can drive servos! %s is not"
+                    "digital" % self)
+            self._mode = SERVO
+            self.board.servo_config(self.pin_number)
+            return
+        
+        # Set mode with SET_PIN_MODE message
         self._mode = mode
         command = chr(SET_PIN_MODE)
         command += chr(self.pin_number)
         return self._mode
         
     mode = property(_get_mode, _set_mode)
+    """
+    Mode of operation for the pin. Can be one of the pin modes: INPUT, OUTPUT,
+    ANALOG, PWM or SERVO (or UNAVAILABLE)
+    """
     
     def enable_reporting(self):
         """ Set an input pin to report values """
                 msg = chr(ANALOG_MESSAGE + self.pin_number)
                 msg += chr(value % 128)
                 msg += chr(value >> 7)
-                self.board.sp.write(msg)
-        
-    def servo_config(self, min_pulse, max_pulse, angle=0):
-        """
-        Servo config on this pin.
-        
-        # TODO use send_sysex for this.
-        # TODO document
-        # TODO should this move to board?
-        """
-        pulse_min = chr(min_pulse % 128)
-        pulse_min += chr(min_pulse >> 7)
-        pulse_max = chr(max_pulse % 128)
-        pulse_max += chr(max_pulse >> 7)
-        angle_byte = chr(angle % 128)
-        angle_byte += chr(angle >> 7)
-        
-        self.board.sp.write(chr(START_SYSEX))
-        self.board.sp.write(chr(SERVO_CONFIG))
-        self.board.sp.write(chr(self.pin_number))
-        self.board.sp.write(pulse_min)
-        self.board.sp.write(pulse_max)
-        self.board.sp.write(chr(END_SYSEX))
+                self.board.sp.write(msg)
 import unittest
 import doctest
 import serial
+from itertools import chain
 import pyfirmata
 from pyfirmata import mockup
 from pyfirmata.boards import BOARDS
-from pyfirmata.util import str_to_two_byte_iter
+from pyfirmata.util import str_to_two_byte_iter, to_two_bytes
 
 # Messages todo left:
 
         while res:
             res = self.board.sp.read()
             serial_msg += res
-        self.assertEqual(''.join(list_of_chrs), serial_msg)
+        self.assertEqual(''.join(list(list_of_chrs)), serial_msg)
 
     # First test the handlers
     def test_handle_analog_message(self):
         while len(self.board.sp):
             self.board.iterate()
         self.assertEqual(self.board.analog[4].read(), 1.0)
-        
-        
+    
+    # Servo config
+    # --------------------
+    # 0  START_SYSEX (0xF0)
+    # 1  SERVO_CONFIG (0x70)
+    # 2  pin number (0-127)
+    # 3  minPulse LSB (0-6)
+    # 4  minPulse MSB (7-13)
+    # 5  maxPulse LSB (0-6)
+    # 6  maxPulse MSB (7-13)
+    # 7  END_SYSEX (0xF7)
+    #
+    # then sets angle
+    # 8  analog I/O message (0xE0)
+    # 9  angle LSB
+    # 10 angle MSB
+    def test_servo_config(self):
+        self.board.servo_config(2)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(544), 
+            to_two_bytes(2400), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+    def test_servo_config_min_max_pulse(self):
+        self.board.servo_config(2, 600, 2000)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(600), 
+            to_two_bytes(2000), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+    def test_servo_config_min_max_pulse_angle(self):
+        self.board.servo_config(2, 600, 2000, angle=90)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(600), 
+            to_two_bytes(2000), chr(0xF7))
+        angle_set = [chr(0xE0 + 2), chr(90 % 128), 
+            chr(90 >> 7)] # Angle set happens through analog message
+        data = list(data) + angle_set
+        self.assert_serial(*data)
+
+    def test_servo_config_invalid_pin(self):
+        self.assertRaises(IOError, self.board.servo_config, 1)
+
+    def test_set_mode_servo(self):
+        p = self.board.digital[2]
+        p.mode = pyfirmata.SERVO
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(544), 
+            to_two_bytes(2400), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+
 class TestBoardLayout(BoardBaseTest):
 
     def test_pwm_layout(self):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.