Tino de Bruijn avatar Tino de Bruijn committed d687fae

Inital commit of pyFirmata

Comments (0)

Files changed (8)

+syntax:glob
+
+*.pyc
+.DS_Store
+cov
+.coverage
+applet
+Copyright (c) 2010, Tino de Bruijn, The Netherlands
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this 
+  list of conditions and the following disclaimer.
+* Redistributions in binary form must reproduce the above copyright notice, 
+  this list of conditions and the following disclaimer in the documentation 
+  and/or other materials provided with the distribution.
+* Neither the name of the GEN Project group nor the names of its contributors
+  may be used to endorse or promote products derived from this software
+  without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from pyfirmata import *
+
+__version__ = '0.1'

pyfirmata/boards.py

+BOARDS = {
+    'normal' : {
+        'digital' : tuple(x for x in range(14)),
+        'analog' : tuple(x for x in range(6)),
+        'pwm' : (3, 5, 6, 9, 10, 11),
+        'use_ports' : True,
+        'disabled' : (0, 1, 14, 15) # Rx, Tx, Crystal
+    },
+    'mega' : {
+        'digital' : tuple(x for x in range(54)),
+        'analog' : tuple(x for x in range(16)),
+        'pwm' : tuple(x for x in range(2,14)),
+        'use_ports' : True,
+        'disabled' : (0, 1, 14, 15) # Rx, Tx, Crystal
+    }
+}

pyfirmata/mockup.py

+import pyfirmata
+
+class MockupSerial(object):
+    """ A Mockup object for python's Serial. Can only return Firmata version. """
+    def __init__(self, port, baudrate, timeout=1):
+        self.return_id = False
+        self.echoed = False
+        
+    def read(self, len=1):
+        if self.return_id:
+            if self.echoed:
+                self.return_id = False
+                return chr(1)
+            else:
+                self.echoed = True
+                return chr(pyfirmata.REPORT_ARDUINO_ID)
+        else:
+            return ''
+            
+    def write(self, value):
+        if value == chr(pyfirmata.REPORT_ARDUINO_ID):
+            self.return_id = True
+            
+    def close(self):
+        pass
+
+class MockupArduinos(pyfirmata.Arduinos):
+    def __init__(self, identifier='', arduinos_map={}, values_dict={}):
+        self.values_dict = values_dict
+        for a in arduinos_map.values():
+            self[a['name']] = MockupArduino(name=a['name'], type=a['board'], values_dict=values_dict)
+        
+    def update_values_dict(self, values_dict=None):
+        if values_dict is not None:
+            self.values_dict.update(values_dict)
+        if values_dict == {}: # reset
+            self.values_dict = {}
+        self['gen'].values_dict = self.values_dict
+        self['gen'].update_values_dict()
+    
+    def reset_taken(self):
+        self['gen'].reset_taken()
+        
+class MockupArduino(pyfirmata.Arduino):
+
+    def __init__(self, port='', type="normal", values_dict={}, name=''):
+        self.name = name
+        self.sp = MockupSerial(port, 57600, timeout=0)
+        self.setup_layout(pyfirmata.BOARDS[type])
+        self.values_dict = values_dict
+        self.id = 1
+        
+    def reset_taken(self):
+        for key in self.taken['analog']:
+            self.taken['analog'][key] = False
+        for key in self.taken['digital']:
+            self.taken['digital'][key] = False
+        
+    def update_values_dict(self):
+        for port in self.digital_ports:
+            port.values_dict = self.values_dict
+            port.update_values_dict()
+        for pin in self.analog:
+            pin.values_dict = self.values_dict
+        
+class MockupPort(pyfirmata.DigitalPort):
+    def __init__(self, sp, port_number):
+        self.sp = sp
+        self.port_number = port_number
+        self.reporting = False
+        
+        self.pins = []
+        for i in range(8):
+            pin_nr = i + self.port_number * 8
+            self.pins.append(MockupPin(sp, pin_nr, type=pyfirmata.DIGITAL, port=self))
+
+    def update_values_dict(self):
+        for pin in self.pins:
+            pin.values_dict = self.values_dict
+        
+class MockupPin(pyfirmata.Pin):
+    def __init__(self, *args, **kwargs):
+        self.values_dict = kwargs.get('values_dict', {})
+        try:
+            del kwargs['values_dict']
+        except KeyError:
+            pass
+        super(MockupPin, self).__init__(*args, **kwargs)
+    
+    def read(self):
+        if self.value is None:
+            try:
+                type = self.port and 'd' or 'a'
+                return self.values_dict[type][self.pin_number]
+            except KeyError:
+                return None
+        else:
+            return self.value
+            
+    def get_in_output(self):
+        if not self.port and not self.mode: # analog input
+            return 'i'
+        else:
+            return 'o'
+            
+    def set_active(self, active):
+        self.is_active = active
+        
+    def get_active(self):
+        return self.is_active
+        
+    def write(self, value):
+        if self.mode == pyfirmata.UNAVAILABLE:
+            raise IOError, "Cannot read from pin %d" \
+                           % (self.pin_number)
+        if self.mode == pyfirmata.INPUT:
+            raise IOError, "%s pin %d is not an output" \
+                            % (self.port and "Digital" or "Analog", self.get_pin_number())
+        if not self.port:
+            raise AttributeError, "AnalogPin instance has no attribute 'write'"
+        # if value != self.read():
+        self.value = value
+        
+class Iterator(object):
+    def __init__(self, *args, **kwargs):
+        pass
+    def start(self):
+        pass
+    def stop(self):
+        pass
+
+pyfirmata.DigitalPort = MockupPort
+pyfirmata.Pin = MockupPin
+
+Arduinos = MockupArduinos
+Arduino = MockupArduino

pyfirmata/pyfirmata.py

+import time
+import serial
+import os
+from serial import SerialException
+import threading
+import util
+
+# Message command bytes - straight from Firmata.h
+COMMANDS = dict(
+    DIGITAL_MESSAGE = 0x90,      # send data for a digital pin
+    ANALOG_MESSAGE = 0xE0,       # send data for an analog pin (or PWM)
+    DIGITAL_PULSE = 0x91,        # SysEx command to send a digital pulse
+
+    # PULSE_MESSAGE = 0xA0,      # proposed pulseIn/Out msg (SysEx)
+    # SHIFTOUT_MESSAGE = 0xB0,   # proposed shiftOut msg (SysEx)
+    REPORT_ANALOG = 0xC0,        # enable analog input by pin #
+    REPORT_DIGITAL = 0xD0,       # enable digital input by port pair
+    START_SYSEX = 0xF0,          # start a MIDI SysEx msg
+    SET_PIN_MODE = 0xF4,         # set a pin to INPUT/OUTPUT/PWM/etc
+    END_SYSEX = 0xF7,            # end a MIDI SysEx msg
+    REPORT_VERSION = 0xF9,       # report firmware version
+    SYSTEM_RESET = 0xFF,         # reset from MIDI
+)
+
+# Setup a helper class to deal with commands
+CMDS = util.Commands(COMMANDS)
+
+# Pin types
+DIGITAL = 1         # same as OUTPUT below
+ANALOG = 2          # same as below
+
+# Pin modes.
+# except from UNAVAILABLE taken from Firmata.h
+UNAVAILABLE = -1 
+INPUT = 0          # as defined in wiring.h
+OUTPUT = 1         # as defined in wiring.h
+ANALOG = 2         # analog pin in analogInput mode
+PWM = 3            # digital pin in PWM output mode
+
+class PinAlreadyTakenError(Exception):
+    pass
+    
+class UnknownArduinoError(KeyError):
+    pass
+
+class InvalidPinDefError(Exception):
+    pass
+    
+class NoInputWarning(RuntimeWarning):
+    pass
+
+# Command handlers
+def update_analog(board, data):
+    if len(data) < 3:
+        return False
+    pin_number, lsb, msb = data
+    value = float(msb << 7 | lsb) / 1023
+    board.analog[pin_number].value = value
+    return True
+    
+CMDS.add_handler('ANALOG_MESSAGE', update_analog)
+
+def update_digital(board, data):
+    if len(data) < 3:
+        return False
+    pin_number, lsb, msb = data
+    value = msb << 7 | lsb
+    board.digital[pin_number].value = value
+    return True
+    
+CMDS.add_handler('DIGITAL_MESSAGE', update_digital)
+
+def update_version(board, data):
+    if len(data) < 3:
+        return False
+    major, minor = data
+    board.firmata_version = (major, minor)
+    
+CMDS.add_handler('REPORT_VERSION', update_version)
+
+class Arduinos(dict):
+    """
+    A dictonary to manage multiple Arduinos on the same machine. Tries to
+    connect to all defined usb ports, and then tries to fetch the arduino's
+    name. Result is a dictionary of pyduino.Arduino classes mapped to their
+    names.
+    """
+    def __init__(self, base_dir='/dev/', identifier='tty.usbserial', arduinos_map=None):
+        """
+        Searches all possible USB devices in ``base_dir`` that start with
+        ``identifier``. If given an ``arduinos_map`` dictionary it will setup
+        the arduinos in the map according to their type. Finding an arduino
+        that does not identify itself at all, that are not in the map, or
+        arduinos that are missing fail silenly. Use the ``verify_found_arduinos``
+        method to check this.
+        
+        :base_dir arg: A absolute path for the base directory to search for USB connections
+        :identifier arg: A string a USB connection startswith and should be tried
+        :arduinos_map arg: A dictionary in the form of::
+            arduinos_map = { 1 : {'name' : 'myduino', 'board' : 'normal'},
+                             2 : {'name' : 'mymega', 'board' : 'mega'} }
+        """
+        for device in os.listdir(base_dir):
+            if device.startswith(identifier):
+                try:
+                    arduino = Arduino(os.path.join(base_dir, device))
+                except SerialException:
+                    pass
+                else:
+                    if arduino.id:
+                        try:
+                            arduino.name = arduinos_map[arduino.id]['name']
+                            arduino.setup_layout(BOARDS[arduinos_map[arduino.id]['board']])
+                        except (KeyError, TypeError): # FIXME Possible KeyErrors from setup_layout are caught here...
+                            # FIXME Better to check if arduinos_map is valid if given, otherwise raise error
+                            pass
+                    if not arduino.name:
+                        arduino.name = 'unknown-%s' % device[-5:-1]
+                    self[arduino.name] = arduino
+                            
+    def verify_found_arduinos(self, arduinos_map):
+        """
+        Returns True if all and only arduinos in the map are found. Returns
+        False otherwise
+        """
+        names = set()
+        for arduino in arduinos_map:
+            names.add(arduino['name'])
+        return names is set(self.keys())
+        
+    def get_pin(self, pin_def):
+        parts = pin_def.split(':')
+        try:
+            return self[parts[0]].get_pin([parts[1], parts[2], parts[3]])
+        except KeyError:
+            raise UnknownArduinoError("Arduino named %s not found" % parts[0])
+        except IndexError:
+            raise InvalidPinDefError("%s is not a valid pin definition" % pin_def)
+            
+    def exit(self):
+        for arduino in self.values():
+            arduino.exit()
+            
+    def __del__(self):
+        ''' 
+        The connection with the arduino can get messed up when a script is
+        closed without calling arduino.exit() (which closes the serial
+        connection). Therefore do it here and hope it helps.
+        '''
+        self.exit()
+
+class Arduino(object):
+    """Base class for the arduino board"""
+    name = None
+    
+    def __init__(self, port, type="normal"):
+        self.sp = serial.Serial(port, 4800, timeout=0)
+        # Allow 2 secs for Diecimila auto-reset to happen
+        self.pass_time(2)
+        # setup as a 'normal' arduino so we can at least get an id
+        self.setup_layout(BOARDS[type])
+        # Try to get the id
+        self.id = None
+        self.refresh_id()
+        self.refresh_id() # Mega needs it... # FIXME Mega sends string first, why?
+    
+    def __del__(self):
+        ''' 
+        The connection with the arduino can get messed up when a script is
+        closed without calling arduino.exit() (which closes the serial
+        connection). Therefore do it here and hope it helps.
+        '''
+        self.exit()
+        
+    def send_as_two_bytes(self, val):
+        self.sp.write(chr(val % 128) + chr(val >> 7))
+        
+    def received_as_two_bytes(self, bytes):
+        lsb, msb = bytes
+        return msb << 7 | lsb
+        
+    def setup_layout(self, board_layout):
+        """ 
+        Setup the Pin instances based on the given board-layout. Maybe it will
+        be possible to do this automatically in the future, by polling the
+        board for its type.
+        """        
+        # Create pin instances based on board layout
+        self.analog = []
+        for i in board_layout['analog']:
+            self.analog.append(Pin(self.sp, i, arduino_name=self.name))
+        # Only create digital ports if the Firmata can use them (ie. not on the Mega...)
+        # TODO Why is (TOTAL_FIRMATA_PINS + 7) / 8 used in Firmata?
+        if board_layout['use_ports']:
+            self.digital = []
+            self.digital_ports = []
+            for i in range(len(board_layout['digital']) / 7):
+                self.digital_ports.append(Port(self.sp, i, self))
+            # Allow to access the Pin instances directly
+            for port in self.digital_ports:
+                self.digital += port.pins
+            for i in board_layout['pwm']:
+                self.digital[i].PWM_CAPABLE = True
+        else:
+            self.digital = []
+            for i in board_layout['digital']:
+                self.digital.append(Pin(self.sp, i, type=DIGITAL, arduino_name=self.name))
+        # Disable certain ports like Rx/Tx and crystal ports
+        for i in board_layout['disabled']:
+            self.digital[i].mode = UNAVAILABLE
+        # Create a dictionary of 'taken' pins. Used by the get_pin method
+        self.taken = { 'analog' : dict(map(lambda p: (p.pin_number, False), self.analog)),
+                       'digital' : dict(map(lambda p: (p.pin_number, False), self.digital)) }
+        
+    def refresh_id(self):
+        # Obtain id
+        self.send_sysex(REPORT_ARDUINO_ID)
+        # Trying to make sure identifier gets set before the end of the function
+        # But stop the script if it takes longer than 5 seconds
+        count = 0
+        while self.id is None and count < 5:
+            count += 1
+            self.pass_time(1)
+            res = True
+            while res:
+                res = self.iterate()
+                
+    def __str__(self):
+        return "Arduino: %s"% self.sp.port
+        
+    def get_pin(self, pin_def):
+        """
+        Returns the activated pin given by the pin definition.
+        May raise an ``InvalidPinDefError`` or a ``PinAlreadyTakenError``.
+        
+        :arg pin_def: Pin definition as described in :ref:`arduino-pin-definitions`,
+        but without the arduino name. So for example ``a:1:i``.
+        """
+        if type(pin_def) == list:
+            bits = pin_def
+        else:
+            bits = pin_def.split(':')
+        a_d = bits[0] == 'a' and 'analog' or 'digital'
+        part = getattr(self, a_d)
+        pin_nr = int(bits[1])
+        if pin_nr >= len(part):
+            raise InvalidPinDefError('Invalid pin definition: %s at position 3 on %s' % (pin_def, self.name))
+        if getattr(part[pin_nr], 'mode', None)  == UNAVAILABLE:
+            raise InvalidPinDefError('Invalid pin definition: UNAVAILABLE pin %s at position on %s' % (pin_def, self.name))
+        if self.taken[a_d][pin_nr]:
+            raise PinAlreadyTakenError('%s pin %s is already taken on %s' % (a_d, bits[1], self.name))
+        # ok, should be available
+        pin = part[pin_nr]
+        self.taken[a_d][pin_nr] = True
+        if pin.type is DIGITAL:
+            if bits[2] == 'p':
+                pin.mode = PWM
+            elif bits[2] is not 'o':
+                pin.mode = INPUT
+        else:
+            pin.enable_reporting()
+        return pin
+        
+    def pass_time(self, t):
+        """ 
+        Non-blocking time-out for ``t`` seconds with a resolution of 1/10 of a
+        second
+        """
+        cont = time.time() + t
+        while time.time() < cont:
+            time.sleep(0)
+            
+    def send_sysex(self, sysex_cmd, data=[]):
+        """
+        Sends a SysEx msg.
+        
+        :arg sysex_cmd: A sysex command byte
+        :arg data: A list of data values
+        """
+        self.sp.write(chr(START_SYSEX))
+        self.sp.write(chr(sysex_cmd))
+        for byte in data:
+            try:
+                byte = chr(byte)
+            except ValueError:
+                byte = chr(byte >> 7) # TODO send multiple bytes
+            self.sp.write(byte)
+        self.sp.write(chr(END_SYSEX))
+        
+    def iterate(self):
+        """ 
+        Reads and handles data from the microcontroller over the serial port.
+        """
+        self._process_data(self.sp.read())
+        
+    def _process_data(self, byte):
+        """
+        Does the actual processing of the data from the microcontroller and
+        delegates the command processing to _process_command
+        """
+        if not byte:
+            return
+        byte = ord(byte)
+        if self._parsing_sysex:
+            if byte == END_SYSEX:
+                self._parsing_sysex = False
+                self._process_sysex_message(self._stored_data)
+                self._stored_data = []
+            else:
+                self._stored_data.append(byte)
+        elif not self._command:
+            if byte not in CMDS:
+                # We received a byte not denoting a known command while we
+                # are not processing any commands data. Nothing we can do
+                # about it so discard and we'll see what comes next.
+                return
+            self._command = byte
+        else:
+            # This is a data command either belonging to a sysex message, or
+            # to a multibyte command. Append it to the data and see if we can
+            # process the command. If _process_command returns False, it
+            # needs more data.
+            self._stored_data.append(byte)
+            try:
+                if self._process_command(self._command, self._stored_data):
+                    self._command = None
+                    self._stored_data = []
+            except ValueError:
+                self._command = None
+                self._stored_data = []
+    
+    def _process_command(command, data):
+        """
+        Tries to get a handler for this command from the CMDS helper and will
+        return its return status.
+        """
+        try:
+             handle_cmd = CMDS.get_handler(command)
+             return handle_cmd(self, data)
+        except (KeyError, ValueError):
+            # something got corrupted
+            raise ValueError
+            
+    def get_firmata_version(self):
+        """
+        Returns a version tuple (major, mino)  for the firmata firmware 
+        """
+        return self.firmata_version
+        
+    def reset_taken(self):
+        for key in self.taken['analog'].keys():
+            self.taken['analog'][key] = False
+        for key in self.taken['digital'].keys():
+            self.taken['digital'][key] = False
+        
+    def exit(self):
+        """ Cleanly exits """
+        self.sp.close()
+        
+
+class Port(object):
+    """Port on the arduino board"""
+    def __init__(self, sp, port_number, arduino):
+        self.arduino = arduino
+        self.sp = sp
+        self.port_number = port_number
+        self.reporting = False
+        
+        self.pins = []
+        for i in range(8):
+            pin_nr = i + self.port_number * 8
+            self.pins.append(Pin(sp, pin_nr, type=DIGITAL, port=self, arduino_name=self.arduino.name))
+            
+    def __str__(self):
+        return "Digital Port %i on %s" % (self.port_number, self.arduino)
+        
+    def enable_reporting(self):
+        """ Set the port to report values """
+        self.reporting = True
+        msg = chr(REPORT_DIGITAL + self.port_number + 1)
+        self.sp.write(msg)
+        
+    def disable_reporting(self):
+        """ Disable the reporting of the port """
+        self.reporting = False
+        msg = chr(REPORT_DIGITAL + self.port_number + 0)
+        self.sp.write(msg)
+        
+    def set_value(self, mask):
+        """Record the value of each of the input pins belonging to the port"""
+        
+        for pin in self.pins:
+            if pin.mode is INPUT:
+                pin.set_value((mask & (1 << pin.pin_number)) > 1)
+                
+    def write(self):
+        """Set the output pins of the port to the correct state"""
+        mask = 0
+        for pin in self.pins:
+            if pin.mode == OUTPUT:
+                if pin.value == 1:
+                    pin_nr = pin.pin_number - self.port_number * 8
+                    mask |= 1 << pin_nr
+        msg = chr(DIGITAL_MESSAGE + self.port_number)
+        msg += chr(mask % 128)
+        msg += chr(mask >> 7)
+        self.sp.write(msg)
+
+class Pin(object):
+    """ A Pin representation """
+    def __init__(self, sp, pin_number, type=ANALOG, arduino_name=None, port=None):
+        self.sp = sp
+        self.pin_number = pin_number
+        self.type = type
+        self.arduino_name = arduino_name
+        self.port = port
+        self.PWM_CAPABLE = False
+        self._mode = (type == DIGITAL and OUTPUT or INPUT)
+        self.reporting = False
+        self.value = None
+        
+    def __str__(self):
+        type = {ANALOG : 'Analog', DIGITAL : 'Digital'}[self.type]
+        if self.arduino_name:
+            return "%s pin %d on %s" % (type, self.pin_number, self.arduino_name)
+        return "%s pin %d" % (type, self.pin_number)
+
+    def _set_mode(self, mode):
+        """
+        Set the mode of operation for the pin
+        :mode arg: Can be one of the pin modes: INPUT, OUTPUT, ANALOG or PWM
+        """
+        if mode is UNAVAILABLE:
+            self._mode = UNAVAILABLE
+            return
+        if mode is PWM and not self.PWM_CAPABLE:
+            raise IOError, "%s does not have PWM capabilities" % self
+        if self._mode is UNAVAILABLE:
+            raise IOError, "%s can not be used through Firmata" % self
+        self._mode = mode
+        command = chr(SET_PIN_MODE)
+        command += chr(self.pin_number)
+        command += chr(mode)
+        self.sp.write(command)
+        
+    def _get_mode(self):
+        return self._mode
+        
+    mode = property(_get_mode, _set_mode)
+    
+    def enable_reporting(self):
+        """ Set an input pin to report values """
+        if self.mode is not INPUT:
+            raise IOError, "%s is not an input and can therefore not report" % self
+        self.reporting = True
+        msg = chr(REPORT_ANALOG + self.pin_number)
+        msg += chr(1)
+        self.sp.write(msg)
+        
+    def disable_reporting(self):
+        """ Disable the reporting of an input pin """
+        self.reporting = False
+        msg = chr(REPORT_ANALOG + self.pin_number)
+        msg += chr(0)
+        self.sp.write(msg)
+    
+    def read(self):
+        """
+        Returns the output value of the pin. This value is updated by the
+        arduios' ``iterate`` method. Value is alway in the range 0.0 - 1.0
+        """
+        if self.mode == UNAVAILABLE:
+            raise IOError, "Cannot read pin %s"% self.__str__()
+        return self.value
+        
+    def write(self, value):
+        """
+        Output a voltage from the pin
+
+        :value arg:
+        Uses value as a boolean if the pin is in output mode, or expects a
+        float from 0 to 1 if the pin is in PWM mode.
+        """
+        if self.mode is UNAVAILABLE:
+            raise IOError, "%s can not be used through Firmata" % self
+        if self.mode is INPUT:
+            raise IOError, "%s is set up as an INPUT and can therefore not be written to" % self
+        if value is not self.value:
+            self.value = value
+            if self.mode is OUTPUT:
+                if self.port:
+                    self.port.write()
+                else:
+                    msg = chr(DIGITAL_MESSAGE)
+                    msg += chr(self.pin_number)
+                    msg += chr(value)
+                    self.sp.write(msg)
+            elif self.mode is PWM:
+                value = int(round(value * 255))
+                msg = chr(ANALOG_MESSAGE + self.pin_number)
+                msg += chr(value % 128)
+                msg += chr(value >> 7)
+                self.sp.write(msg)
+                
+    def send_sysex(self, sysex_cmd, data=[]):
+        """
+        Sends a SysEx msg.
+        
+        :arg sysex_cmd: A sysex command byte
+        :arg data: A list of data values
+        """
+        # TODO make the arduios send_sysex available to the pin
+        self.sp.write(chr(START_SYSEX))
+        self.sp.write(chr(sysex_cmd))
+        for byte in data:
+            try:
+                byte = chr(byte)
+            except ValueError:
+                byte = chr(byte >> 7) # TODO send multiple bytes
+            self.sp.write(byte)
+        self.sp.write(chr(END_SYSEX))
+                
+    def send_pulse(self, pulse_width):
+        data = [self.pin_number]
+        data += list(util.break_to_bytes(pulse_width))
+        self.send_sysex(DIGITAL_PULSE, data)
+
+        
+class Iterator(threading.Thread):
+    def __init__(self, arduino):
+        super(Iterator, self).__init__()
+        self.arduino = arduino
+        
+    def run(self):
+        while 1:
+            try:
+                while self.arduino.iterate():
+                    self.arduino.iterate()
+                time.sleep(0.001)
+            except (AttributeError, SerialException, OSError), e:
+                # this way we can kill the thread by setting the arduino object
+                # to None, or when the serial port is closed by arduino.exit()
+                break
+            except Exception, e:
+                # catch 'error: Bad file descriptor'
+                # iterate may be called while the serial port is being closed,
+                # causing an "error: (9, 'Bad file descriptor')"
+                if getattr(e, 'errno', None) == 9:
+                    break
+                try:
+                    if e[0] == 9:
+                        break
+                except (TypeError, IndexError):
+                    pass
+                raise

pyfirmata/util.py

+
+class Commands(dict):
+    """
+    A helper class to deal with firmata command bytes. Set it up with a
+    dictionary of commandnames and there bytes. Then add handlers for certain
+    commands with ``add_handler``. Allows for accessing the commands as
+    attributes.
+    """
+    _handlers = {}
+    def __init__(self, bytes_dict):
+        if not type(bytes_dict):
+            bytes_dicts = dict(bytes_dict)
+        self = bytes_dict
+        
+    def __getattr__(self, name):
+        return self[name]
+    
+    def add_handler(self, command, handler):
+        if not command in self:
+            raise ValueError, "This command does not exist yet. Add it first with add_command"
+        self._handlers[command] = handler
+
+    def add_command(self, command, byte):
+        self[command] = byte
+    
+    def get_handler(self, command):
+        try:
+            return self._handlers[command]
+        except IndexError:
+            raise NotImplementedError, "A handler for %s has not been defined yet." % chr(command)
+    
+def break_to_bytes(value):
+    """
+    Breaks a value into values of less than 255 that form value when multiplied.
+    (Or almost do so with primes)
+    Returns a tuple
+    
+    >>> break_to_bytes(200)
+    (200,)
+    >>> break_to_bytes(800)
+    (200, 4)
+    >>> break_to_bytes(802)
+    (2, 2, 200)
+    """
+    if value < 256:
+        return (value,)
+    c = 256
+    least = (0, 255)
+    for i in range(254):
+        c -= 1
+        rest = value % c
+        if rest == 0 and value / c < 256:
+            return (c, value / c)
+        elif rest == 0 and value / c > 255:
+            parts = list(break_to_bytes(value / c))
+            parts.insert(0, c)
+            return tuple(parts)
+        else:
+            if rest < least[1]:
+                least = (c, rest)
+    return (c, value / c)
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()
+import unittest
+import serial
+import time
+from optparse import OptionParser
+
+import pyfirmata
+import mockup
+
+# TODO Test Arduinos class
+
+class TestLiveArduinos(unittest.TestCase):
+    """
+    Test two live arduinos. On the 'transmitter' arduino:
+    
+    - connect 5v and analog port 0 directly
+    - connect digital port 2 to the 'receiver' arduino's analog port 2 directly
+    - connect digital port 5 to the 'receiver' arduino's analog port 5 directly
+    - connect 3v to the 'receiver' arduino's analog port 1 directly
+    
+    On the 'receiver' arduino:
+    
+    - connect ground and analog port 0 directly
+    
+    ..note::
+        This test can take quite a while, as it may take about 4 seconds each
+        time the setUp function is called.
+    """
+    
+    def setUp(self):
+        self.arduinos = pyfirmata.Arduinos()
+        assert len(self.arduinos) >= 2, "Only %d arduino(s) found. I need at least 2!" % len(arduinos)
+        x, y = self.arduinos.values()[0], self.arduinos.values()[1]
+        it1, it2 = pyfirmata.Iterator(x), pyfirmata.Iterator(y)
+        it1.start(), it2.start()
+        # give iterator time to iterate
+        self.pass_time(0.1)
+        x0, y0 = x.get_pin('a:0:i'), y.get_pin('a:0:i')
+        self.pass_time(0.1)
+        if x0.read() > 0.9 and y0.read() < 0.1: # x is transmitter
+            self.transmitter = x
+            self.receiver = y
+        elif y0.read() > 0.9 and x0.read() < 0.1: # y is transmitter
+            self.transmitter = x
+            self.receiver = y
+        else:
+            self.fail("Could not complete setup. One, and only one of the \
+                arduino's analog ports should be set high by connecting it to \
+                its 5v output. That arduino will be considered the transmitter. \
+                Values received: %f and %f" % (x0.read(), y0.read()))
+        self.Ta0 = self.transmitter.analog[0] # already taken by line 36
+        self.Ra1 = self.receiver.get_pin('a:1:i')
+        self.Ra2 = self.receiver.get_pin('a:2:i')
+        self.Ra5 = self.receiver.get_pin('a:5:i')
+        self.Td2 = self.transmitter.get_pin('d:2:o')
+        self.Td5 = self.transmitter.get_pin('d:5:p')
+        self.Td13 = self.transmitter.get_pin('d:13:o')
+        
+    def iterate(self):
+        self.transmitter.iterate()
+        self.receiver.iterate()
+        
+    def pass_time(self, t):
+        cont = time.time() + t
+        while time.time() < cont:
+            pass
+            
+    def test_led(self):
+        self.Td13.write(1)
+        print "Transmitters d13 should be high for 2 seconds..."
+        self.pass_time(2)
+        self.assertEqual(self.Td13.read(), 1)
+        
+    def test_high(self):
+        self.pass_time(0.1)
+        value = self.Ta0.read()
+        self.failIf(value < 0.99, msg="Too low: %f" % value) # Connected to 5v, it should be almost 1
+        value = self.Ra1.read()
+        self.failUnless(0.65 < value < 0.69, msg="not getting 3v! (%f not between 0.65 and 0.69)" % value) # Connected to 3v
+        
+    def test_in_out(self):
+        self.Td2.write(1)
+        self.pass_time(0.1)
+        value = self.Ra2.read()
+        self.failIf(value < 0.99, msg="Not high enough: %f" % value)
+        self.Td2.write(0)
+        self.assert_(self.Td2.read() == 0)
+        self.pass_time(0.1)
+        value = self.Ra2.read()
+        self.failIf(value > 0.01, msg="Too high: %f" % value)
+
+    # pwm into analog doesn't work...
+    def test_pwm(self):
+        self.pass_time(0.5)
+        # test high
+        self.Td5.write(1.0)
+        self.pass_time(1)
+        value = self.Ra5.read()
+        self.failUnless(0.99 < value < 1.0, msg="%f not between 0.99 and 1.0" % value)
+        # test middle
+        self.Td5.write(0.5)
+        self.pass_time(0.5)
+        value = self.Ra5.read()
+        self.failUnless(0.49 < value < 0.51, msg="%f not between 0.49 and 0.51" % value)
+        # test low
+        self.Td5.write(0.1)
+        self.pass_time(0.5)
+        value = self.Ra5.read()
+        self.failUnless(0.09 < value < 0.11, msg="%f not between 0.09 and 0.11" % value)
+        
+    def tearDown(self):
+        self.arduinos.exit()
+
+class TestArduino(unittest.TestCase):
+    # TODO Test layout of Arduino Mega
+    # TODO Test if messages are correct...
+    
+    def setUp(self):
+        pyfirmata.serial.Serial = mockup.MockupSerial
+        self.arduino = pyfirmata.Arduino('test')
+        self.arduino.setup_layout(pyfirmata.BOARDS['normal'])
+    
+    def test_identifier(self):
+        self.assertEqual(self.arduino.id, ord(chr(1)))
+
+    def test_pwm_layout(self):
+        pins = []
+        for pin in self.arduino.digital:
+            if pin.PWM_CAPABLE:
+                pins.append(self.arduino.get_pin('d:%d:p' % pin.pin_number))
+        for pin in pins:
+            self.assertEqual(pin.mode, pyfirmata.PWM)
+        
+    def test_get_pin_digital(self):
+        pin = self.arduino.get_pin('d:13:o')
+        self.assertEqual(pin.pin_number, 13)
+        self.assertEqual(pin.mode, pyfirmata.OUTPUT)
+        self.assertEqual(pin.sp, self.arduino.sp)
+        self.assertEqual(pin.port.port_number, 1)
+        self.assertEqual(pin.port.get_active(), 1)
+        
+    def test_get_pin_analog(self):
+        pin = self.arduino.get_pin('a:5:i')
+        self.assertEqual(pin.pin_number, 5)
+        self.assertEqual(pin.sp, self.arduino.sp)
+        self.assertEqual(pin.reporting, True)
+        self.assertEqual(pin.value, None)
+        
+    def tearDown(self):
+        self.arduino.exit()
+        pyfirmata.serial.Serial = serial.Serial
+        
+class TestMockupArduino(unittest.TestCase):
+    
+    def setUp(self):
+        self.arduino = mockup.MockupArduino('test')
+    
+    def test_identifier(self):
+        self.assertEqual(self.arduino.identifier, ord(chr(ID)))
+    
+    def test_pwm_layout(self):
+        pins = []
+        for pin_nr in self.arduino.layout['p']:
+            pins.append(self.arduino.get_pin('d:%d:p' % pin_nr))
+        for pin in pins:
+            mode = pin.get_mode()
+            self.assertEqual(mode, pyfirmata.DIGITAL_PWM)
+        
+    def test_get_pin_digital(self):
+        pin = self.arduino.get_pin('d:13:o')
+        self.assertEqual(pin.pin_number, 5)
+        self.assertEqual(pin.mode, pyfirmata.DIGITAL_OUTPUT)
+        self.assertEqual(pin.sp, self.arduino.sp)
+        self.assertEqual(pin.port.port_number, 1)
+        self.assertEqual(pin._get_board_pin_number(), 13)
+        self.assertEqual(pin.port.get_active(), 1)
+        
+    def test_get_pin_analog(self):
+        pin = self.arduino.get_pin('a:5:i')
+        self.assertEqual(pin.pin_number, 5)
+        self.assertEqual(pin.sp, self.arduino.sp)
+        self.assertEqual(pin.get_active(), 1)
+        self.assertEqual(pin.value, -1)
+        
+    def tearDown(self):
+        self.arduino.exit()
+        pyfirmata.serial.Serial = serial.Serial
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestArduino)
+live_suite = unittest.TestLoader().loadTestsFromTestCase(TestLiveArduinos)
+mockup_suite = unittest.TestLoader().loadTestsFromTestCase(TestMockupArduino)
+
+if __name__ == '__main__':
+    parser = OptionParser()
+    parser.add_option("-l", "--live", dest="live", action="store_true",
+        help="Also run the live tests. Make sure the hardware is connected properly")
+    parser.add_option("-m", "--mockup", dest="mockup", action="store_true",
+        help="Also run the mockup tests")
+    options, args = parser.parse_args()
+    if not options.live and not options.mockup:
+        print "Running normal suite. Also consider running the live (-l, --live) \
+                and mockup (-m, --mockup) suites"
+        unittest.TextTestRunner(verbosity=3).run(suite)
+    if options.live:
+        print "Running the live test suite"
+        unittest.TextTestRunner(verbosity=2).run(live_suite)
+    if options.mockup:
+        print "Running the mockup test suite"
+        unittest.TextTestRunner(verbosity=2).run(mockup_suite)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.