1. Ben Bass
  2. pylibftdi

Commits

Ben Bass  committed 58f8f40

refactor to a separate device module; test fixups

  • Participants
  • Parent commits b74c82a
  • Branches default

Comments (0)

Files changed (14)

File CHANGES.txt

View file
 pylibftdi changes
 =================
 
+0.14
+~~~~
+
+* refactor - split the `Device` class from the driver.py module into its
+  own 'device.py' module
+* refactor - fix tests so they work both standalone and when run via e.g.
+  python -m unittest discover
+* change - now resets the flow control and baudrate during device
+  initialisation
+
 0.13
 ~~~~
 

File docs/conf.py

View file
 # built documents.
 #
 # The short X.Y version.
-version = '0.13'
+version = '0.14pre'
 # The full version, including alpha/beta/rc tags.
-release = '0.13'
+release = '0.14pre'
 
 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.

File pylibftdi/__init__.py

View file
 rather than a problem with the libftdi library.
 """
 
-__VERSION__ = "0.13"
+__VERSION__ = "0.14pre"
 __AUTHOR__ = "Ben Bass"
 
 
            'ALL_OUTPUTS', 'ALL_INPUTS', 'BB_OUTPUT', 'BB_INPUT',
            'USB_VID_LIST', 'USB_PID_LIST']
 
-from pylibftdi import _base, driver, util, bitbang
+from pylibftdi import _base, driver, device, util, bitbang
 
 # Bring them in to package scope so we can treat pylibftdi
 # as a module if we want.
 FtdiError = _base.FtdiError
 Bus = util.Bus
 Driver = driver.Driver
-Device = driver.Device
+Device = device.Device
 BitBangDevice = bitbang.BitBangDevice
 USB_VID_LIST = driver.USB_VID_LIST
 USB_PID_LIST = driver.USB_PID_LIST

File pylibftdi/bitbang.py

View file
 pylibftdi: http://bitbucket.org/codedstructure/pylibftdi
 
 """
+from pylibftdi.device import Device
 
-from pylibftdi.driver import Device, FtdiError, BITMODE_BITBANG
+from pylibftdi.driver import FtdiError, BITMODE_BITBANG
 from ctypes import c_ubyte, byref
 
 ALL_OUTPUTS = 0xFF

File pylibftdi/device.py

View file
+
+import functools
+import itertools
+import os
+
+from ctypes import byref, create_string_buffer, c_char_p
+
+from pylibftdi._base import FtdiError
+from pylibftdi.driver import Driver, USB_VID_LIST, USB_PID_LIST, FTDI_ERROR_DEVICE_NOT_FOUND, BITMODE_RESET, FLUSH_BOTH, FLUSH_INPUT, FLUSH_OUTPUT
+
+import pylibftdi.driver
+
+import sys
+
+
+class Device(object):
+    """
+    Represents a connection to a single FTDI device
+    """
+
+    def __init__(self, device_id=None, mode="b",
+                 encoding="latin1", lazy_open=False,
+                 chunk_size=0, interface_select=None,
+                 **kwargs):
+        """
+        Device([device_id[, mode, [OPTIONS ...]]) -> Device instance
+
+        represents a single FTDI device accessible via the libftdi driver.
+        Supports a basic file-like interface (open/close/read/write, context
+        manager support).
+
+        device_id - an optional serial number of the device to open.
+            if omitted, this refers to the first device found, which is
+            convenient if only one device is attached, but otherwise
+            fairly useless.
+
+        mode - either 'b' (binary) or 't' (text). This primarily affects
+            Python 3 calls to read() and write(), which will accept/return
+            unicode strings which will be encoded/decoded according to the given...
+
+        encoding - the codec name to be used for text operations.
+
+        lazy_open - if True, then the device will not be opened immediately -
+            the user must perform an explicit open() call prior to other
+            operations.
+
+        chunk_size - if non-zero, split read and write operations into chunks
+            of this size. With large or slow accesses, interruptions (i.e.
+            KeyboardInterrupt) may not happen in a timely fashion.
+
+        interface_select - select interface to use on multi-interface devices
+        """
+        self._opened = False
+        self.driver = Driver(**kwargs)
+        self.fdll = self.driver.fdll
+        # device_id is an optional serial number of the requested device.
+        self.device_id = device_id
+        # mode can be either 'b' for binary, or 't' for text.
+        # if set to text, the values returned from read() will
+        # be decoded using encoding before being returned as
+        # strings; for binary the raw bytes will be returned.
+        # This will only affect Python3.
+        self.mode = mode
+        # when giving a str to Device.write(), it is encoded.
+        # default is latin1, because it provides
+        # a one-to-one correspondence for code points 0-FF
+        self.encoding = encoding
+        # ftdi_usb_open_dev initialises the device baudrate
+        # to 9600, which certainly seems to be a de-facto
+        # standard for serial devices.
+        self._baudrate = 9600
+        # defining softspace allows us to 'print' to this device
+        self.softspace = 0
+        # chunk_size (if not 0) chunks the reads and writes
+        # to allow interruption
+        self.chunk_size = chunk_size
+        # interface can be set for devices which have multiple interface
+        # ports (e.g. FT4232, FT2232)
+        self.interface_select = interface_select
+        # lazy_open tells us not to open immediately.
+        if not lazy_open:
+            self.open()
+
+    def __del__(self):
+        "free the ftdi_context resource"
+        if self._opened:
+            self.close()
+
+    def open(self):
+        """
+        open connection to a FTDI device
+        """
+        if self._opened:
+            return
+
+        # create context for this device
+        self.ctx = create_string_buffer(1024)
+        res = self.fdll.ftdi_init(byref(self.ctx))
+        if res != 0:
+            msg = "%s (%d)" % (self.get_error_string(), res)
+            del self.ctx
+            raise FtdiError(msg)
+
+        if self.interface_select is not None:
+            res = self.fdll.ftdi_set_interface(byref(self.ctx),
+                                               self.interface_select)
+            if res != 0:
+                msg = "%s (%d)" % (self.get_error_string(), res)
+                del self.ctx
+                raise FtdiError(msg)
+
+        # Try to open the device.  If this fails, reset things to how
+        # they were, but we can't use self.close as that assumes things
+        # have already been setup.
+        # FTDI vendor/product ids required here.
+        for usb_vid, usb_pid in itertools.product(USB_VID_LIST, USB_PID_LIST):
+            open_args = [byref(self.ctx), usb_vid, usb_pid]
+            if self.device_id is None:
+                res = self.fdll.ftdi_usb_open(*tuple(open_args))
+            else:
+                # attempt to match device_id to serial number
+                open_args.extend([0, c_char_p(self.device_id.encode('latin1'))])
+                res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
+                if res != 0:
+                    # swap last two parameters and try again
+                    #  - attempt to match device_id to description
+                    open_args[-2], open_args[-1] = open_args[-1], open_args[-2]
+                    res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
+            if res != FTDI_ERROR_DEVICE_NOT_FOUND:
+                # if we succeed (0) or get a specific error, don't continue
+                # otherwise (-3) - look for another device
+                break
+
+        if res != 0:
+            msg = "%s (%d)" % (self.get_error_string(), res)
+            # free the context
+            self.fdll.ftdi_deinit(byref(self.ctx))
+            del self.ctx
+            raise FtdiError(msg)
+
+        # explicitly reset the device to serial mode with standard settings
+        # - no flow control, 9600 baud - in case it had previously been used
+        # in bitbang mode (some later driver versions might do bits of this
+        # automatically)
+        self.fdll.ftdi_set_bitmode(byref(self.ctx), 0, BITMODE_RESET)
+        self.fdll.ftdi_setflowctrl(byref(self.ctx), 0)
+        self.baudrate = 9600
+        self._opened = True
+
+    def close(self):
+        "close our connection, free resources"
+        if self._opened:
+            self.fdll.ftdi_usb_close(byref(self.ctx))
+            self.fdll.ftdi_deinit(byref(self.ctx))
+            del self.ctx
+        self._opened = False
+
+    @property
+    def baudrate(self):
+        """
+        get or set the baudrate of the FTDI device. Re-read after setting
+        to ensure baudrate was accepted by the driver.
+        """
+        return self._baudrate
+
+    @baudrate.setter
+    def baudrate(self, value):
+        result = self.fdll.ftdi_set_baudrate(byref(self.ctx), value)
+        if result == 0:
+            self._baudrate = value
+
+    def _read(self, length):
+        """
+        actually do the low level reading
+
+        returns a 'bytes' object
+        """
+        buf = create_string_buffer(length)
+        rlen = self.fdll.ftdi_read_data(byref(self.ctx), byref(buf), length)
+        if rlen < 0:
+            raise FtdiError(self.get_error_string())
+        byte_data = buf.raw[:rlen]
+
+        return byte_data
+
+    def read(self, length):
+        """
+        read(length) -> bytes/string of up to `length` bytes.
+
+        read upto `length` bytes from the FTDI device
+        :param length: maximum number of bytes to read
+        :return: value read from device
+        :rtype: bytes if self.mode is 'b', else decode with self.encoding
+        """
+        if not self._opened:
+            raise FtdiError("read() on closed Device")
+
+        # read the data
+        if self.chunk_size != 0:
+            remaining = length
+            byte_data_list = []
+            while remaining > 0:
+                rx_bytes = self._read(min(remaining, self.chunk_size))
+                if not rx_bytes:
+                    break
+                byte_data_list.append(rx_bytes)
+                remaining -= len(rx_bytes)
+            byte_data = b''.join(byte_data_list)
+        else:
+            byte_data = self._read(length)
+        if self.mode == 'b':
+            return byte_data
+        else:
+            # TODO: for some codecs, this may choke if we haven't read the
+            # full required data. If this is the case we should probably trim
+            # a byte at a time from the output until the decoding works, and
+            # buffer the remainder for next time.
+            return byte_data.decode(self.encoding)
+
+    def _write(self, byte_data):
+        """
+        actually do the low level writing
+        :param byte_data: data to be written
+        :type byte_data: bytes
+        :return: number of bytes written
+        """
+        buf = create_string_buffer(byte_data)
+        written = self.fdll.ftdi_write_data(byref(self.ctx),
+                                            byref(buf), len(byte_data))
+        if written < 0:
+            raise FtdiError(self.get_error_string())
+        return written
+
+    def write(self, data):
+        """
+        write(data) -> count of bytes actually written
+
+        write given `data` string to the FTDI device
+
+        :param data: string to be written
+        :type data: string or bytes
+        :return: count of bytes written, which may be less than `len(data)`
+        """
+        if not self._opened:
+            raise FtdiError("write() on closed Device")
+
+        try:
+            byte_data = bytes(data)
+        except TypeError:
+            # this will happen if we are Python3 and data is a str.
+            byte_data = data.encode(self.encoding)
+
+        # actually write it
+        if self.chunk_size != 0:
+            remaining = len(byte_data)
+            written = 0
+            while remaining > 0:
+                start = written
+                length = min(remaining, self.chunk_size)
+                result = self._write(byte_data[start: start + length])
+                if result == 0:
+                    # don't continue to try writing forever if nothing
+                    # is actually being written
+                    break
+                else:
+                    written += result
+                    remaining -= result
+        else:
+            written = self._write(byte_data)
+        return written
+
+    def flush(self, flush_what=FLUSH_BOTH):
+        """
+        Instruct the FTDI device to flush its FIFO buffers
+
+        By default both the input and output buffers will be
+        flushed, but the caller can selectively chose to only
+        flush the input or output buffers using `flush_what`:
+          FLUSH_BOTH - (default)
+          FLUSH_INPUT - (just the rx buffer)
+          FLUSH_OUTPUT - (just the tx buffer)
+        """
+        if flush_what == FLUSH_BOTH:
+            fn = self.fdll.ftdi_usb_purge_buffers
+        elif flush_what == FLUSH_INPUT:
+            fn = self.fdll.ftdi_usb_purge_rx_buffer
+        elif flush_what == FLUSH_OUTPUT:
+            fn = self.fdll.ftdi_usb_purge_tx_buffer
+        else:
+            raise ValueError("Invalid value passed to %s.flush()" %
+                             self.__class__.__name__)
+        res = fn(byref(self.ctx))
+        if res != 0:
+            msg = "%s (%d)" % (self.get_error_string(), res)
+            raise FtdiError(msg)
+
+    def flush_input(self):
+        """
+        flush the device input buffer
+        """
+        self.flush(FLUSH_INPUT)
+
+    def flush_output(self):
+        """
+        flush the device output buffer
+        """
+        self.flush(FLUSH_OUTPUT)
+
+    def get_error_string(self):
+        """
+        :return: error string from libftdi driver
+        """
+        return self.fdll.ftdi_get_error_string(byref(self.ctx))
+
+    @property
+    def ftdi_fn(self):
+        """
+        this allows the vast majority of libftdi functions
+        which are called with a pointer to a ftdi_context
+        struct as the first parameter to be called here
+        preventing the need to leak self.ctx into the user
+        code (and import byref from ctypes):
+
+        >>> with Device() as dev:
+        ...     # set 8 bit data, 2 stop bits, no parity
+        ...     dev.ftdi_fn.ftdi_set_line_property(8, 2, 0)
+        ...
+        """
+        # note this class is constructed on each call, so this
+        # won't be particularly quick.  It does ensure that the
+        # fdll and ctx objects in the closure are up-to-date, though.
+        class FtdiForwarder(object):
+
+            def __getattr__(innerself, key):
+                return functools.partial(getattr(self.fdll, key),
+                                         byref(self.ctx))
+        return FtdiForwarder()
+
+    def __enter__(self):
+        """
+        support for context manager.
+
+        Note the device is opened and closed automatically
+        when used in a with statement, and the device object
+        itself is returned:
+        >>> with Device(mode='t') as dev:
+        ...     dev.write('Hello World!')
+        ...
+        """
+        self.open()
+        return self
+
+    def __exit__(self, exc_type, exc_val, tb):
+        "support for context manager"
+        self.close()
+
+    #
+    # following are various properties and functions to make
+    # this emulate a file-object more closely.
+    #
+
+    @property
+    def closed(self):
+        """
+        The Python file API defines a read-only 'closed' attribute
+        """
+        return not self._opened
+
+    def readline(self, size=0):
+        """
+        readline() for file-like compatibility.
+
+        :param size: maximum amount of data to read looking for a line
+        :return: a line of text, or size bytes if no line-ending found
+
+        This only works for mode='t' on Python3
+        """
+        lsl = len(os.linesep)
+        line_buffer = []
+        while True:
+            next_char = self.read(1)
+            if next_char == '' or (size > 0 and len(line_buffer) > size):
+                break
+            line_buffer.append(next_char)
+            if (len(line_buffer) >= lsl and
+                    line_buffer[-lsl:] == list(os.linesep)):
+                break
+        return ''.join(line_buffer)
+
+    def readlines(self, sizehint=None):
+        """
+        readlines() for file-like compatibility.
+        """
+        lines = []
+        if sizehint is not None:
+            string_blob = self.read(sizehint)
+            lines.extend(string_blob.split(os.linesep))
+
+        while True:
+            line = self.readline()
+            if not line:
+                break
+            lines.append(line)
+        return lines
+
+    def writelines(self, lines):
+        """
+        writelines for file-like compatibility.
+        """
+        for line in lines:
+            self.write(line)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        while True:
+            line = self.readline()
+            if line:
+                return line
+            else:
+                raise StopIteration
+    next = __next__

File pylibftdi/driver.py

View file
 
 """
 
-import os
 import itertools
-import functools
 
 # be disciplined so pyflakes can check us...
 from ctypes import (CDLL, byref, c_int, c_char_p, c_void_p, cast,
         return devices
 
 
-class Device(object):
-    """
-    Represents a connection to a single FTDI device
-    """
-
-    def __init__(self, device_id=None, mode="b",
-                 encoding="latin1", lazy_open=False,
-                 chunk_size=0, interface_select=None,
-                 **kwargs):
-        """
-        Device([device_id[, mode, [OPTIONS ...]]) -> Device instance
-
-        represents a single FTDI device accessible via the libftdi driver.
-        Supports a basic file-like interface (open/close/read/write, context
-        manager support).
-
-        device_id - an optional serial number of the device to open.
-            if omitted, this refers to the first device found, which is
-            convenient if only one device is attached, but otherwise
-            fairly useless.
-
-        mode - either 'b' (binary) or 't' (text). This primarily affects
-            Python 3 calls to read() and write(), which will accept/return
-            unicode strings which will be encoded/decoded according to the given...
-
-        encoding - the codec name to be used for text operations.
-
-        lazy_open - if True, then the device will not be opened immediately -
-            the user must perform an explicit open() call prior to other
-            operations.
-
-        chunk_size - if non-zero, split read and write operations into chunks
-            of this size. With large or slow accesses, interruptions (i.e.
-            KeyboardInterrupt) may not happen in a timely fashion.
-
-        interface_select - select interface to use on multi-interface devices
-        """
-        self._opened = False
-        self.driver = Driver(**kwargs)
-        self.fdll = self.driver.fdll
-        # device_id is an optional serial number of the requested device.
-        self.device_id = device_id
-        # mode can be either 'b' for binary, or 't' for text.
-        # if set to text, the values returned from read() will
-        # be decoded using encoding before being returned as
-        # strings; for binary the raw bytes will be returned.
-        # This will only affect Python3.
-        self.mode = mode
-        # when giving a str to Device.write(), it is encoded.
-        # default is latin1, because it provides
-        # a one-to-one correspondence for code points 0-FF
-        self.encoding = encoding
-        # ftdi_usb_open_dev initialises the device baudrate
-        # to 9600, which certainly seems to be a de-facto
-        # standard for serial devices.
-        self._baudrate = 9600
-        # defining softspace allows us to 'print' to this device
-        self.softspace = 0
-        # chunk_size (if not 0) chunks the reads and writes
-        # to allow interruption
-        self.chunk_size = chunk_size
-        # interface can be set for devices which have multiple interface
-        # ports (e.g. FT4232, FT2232)
-        self.interface_select = interface_select
-        # lazy_open tells us not to open immediately.
-        if not lazy_open:
-            self.open()
-
-    def __del__(self):
-        "free the ftdi_context resource"
-        if self._opened:
-            self.close()
-
-    def open(self):
-        """
-        open connection to a FTDI device
-        """
-        if self._opened:
-            return
-
-        # create context for this device
-        self.ctx = create_string_buffer(1024)
-        res = self.fdll.ftdi_init(byref(self.ctx))
-        if res != 0:
-            msg = "%s (%d)" % (self.get_error_string(), res)
-            del self.ctx
-            raise FtdiError(msg)
-
-        if self.interface_select is not None:
-            res = self.fdll.ftdi_set_interface(byref(self.ctx),
-                                               self.interface_select)
-            if res != 0:
-                msg = "%s (%d)" % (self.get_error_string(), res)
-                del self.ctx
-                raise FtdiError(msg)
-
-        # Try to open the device.  If this fails, reset things to how
-        # they were, but we can't use self.close as that assumes things
-        # have already been setup.
-        # FTDI vendor/product ids required here.
-        for usb_vid, usb_pid in itertools.product(USB_VID_LIST, USB_PID_LIST):
-            open_args = [byref(self.ctx), usb_vid, usb_pid]
-            if self.device_id is None:
-                res = self.fdll.ftdi_usb_open(*tuple(open_args))
-            else:
-                # attempt to match device_id to serial number
-                open_args.extend([0, c_char_p(self.device_id.encode('latin1'))])
-                res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
-                if res != 0:
-                    # swap last two parameters and try again
-                    #  - attempt to match device_id to description
-                    open_args[-2], open_args[-1] = open_args[-1], open_args[-2]
-                    res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
-            if res != FTDI_ERROR_DEVICE_NOT_FOUND:
-                # if we succeed (0) or get a specific error, don't continue
-                # otherwise (-3) - look for another device
-                break
-
-        if res != 0:
-            msg = "%s (%d)" % (self.get_error_string(), res)
-            # free the context
-            self.fdll.ftdi_deinit(byref(self.ctx))
-            del self.ctx
-            raise FtdiError(msg)
-
-        # explicitly reset the device to serial mode in case
-        # it had previously been used in bitbang mode
-        # (some later driver versions might do this automatically)
-        self.fdll.ftdi_set_bitmode(byref(self.ctx), 0, BITMODE_RESET)
-        self._opened = True
-
-    def close(self):
-        "close our connection, free resources"
-        if self._opened:
-            self.fdll.ftdi_usb_close(byref(self.ctx))
-            self.fdll.ftdi_deinit(byref(self.ctx))
-            del self.ctx
-        self._opened = False
-
-    @property
-    def baudrate(self):
-        """
-        get or set the baudrate of the FTDI device. Re-read after setting
-        to ensure baudrate was accepted by the driver.
-        """
-        return self._baudrate
-
-    @baudrate.setter
-    def baudrate(self, value):
-        result = self.fdll.ftdi_set_baudrate(self.ctx, value)
-        if result == 0:
-            self._baudrate = value
-
-    def _read(self, length):
-        """
-        actually do the low level reading
-
-        returns a 'bytes' object
-        """
-        buf = create_string_buffer(length)
-        rlen = self.fdll.ftdi_read_data(byref(self.ctx), byref(buf), length)
-        if rlen < 0:
-            raise FtdiError(self.get_error_string())
-        byte_data = buf.raw[:rlen]
-
-        return byte_data
-
-    def read(self, length):
-        """
-        read(length) -> bytes/string of up to `length` bytes.
-
-        read upto `length` bytes from the FTDI device
-        :param length: maximum number of bytes to read
-        :return: value read from device
-        :rtype: bytes if self.mode is 'b', else decode with self.encoding
-        """
-        if not self._opened:
-            raise FtdiError("read() on closed Device")
-
-        # read the data
-        if self.chunk_size != 0:
-            remaining = length
-            byte_data_list = []
-            while remaining > 0:
-                rx_bytes = self._read(min(remaining, self.chunk_size))
-                if not rx_bytes:
-                    break
-                byte_data_list.append(rx_bytes)
-                remaining -= len(rx_bytes)
-            byte_data = b''.join(byte_data_list)
-        else:
-            byte_data = self._read(length)
-        if self.mode == 'b':
-            return byte_data
-        else:
-            # TODO: for some codecs, this may choke if we haven't read the
-            # full required data. If this is the case we should probably trim
-            # a byte at a time from the output until the decoding works, and
-            # buffer the remainder for next time.
-            return byte_data.decode(self.encoding)
-
-    def _write(self, byte_data):
-        """
-        actually do the low level writing
-        :param byte_data: data to be written
-        :type byte_data: bytes
-        :return: number of bytes written
-        """
-        buf = create_string_buffer(byte_data)
-        written = self.fdll.ftdi_write_data(byref(self.ctx),
-                                            byref(buf), len(byte_data))
-        if written < 0:
-            raise FtdiError(self.get_error_string())
-        return written
-
-    def write(self, data):
-        """
-        write(data) -> count of bytes actually written
-
-        write given `data` string to the FTDI device
-
-        :param data: string to be written
-        :type data: string or bytes
-        :return: count of bytes written, which may be less than `len(data)`
-        """
-        if not self._opened:
-            raise FtdiError("write() on closed Device")
-
-        try:
-            byte_data = bytes(data)
-        except TypeError:
-            # this will happen if we are Python3 and data is a str.
-            byte_data = data.encode(self.encoding)
-
-        # actually write it
-        if self.chunk_size != 0:
-            remaining = len(byte_data)
-            written = 0
-            while remaining > 0:
-                start = written
-                length = min(remaining, self.chunk_size)
-                result = self._write(byte_data[start: start + length])
-                if result == 0:
-                    # don't continue to try writing forever if nothing
-                    # is actually being written
-                    break
-                else:
-                    written += result
-                    remaining -= result
-        else:
-            written = self._write(byte_data)
-        return written
-
-    def flush(self, flush_what=FLUSH_BOTH):
-        """
-        Instruct the FTDI device to flush its FIFO buffers
-
-        By default both the input and output buffers will be
-        flushed, but the caller can selectively chose to only
-        flush the input or output buffers using `flush_what`:
-          FLUSH_BOTH - (default)
-          FLUSH_INPUT - (just the rx buffer)
-          FLUSH_OUTPUT - (just the tx buffer)
-        """
-        if flush_what == FLUSH_BOTH:
-            fn = self.fdll.ftdi_usb_purge_buffers
-        elif flush_what == FLUSH_INPUT:
-            fn = self.fdll.ftdi_usb_purge_rx_buffer
-        elif flush_what == FLUSH_OUTPUT:
-            fn = self.fdll.ftdi_usb_purge_tx_buffer
-        else:
-            raise ValueError("Invalid value passed to %s.flush()" %
-                             self.__class__.__name__)
-        res = fn(byref(self.ctx))
-        if res != 0:
-            msg = "%s (%d)" % (self.get_error_string(), res)
-            raise FtdiError(msg)
-
-    def flush_input(self):
-        """
-        flush the device input buffer
-        """
-        self.flush(FLUSH_INPUT)
-
-    def flush_output(self):
-        """
-        flush the device output buffer
-        """
-        self.flush(FLUSH_OUTPUT)
-
-    def get_error_string(self):
-        """
-        :return: error string from libftdi driver
-        """
-        return self.fdll.ftdi_get_error_string(byref(self.ctx))
-
-    @property
-    def ftdi_fn(self):
-        """
-        this allows the vast majority of libftdi functions
-        which are called with a pointer to a ftdi_context
-        struct as the first parameter to be called here
-        preventing the need to leak self.ctx into the user
-        code (and import byref from ctypes):
-
-        >>> with Device() as dev:
-        ...     # set 8 bit data, 2 stop bits, no parity
-        ...     dev.ftdi_fn.ftdi_set_line_property(8, 2, 0)
-        ...
-        """
-        # note this class is constructed on each call, so this
-        # won't be particularly quick.  It does ensure that the
-        # fdll and ctx objects in the closure are up-to-date, though.
-        class FtdiForwarder(object):
-
-            def __getattr__(innerself, key):
-                return functools.partial(getattr(self.fdll, key),
-                                         byref(self.ctx))
-        return FtdiForwarder()
-
-    def __enter__(self):
-        """
-        support for context manager.
-
-        Note the device is opened and closed automatically
-        when used in a with statement, and the device object
-        itself is returned:
-        >>> with Device(mode='t') as dev:
-        ...     dev.write('Hello World!')
-        ...
-        """
-        self.open()
-        return self
-
-    def __exit__(self, exc_type, exc_val, tb):
-        "support for context manager"
-        self.close()
-
-    #
-    # following are various properties and functions to make
-    # this emulate a file-object more closely.
-    #
-
-    @property
-    def closed(self):
-        """
-        The Python file API defines a read-only 'closed' attribute
-        """
-        return not self._opened
-
-    def readline(self, size=0):
-        """
-        readline() for file-like compatibility.
-
-        :param size: maximum amount of data to read looking for a line
-        :return: a line of text, or size bytes if no line-ending found
-
-        This only works for mode='t' on Python3
-        """
-        lsl = len(os.linesep)
-        line_buffer = []
-        while True:
-            next_char = self.read(1)
-            if next_char == '' or (size > 0 and len(line_buffer) > size):
-                break
-            line_buffer.append(next_char)
-            if (len(line_buffer) >= lsl and
-                    line_buffer[-lsl:] == list(os.linesep)):
-                break
-        return ''.join(line_buffer)
-
-    def readlines(self, sizehint=None):
-        """
-        readlines() for file-like compatibility.
-        """
-        lines = []
-        if sizehint is not None:
-            string_blob = self.read(sizehint)
-            lines.extend(string_blob.split(os.linesep))
-
-        while True:
-            line = self.readline()
-            if not line:
-                break
-            lines.append(line)
-        return lines
-
-    def writelines(self, lines):
-        """
-        writelines for file-like compatibility.
-        """
-        for line in lines:
-            self.write(line)
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        while True:
-            line = self.readline()
-            if line:
-                return line
-            else:
-                raise StopIteration
-    next = __next__

File pylibftdi/examples/serial_loopback.py

View file
 from pylibftdi import Device
 
 
+# TODO:
+# make this threaded - a writer stream, which sends known data with
+# occasional sync blocks, and a reader which reads and waits for a sync-block
+# then signals the writer if it gets stuck.
+
 def test_string(length):
     return os.urandom(length)
 
         time.sleep(0.1)
         for l in lengths:
             test_str = test_string(l)
-            self.device.write(test_str)
+            if self.device.write(test_str) != len(test_str):
+                sys.stdout.write('*')
             time.sleep(0.1)
             result = ''
             for _ in range(3):
             yield result == test_str
 
     def main(self):
-        for bd in [9600, 115200, 1152000]:
+        for bd in [9600, 31250, 115200, 1152000]:
             self.device.baudrate = bd
 
-            for result in self.test_loopback(range(1, 50) +
+            for result in self.test_loopback(range(1, 50) + range(100, 500, 100) +
                                              range(1000, 5000, 1000)):
                 if result:
                     sys.stdout.write('+')

File setup.py

View file
 
 setup(
     name="pylibftdi",
-    version="0.13",
+    version="0.14pre",
     description="Pythonic interface to FTDI devices using libftdi",
     long_description=open('README.rst').read(),
     author="Ben Bass",

File tests/call_log.py

View file
+__author__ = 'ben'
+
+
+class CallLog(object):
+
+    fn_log = []
+
+    @classmethod
+    def reset(cls):
+        del cls.fn_log[:]
+
+    @classmethod
+    def append(cls, value):
+        cls.fn_log.append(value)
+
+    @classmethod
+    def get(cls):
+        return cls.fn_log[:]

File tests/test_bitbang.py

View file
                 pass
         self.assertCallsExact(_,
                 ['ftdi_init', 'ftdi_usb_open',
-                 'ftdi_set_bitmode', 'ftdi_set_bitmode',
+                 'ftdi_set_bitmode', 'ftdi_setflowctrl',
+                 'ftdi_set_baudrate', 'ftdi_set_bitmode',
                  'ftdi_usb_close', 'ftdi_deinit'])
 
     def testOpen(self):

File tests/test_bus.py

View file
         assert test_bus.a == 2
         assert test_bus.b == 0
         assert test_bus.c == 21
+
+
+if __name__ == "__main__":
+    unittest.main()

File tests/test_common.py

View file
 import logging
 
 import sys
+from tests.call_log import CallLog
+
 if sys.version_info < (2, 7):
     try:
         import unittest2 as unittest
 else:
     import unittest  # NOQA
 
-
 class SimpleMock(object):
     """
     This is a simple mock plugin for fdll which logs any calls
         return 0
 
 
-class CallLog(object):
-
-    fn_log = []
-
-    @classmethod
-    def reset(cls):
-        del cls.fn_log[:]
-
-    @classmethod
-    def append(cls, value):
-        cls.fn_log.append(value)
-
-    @classmethod
-    def get(cls):
-        return cls.fn_log[:]
-
-
 class CallCheckMixin(object):
     """
     this should be used as a mixin for unittest.TestCase classes,
 
 
 import pylibftdi.driver
-from pylibftdi.driver import Device
+# importing this _does_ things...
+pylibftdi.driver.Driver = MockDriver
+pylibftdi.device.Driver = MockDriver
 
+from pylibftdi.device import Device
 
 class LoopDevice(Device):
     """
         self.__buffer.extend(bytearray(data))
         return len(data)
 
-# importing this _does_ things...
-pylibftdi.driver.Driver = MockDriver
-
 verbose = set(['-v', '--verbose']) & set(sys.argv)
 logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)

File tests/test_device.py

View file
+"""
+pylibftdi - python wrapper for libftdi
+
+Copyright (c) 2010-2013 Ben Bass <benbass@codedstructure.net>
+See LICENSE file for details and (absence of) warranty
+
+pylibftdi: http://bitbucket.org/codedstructure/pylibftdi
+
+This module contains some basic tests for the higher-level
+functionality without requiring an actual hardware device
+to be attached.
+"""
+
+from tests.test_common import (LoopDevice, CallCheckMixin, unittest)
+from pylibftdi.device import Device
+from pylibftdi import FtdiError
+
+# and now some test cases...
+
+
+class DeviceFunctions(CallCheckMixin, unittest.TestCase):
+
+    def testContextManager(self):
+        def _():
+            with Device():
+                pass
+        self.assertCallsExact(_,
+                ['ftdi_init', 'ftdi_usb_open', 'ftdi_set_bitmode',
+                 'ftdi_setflowctrl', 'ftdi_set_baudrate',
+                 'ftdi_usb_close', 'ftdi_deinit'])
+
+    def testOpen(self):
+        # a lazy_open open() shouldn't do anything
+        self.assertCallsExact(lambda: Device(lazy_open=True), [])
+        # a non-lazy_open open() should open the port...
+        self.assertCalls(lambda: Device(), 'ftdi_usb_open')
+        # and given a device_id, it should do a open_desc
+        self.assertCalls(lambda: Device('bogus'), 'ftdi_usb_open_desc')
+        # check that opening a specific interface does that
+
+    def testOpenInterface(self):
+        self.assertCalls(lambda: Device(interface_select=1),
+                         'ftdi_set_interface')
+        # check that opening a specific interface does that
+        self.assertNotCalls(lambda: Device(), 'ftdi_set_interface')
+
+    def testReadWrite(self):
+        with Device() as dev:
+            self.assertCalls(lambda: dev.write('xxx'), 'ftdi_write_data')
+            self.assertCalls(lambda: dev.read(10), 'ftdi_read_data')
+
+    def testFlush(self):
+        with Device() as dev:
+            self.assertCalls(dev.flush_input, 'ftdi_usb_purge_rx_buffer')
+            self.assertCalls(dev.flush_output, 'ftdi_usb_purge_tx_buffer')
+            self.assertCalls(dev.flush, 'ftdi_usb_purge_buffers')
+
+    def testClose(self):
+        d = Device()
+        d.close()
+        self.assertRaises(FtdiError, d.write, 'hello')
+        d = Device()
+        d.close()
+        self.assertRaises(FtdiError, d.read, 1)
+
+
+class LoopbackTest(unittest.TestCase):
+    """
+    these all require mode='t' to pass in Python3
+    """
+
+    def testPrint(self):
+        d = LoopDevice(mode='t')
+        d.write('Hello')
+        d.write(' World\n')
+        d.write('Bye')
+        self.assertEqual(d.readline(), 'Hello World\n')
+        self.assertEqual(d.readline(), 'Bye')
+
+    def testLines(self):
+        d = LoopDevice(mode='t')
+        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
+        d.writelines(lines)
+        self.assertEqual(d.readlines(), lines)
+
+    def testIterate(self):
+        d = LoopDevice(mode='t')
+        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
+        d.writelines(lines)
+        for idx, line in enumerate(d):
+            self.assertEqual(line, lines[idx])
+
+    def testBuffer(self):
+        d = LoopDevice(mode='t', chunk_size=3)
+        d.write('Hello')
+        d.write(' World\n')
+        d.write('Bye')
+        self.assertEqual(d.readline(), 'Hello World\n')
+        self.assertEqual(d.readline(), 'Bye')
+
+
+if __name__ == "__main__":
+    unittest.main()

File tests/test_driver.py

-"""
-pylibftdi - python wrapper for libftdi
-
-Copyright (c) 2010-2013 Ben Bass <benbass@codedstructure.net>
-See LICENSE file for details and (absence of) warranty
-
-pylibftdi: http://bitbucket.org/codedstructure/pylibftdi
-
-This module contains some basic tests for the higher-level
-functionality without requiring an actual hardware device
-to be attached.
-"""
-
-from tests.test_common import (LoopDevice, Device, CallCheckMixin, unittest)
-from pylibftdi import FtdiError
-
-# and now some test cases...
-
-
-class DeviceFunctions(CallCheckMixin, unittest.TestCase):
-
-    def testContextManager(self):
-        def _():
-            with Device():
-                pass
-        self.assertCallsExact(_,
-                ['ftdi_init', 'ftdi_usb_open', 'ftdi_set_bitmode',
-                 'ftdi_usb_close', 'ftdi_deinit'])
-
-    def testOpen(self):
-        # a lazy_open open() shouldn't do anything
-        self.assertCallsExact(lambda: Device(lazy_open=True), [])
-        # a non-lazy_open open() should open the port...
-        self.assertCalls(lambda: Device(), 'ftdi_usb_open')
-        # and given a device_id, it should do a open_desc
-        self.assertCalls(lambda: Device('bogus'), 'ftdi_usb_open_desc')
-        # check that opening a specific interface does that
-
-    def testOpenInterface(self):
-        self.assertCalls(lambda: Device(interface_select=1),
-                         'ftdi_set_interface')
-        # check that opening a specific interface does that
-        self.assertNotCalls(lambda: Device(), 'ftdi_set_interface')
-
-    def testReadWrite(self):
-        with Device() as dev:
-            self.assertCalls(lambda: dev.write('xxx'), 'ftdi_write_data')
-            self.assertCalls(lambda: dev.read(10), 'ftdi_read_data')
-
-    def testFlush(self):
-        with Device() as dev:
-            self.assertCalls(dev.flush_input, 'ftdi_usb_purge_rx_buffer')
-            self.assertCalls(dev.flush_output, 'ftdi_usb_purge_tx_buffer')
-            self.assertCalls(dev.flush, 'ftdi_usb_purge_buffers')
-
-    def testClose(self):
-        d = Device()
-        d.close()
-        self.assertRaises(FtdiError, d.write, 'hello')
-        d = Device()
-        d.close()
-        self.assertRaises(FtdiError, d.read, 1)
-
-
-class LoopbackTest(unittest.TestCase):
-    """
-    these all require mode='t' to pass in Python3
-    """
-
-    def testPrint(self):
-        d = LoopDevice(mode='t')
-        d.write('Hello')
-        d.write(' World\n')
-        d.write('Bye')
-        self.assertEqual(d.readline(), 'Hello World\n')
-        self.assertEqual(d.readline(), 'Bye')
-
-    def testLines(self):
-        d = LoopDevice(mode='t')
-        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
-        d.writelines(lines)
-        self.assertEqual(d.readlines(), lines)
-
-    def testIterate(self):
-        d = LoopDevice(mode='t')
-        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
-        d.writelines(lines)
-        for idx, line in enumerate(d):
-            self.assertEqual(line, lines[idx])
-
-    def testBuffer(self):
-        d = LoopDevice(mode='t', chunk_size=3)
-        d.write('Hello')
-        d.write(' World\n')
-        d.write('Bye')
-        self.assertEqual(d.readline(), 'Hello World\n')
-        self.assertEqual(d.readline(), 'Bye')
-
-
-if __name__ == "__main__":
-    unittest.main()