Commits

Tino de Bruijn committed ac455ec Merge

merge servo branch

Comments (0)

Files changed (2)

pyfirmata/pyfirmata.py

 import serial
 import inspect
 import time
-from util import two_byte_iter_to_str
+import itertools
+from util import two_byte_iter_to_str, to_two_bytes
 
 # Message command bytes - straight from Firmata.h
 DIGITAL_MESSAGE = 0x90      # send data for a digital pin
 OUTPUT = 1         # as defined in wiring.h
 ANALOG = 2         # analog pin in analogInput mode
 PWM = 3            # digital pin in PWM output mode
+SERVO = 4          # digital pin in SERVO mode
 
 # Pin types
 DIGITAL = OUTPUT   # same as OUTPUT below
         if pin.type is DIGITAL:
             if bits[2] == 'p':
                 pin.mode = PWM
+            elif bits[2] == 's':
+                pin.mode = SERVO
             elif bits[2] is not 'o':
                 pin.mode = INPUT
         else:
         Sends a SysEx msg.
         
         :arg sysex_cmd: A sysex command byte
-        :arg data: A list of data values
+        :arg data: A list of 7-bit bytes of arbitrary data (bytes may be 
+            already converted to chr's)
         """
         self.sp.write(chr(START_SYSEX))
         self.sp.write(chr(sysex_cmd))
         for byte in data:
             try:
                 byte = chr(byte)
+            except TypeError:
+                pass # byte is already a chr
             except ValueError:
-                byte = chr(byte >> 7) # TODO send multiple bytes
+                raise ValueError('Sysex data can be 7-bit bytes only. '
+                    'Consider using utils.to_two_bytes for bigger bytes.')
             self.sp.write(byte)
         self.sp.write(chr(END_SYSEX))
         
         """
         return self.firmata_version
         
+    def servo_config(self, pin, min_pulse=544, max_pulse=2400, angle=0):
+        """
+        Configure a pin as servo with min_pulse, max_pulse and first angle.
+        ``min_pulse`` and ``max_pulse`` default to the arduino defaults.
+        """
+        if pin > len(self.digital) or self.digital[pin].mode == UNAVAILABLE:
+            raise IOError("Pin %s is not a valid servo pin")
+        data = itertools.chain([pin], to_two_bytes(min_pulse),
+                                        to_two_bytes(max_pulse))
+        self.send_sysex(SERVO_CONFIG, data)
+        
+        # set pin._mode to SERVO so that it sends analog messages
+        # don't set pin.mode as that calls this method
+        self.digital[pin]._mode = SERVO
+        self.digital[pin].write(angle)
+        
     def exit(self):
         """ Call this to exit cleanly. """
+        # First detach all servo's, otherwise it somehow doesn't want to close...
+        # FIXME
+        for pin in self.digital:
+            if pin.mode == SERVO:
+                pin.mode = OUTPUT
         if hasattr(self, 'sp'):
             self.sp.close()
         
         return "%s pin %d" % (type, self.pin_number)
 
     def _set_mode(self, mode):
-        """
-        Set the mode of operation for the pin
-        :arg mode: Can be one of the pin modes: INPUT, OUTPUT, ANALOG or PWM
-        
-        """
         if mode is UNAVAILABLE:
             self._mode = UNAVAILABLE
             return
+        if self._mode is UNAVAILABLE:
+            raise IOError("%s can not be used through Firmata" % self)
         if mode is PWM and not self.PWM_CAPABLE:
-            raise IOError, "%s does not have PWM capabilities" % self
-        if self._mode is UNAVAILABLE:
-            raise IOError, "%s can not be used through Firmata" % self
+            raise IOError("%s does not have PWM capabilities" % self)
+        if mode == SERVO:
+            if self.type != DIGITAL:
+                raise IOError("Only digital pins can drive servos! %s is not"
+                    "digital" % self)
+            self._mode = SERVO
+            self.board.servo_config(self.pin_number)
+            return
+        
+        # Set mode with SET_PIN_MODE message
         self._mode = mode
         command = chr(SET_PIN_MODE)
         command += chr(self.pin_number)
         return self._mode
         
     mode = property(_get_mode, _set_mode)
+    """
+    Mode of operation for the pin. Can be one of the pin modes: INPUT, OUTPUT,
+    ANALOG, PWM or SERVO (or UNAVAILABLE)
+    """
     
     def enable_reporting(self):
         """ Set an input pin to report values """
         """
         Output a voltage from the pin
 
-        :arg value: Uses value as a boolean if the pin is in output mode, or 
-            expects a float from 0 to 1 if the pin is in PWM mode.
+        :arg value: Uses value as a boolean if the pin is in output mode, or
+            expects a float from 0 to 1 if the pin is in PWM mode. If the pin 
+            is in SERVO the value should be in degrees.
         
         """
         if self.mode is UNAVAILABLE:
                 msg += chr(value % 128)
                 msg += chr(value >> 7)
                 self.board.sp.write(msg)
-                
-    def send_sysex(self, sysex_cmd, data=[]):
-        """
-        Sends a SysEx msg.
-        
-        :arg sysex_cmd: A sysex command byte
-        :arg data: A list of data values
-        """
-        # TODO make the boards send_sysex available to the pin
-        self.board.sp.write(chr(START_SYSEX))
-        self.board.sp.write(chr(sysex_cmd))
-        for byte in data:
-            try:
-                byte = chr(byte)
-            except ValueError:
-                byte = chr(byte >> 7) # TODO send multiple bytes
-            self.board.sp.write(byte)
-        self.board.sp.write(chr(END_SYSEX))
+            elif self.mode is SERVO:
+                value = int(value)
+                msg = chr(ANALOG_MESSAGE + self.pin_number)
+                msg += chr(value % 128)
+                msg += chr(value >> 7)
+                self.board.sp.write(msg)
 import unittest
 import doctest
 import serial
+from itertools import chain
 import pyfirmata
 from pyfirmata import mockup
 from pyfirmata.boards import BOARDS
-from pyfirmata.util import str_to_two_byte_iter
+from pyfirmata.util import str_to_two_byte_iter, to_two_bytes
 
 # Messages todo left:
 
         while res:
             res = self.board.sp.read()
             serial_msg += res
-        self.assertEqual(''.join(list_of_chrs), serial_msg)
+        self.assertEqual(''.join(list(list_of_chrs)), serial_msg)
 
     # First test the handlers
     def test_handle_analog_message(self):
         sysex = (chr(0xF0), chr(0x79), chr(1), chr(2), chr(3), chr(0xF7))
         self.assert_serial(*sysex)
         
+    def test_send_sysex_to_big_data(self):
+        self.assertRaises(ValueError, self.board.send_sysex, 0x79, [256, 1])
+        
     def test_receive_sysex_message(self):
         sysex = (chr(0xF0), chr(0x79), chr(2), chr(1), 'a', '\x00', 'b', 
             '\x00', 'c', '\x00', chr(0xF7))
         while len(self.board.sp):
             self.board.iterate()
         self.assertEqual(self.board.analog[4].read(), 1.0)
-        
-        
+    
+    # Servo config
+    # --------------------
+    # 0  START_SYSEX (0xF0)
+    # 1  SERVO_CONFIG (0x70)
+    # 2  pin number (0-127)
+    # 3  minPulse LSB (0-6)
+    # 4  minPulse MSB (7-13)
+    # 5  maxPulse LSB (0-6)
+    # 6  maxPulse MSB (7-13)
+    # 7  END_SYSEX (0xF7)
+    #
+    # then sets angle
+    # 8  analog I/O message (0xE0)
+    # 9  angle LSB
+    # 10 angle MSB
+    def test_servo_config(self):
+        self.board.servo_config(2)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(544), 
+            to_two_bytes(2400), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+    def test_servo_config_min_max_pulse(self):
+        self.board.servo_config(2, 600, 2000)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(600), 
+            to_two_bytes(2000), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+    def test_servo_config_min_max_pulse_angle(self):
+        self.board.servo_config(2, 600, 2000, angle=90)
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(600), 
+            to_two_bytes(2000), chr(0xF7))
+        angle_set = [chr(0xE0 + 2), chr(90 % 128), 
+            chr(90 >> 7)] # Angle set happens through analog message
+        data = list(data) + angle_set
+        self.assert_serial(*data)
+
+    def test_servo_config_invalid_pin(self):
+        self.assertRaises(IOError, self.board.servo_config, 1)
+
+    def test_set_mode_servo(self):
+        p = self.board.digital[2]
+        p.mode = pyfirmata.SERVO
+        data = chain([chr(0xF0), chr(0x70), chr(2)], to_two_bytes(544), 
+            to_two_bytes(2400), chr(0xF7), chr(0xE0 + 2), chr(0), chr(0))
+        self.assert_serial(*data)
+
+
 class TestBoardLayout(BoardBaseTest):
 
     def test_pwm_layout(self):