Commits

Ben Bass  committed 68d432f

prepare for 0.8 release; add tests; line-oriented access

  • Participants
  • Parent commits de3cd8f
  • Tags release 0.8

Comments (0)

Files changed (5)

+syntax: glob
+*.pyc
+*.pyo
+MANIFEST
 pylibftdi changes
 =================
 
+0.8
+ * added some unit tests
+ * API changes:
+   - when opening a device with a device_id parameter, this will now check
+     against both serial number and (if that fails), the device description.
+     Opening by device type (under the same proviso that an arbitrary device
+     will be selected if multiple matching devices are attached as when no
+     device_id is given) is frequently easier than matching by serial number.
+   - added flush(), flush_input() and flush_output() operations. This is
+     modelled after the pySerial API providing separate flush operations, and
+     gets the Device API closer to that of files.
+   - increased file-API compatibility, with line-oriented methods and iteration
 0.7
  * support multiple attached devices
  * API changes:
  - Supports Python 2 and Python 3
  - Supports parallel and serial devices
  - Support for multiple devices
+ - File-like interface wherever appropriate
  - Cross-platform
 
 :Limitations:
 
 Multiple devices are supported by passing the desired device serial number (as
 a string) in the ``device_id`` parameter - this is the first parameter in both
-Device() and BitBangDevice() constructors.
+Device() and BitBangDevice() constructors. Alternatively the device 'description'
+can be given, and an attempt will be made to match this if matching by serial
+number fails.
 
 Examples
 ~~~~~~~~

File pylibftdi/driver.py

 
 """
 
+import os
 import functools
 import warnings
 
     """
     def __init__(self, device_id=None, mode="b",
                  encoding="latin1", lazy_open=False):
+        self._opened = False
         self.driver = Driver()
         self.fdll = self.driver.fdll
-        self.opened = False
         # device_id is an optional serial number of the requested device.
         self.device_id = device_id
         # mode can be either 'b' for binary, or 't' for text.
         # to 9600, which certainly seems to be a de-facto
         # standard for serial devices.
         self._baudrate = 9600
+        # defining softspace allows us to 'print' to this device
+        self.softspace = 0
         # lazy_open tells us not to open immediately.
         if not lazy_open:
             self.open()
 
     def __del__(self):
         "tell driver to free the ftdi_context resource"
-        if self.opened:
+        if self._opened:
             self.close()
 
     def open(self):
         """
         open connection to a FTDI device
         """
-        if self.opened:
+        if self._opened:
             return
 
         # create context for this device
         self.ctx = create_string_buffer(1024)
-        if self.fdll.ftdi_init(byref(self.ctx)) != 0:
-            msg = self.get_error_string()
+        res = self.fdll.ftdi_init(byref(self.ctx))
+        if res != 0:
+            msg = "%s (%d)" % (self.get_error_string(), res)
             del self.ctx
             raise FtdiError(msg)
 
             if res != 0:
                 # swap last two parameters and try again
                 #  - attempt to match device_id to description
-                open_args[-2:] = open_args[-1:-3:-1]
+                open_args[-2], open_args[-1] = open_args[-1], open_args[-2]
                 res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
 
         if res != 0:
-            msg = self.get_error_string()
+            msg = "%s (%d)" % (self.get_error_string(), res)
             # free the context
             self.fdll.ftdi_deinit(byref(self.ctx))
             del self.ctx
             raise FtdiError(msg)
 
-        self.opened = True
+        self._opened = True
 
     def close(self):
         "close our connection, free resources"
-        if self.opened:
+        if self._opened:
             self.fdll.ftdi_usb_close(byref(self.ctx))
             self.fdll.ftdi_deinit(byref(self.ctx))
             del self.ctx
-        self.opened = False
+        self._opened = False
 
     @property
     def baudrate(self):
         return type depends on self.mode - if 'b' return
         raw bytes, else decode according to self.encoding
         """
-        if not self.opened:
+        if not self._opened:
             raise FtdiError("read() on closed Device")
 
         buf = create_string_buffer(length)
         write given `data` string to the FTDI device
         returns count of bytes written, which may be less than `len(data)`
         """
-        if not self.opened:
+        if not self._opened:
             raise FtdiError("read() on closed Device")
 
         try:
                     self.__class__.__name__)
         res = fn(byref(self.ctx))
         if res != 0:
-            raise FtdiError(self.get_error_string())
+            msg = "%s (%d)" % (self.get_error_string(), res)
+            raise FtdiError(msg)
 
     def flush_input(self):
         """
         "support for context manager"
         self.close()
 
+    #
+    # following are various properties and functions to make
+    # this emulate a file-object more closely.
+    #
+
+    @property
+    def closed(self):
+        """
+        The Python file API defines a read-only 'closed' attribute
+        """
+        return not self._opened
+
+    def readline(self, size=0):
+        """
+        readline() for file-like compatibility.
+        """
+        lsl = len(os.linesep)
+        line_buffer = []
+        while True:
+            next_char = self.read(1)
+            if next_char == '' or (size > 0 and len(line_buffer) > size):
+                break
+            line_buffer.append(next_char)
+            if (len(line_buffer) >= lsl and
+                line_buffer[-lsl:] == list(os.linesep)):
+                break
+        return ''.join(line_buffer)
+
+    def readlines(self, sizehint=None):
+        """
+        readlines() for file-like compatibility.
+        """
+        lines = []
+        if sizehint is not None:
+            string_blob = self.read(sizehint)
+            lines.extend(string_blob.split(os.linesep))
+
+        while True:
+            line = self.readline()
+            if not line:
+                break
+            lines.append(line)
+        return lines
+
+    def writelines(self, lines):
+        """
+        writelines for file-like compatibility.
+        """
+        for line in lines:
+            self.write(line)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        while True:
+            line = self.readline()
+            if line:
+                return line
+            else:
+                raise StopIteration
+    next = __next__
+

File pylibftdi/test_driver.py

+"""
+pylibftdi - python wrapper for libftdi
 
+Copyright (c) 2010-2011 Ben Bass <benbass@codedstructure.net>
+See LICENSE file for details and (absence of) warranty
 
-import pylibftdi.driver
+pylibftdi: http://bitbucket.org/codedstructure/pylibftdi
+
+This module contains some basic tests for the higher-level
+functionality without requiring an actual hardware device
+to be attached.
+"""
+
+import sys
+if sys.version_info < (2,7):
+    try:
+        import unittest2 as unittest
+    except ImportError:
+        raise SystemExit("The test functionality is only supported in"
+                "Python 2.7+ unless unittest2 is installed")
+else:
+    import unittest
+
+VERBOSE = False
 
 fn_log = []
 class SimpleMock(object):
+    """
+    This is a simple mock plugin for fdll which logs any calls
+    made through it to fn_log, which is currently rather ugly
+    global state.
+    """
     def __init__(self, name="<base>"):
         self.__name = name
 
         return self.__dict__.get(key, SimpleMock(key))
 
     def __setattr__(self, key, value):
-        return self.__dict__.setdefault(key, value)
+        return self.__dict__.__setitem__(key, value)
 
     def __call__(self, *o, **k):
         fn_log.append(self.__name)
-        print("%s(*%s, **%s)" % (self.__name, o, k))
+        if VERBOSE:
+            print("%s(*%s, **%s)" % (self.__name, o, k))
         return 0
 
-class Driver(object):
+def get_calls(fn):
+    "return the called function names which the fdll mock object made"
+    del fn_log[:]
+    fn()
+    return fn_log
+
+
+
+# monkey patch the Driver class to be the mock thing above.
+class MockDriver(object):
     def __init__(self, *o, **k):
         self.fdll = SimpleMock()
 
-def assertCalls(fn, methodname):
-    del fn_log[:]
-    fn()
-    assert methodname in fn_log
 
-pylibftdi.driver.Driver = Driver
+import pylibftdi.driver
+from pylibftdi.driver import Device
 
-from pylibftdi import Device
+class LoopDevice(Device):
+    """
+    a mock device object which overrides read and write
+    to operate as an unbounded loopback pair
+    """
+    def __init__(self, *o, **k):
+        Device.__init__(self, *o, **k)
+        self.__buffer = []
 
-with Device() as dev:
-    assertCalls(lambda : dev.write('xxx'), 'ftdi_write_data')
-    assertCalls(lambda : dev.read(10), 'ftdi_read_data')
-    assertCalls(dev.flush_input, 'ftdi_usb_purge_rx_buffer')
-    assertCalls(dev.flush_output, 'ftdi_usb_purge_tx_buffer')
-    assertCalls(dev.flush, 'ftdi_usb_purge_buffers')
+    def read(self, size=None):
+        Device.read(self, size)
+        if size is None:
+            result = ''.join(self.__buffer)
+            self.__buffer = []
+        else:
+            result = ''.join(self.__buffer[:size])
+            self.__buffer = self.__buffer[size:]
+        return result
 
+    def write(self, data):
+        Device.write(self, data)
+        self.__buffer.extend(list(data))
+        return len(data)
 
+
+# and now some test cases...
+
+class DeviceFunctions(unittest.TestCase):
+
+    def setUp(self):
+        pylibftdi.driver.Driver = MockDriver
+
+    def assertCalls(self, fn, methodname):
+        del fn_log[:]
+        fn()
+        self.assertIn(methodname, fn_log)
+
+    def testContextManager(self):
+        def _():
+            with Device() as dev:
+                pass
+        self.assertEqual(get_calls(_),
+                ['ftdi_init', 'ftdi_usb_open',
+                 'ftdi_usb_close', 'ftdi_deinit'])
+
+    def testOpen(self):
+        # a lazy_open open() shouldn't do anything
+        self.assertEqual(get_calls(lambda: Device(lazy_open=True)), [])
+        # a non-lazy_open open() should open the port...
+        self.assertCalls(lambda: Device(), 'ftdi_usb_open')
+        # and given a device_id, it should do a open_desc
+        self.assertCalls(lambda: Device('bogus'), 'ftdi_usb_open_desc')
+
+    def testReadWrite(self):
+        with Device() as dev:
+            self.assertCalls(lambda : dev.write('xxx'), 'ftdi_write_data')
+            self.assertCalls(lambda : dev.read(10), 'ftdi_read_data')
+
+    def testFlush(self):
+        with Device() as dev:
+            self.assertCalls(dev.flush_input, 'ftdi_usb_purge_rx_buffer')
+            self.assertCalls(dev.flush_output, 'ftdi_usb_purge_tx_buffer')
+            self.assertCalls(dev.flush, 'ftdi_usb_purge_buffers')
+
+class LoopbackTest(unittest.TestCase):
+
+    def setUp(self):
+        pylibftdi.driver.Driver = MockDriver
+
+    def testPrint(self):
+        d = LoopDevice()
+        d.write('Hello')
+        d.write(' World\n')
+        d.write('Bye')
+        self.assertEqual(d.readline(), 'Hello World\n')
+        self.assertEqual(d.readline(), 'Bye')
+
+    def testLines(self):
+        d = LoopDevice()
+        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
+        d.writelines(lines)
+        self.assertEqual(d.readlines(), lines)
+
+    def testIterate(self):
+        d = LoopDevice()
+        lines = ['Hello\n', 'World\n', 'And\n', 'Goodbye\n']
+        d.writelines(lines)
+        for idx,line in enumerate(d):
+            self.assertEqual(line, lines[idx])
+
+
+if __name__ == "__main__":
+    if set(['-v', '--verbose']) & set(sys.argv):
+        VERBOSE = True
+    unittest.main()