Commits

Ben Bass committed f95d570

add buffering to allow system interruption during long read/writes

Comments (0)

Files changed (3)

pylibftdi/driver.py

       user must perform an explicit open() call prior to other operations.
     """
     def __init__(self, device_id=None, mode="b",
-                 encoding="latin1", lazy_open=False):
+                 encoding="latin1", lazy_open=False,
+                 buffer_size=0):
         self._opened = False
         self.driver = Driver()
         self.fdll = self.driver.fdll
         self._baudrate = 9600
         # defining softspace allows us to 'print' to this device
         self.softspace = 0
+        # buffer_size (if not 0) chunks the reads and writes
+        # to allow interruption
+        self.buffer_size = buffer_size
         # lazy_open tells us not to open immediately.
         if not lazy_open:
             self.open()
             if self.device_id is None:
                 res = self.fdll.ftdi_usb_open(*tuple(open_args))
             else:
-                res = self._open_by_desc(open_args)
                 # attempt to match device_id to serial number
                 open_args.extend([0, c_char_p(self.device_id.encode('latin1'))])
                 res = self.fdll.ftdi_usb_open_desc(*tuple(open_args))
             raise FtdiError("read() on closed Device")
 
         # read the data
-        byte_data = self._read(length)
+        if self.buffer_size != 0:
+            remaining = length
+            byte_data_list = []
+            while remaining > 0:
+                rx_bytes = self._read(min(remaining, self.buffer_size))
+                if not rx_bytes:
+                    break
+                byte_data_list.append(rx_bytes)
+                remaining -= len(rx_bytes)
+            byte_data = b''.join(byte_data_list)
+        else:
+            byte_data = self._read(length)
         if self.mode == 'b':
             return byte_data
         else:
             byte_data = data.encode(self.encoding)
 
         # actually write it
-        written = self._write(byte_data)
+        if self.buffer_size != 0:
+            remaining = len(byte_data)
+            written = 0
+            while remaining > 0:
+                start = written
+                length = min(remaining, self.buffer_size)
+                result = self._write(byte_data[start: start + length])
+                if result == -1:
+                    written == result
+                    break
+                elif result == 0:
+                    # don't continue to try writing forever if nothing
+                    # is actually being written
+                    break
+                else:
+                    written += result
+                    remaining -= result
+        else:
+            written = self._write(byte_data)
         if written == -1:
             raise FtdiError(self.get_error_string())
         return written

tests/test_bitbang.py

                 pass
         self.assertCallsExact(_,
                 ['ftdi_init', 'ftdi_usb_open',
-                 'ftdi_set_bitmode',
+                 'ftdi_set_bitmode', 'ftdi_set_bitmode',
                  'ftdi_usb_close', 'ftdi_deinit'])
 
     def testOpen(self):

tests/test_driver.py

             with Device():
                 pass
         self.assertCallsExact(_,
-                ['ftdi_init', 'ftdi_usb_open',
+                ['ftdi_init', 'ftdi_usb_open', 'ftdi_set_bitmode',
                  'ftdi_usb_close', 'ftdi_deinit'])
 
     def testOpen(self):
         for idx, line in enumerate(d):
             self.assertEqual(line, lines[idx])
 
+    def testBuffer(self):
+        d = LoopDevice(mode='t', buffer_size=3)
+        d.write('Hello')
+        d.write(' World\n')
+        d.write('Bye')
+        self.assertEqual(d.readline(), 'Hello World\n')
+        self.assertEqual(d.readline(), 'Bye')
+
+
 
 if __name__ == "__main__":
     unittest.main()