Commits

Ginés Martínez Sánchez committed 47057c4 Draft

improving io buffers

Comments (0)

Files changed (4)

ginsfsm/circular.py

+# -*- coding: utf-8 -*-
+""" Porting of GSMG.C
+"""
+import logging
+
+from ginsfsm.utils import string_to_bytearray
+
+DEFAULT_CIRCULAR_SIZE = 8 * 1021
+
+
+class CircularError(Exception):
+    """ Raised when Circular have problems."""
+
+
+class Circular(object):
+    def __init__(self, size=DEFAULT_CIRCULAR_SIZE):
+        """ Circular buffer.
+        """
+        self.size = size
+        self.start = 0
+        self.count = 0
+        self.data = bytearray(size)
+
+    def __str__(self):
+        return "size %d, count %d, start %d, data %s" % (
+            self.size,
+            self.count,
+            self.start,
+            self.data,
+        )
+
+    def putdata(self, data):
+        """ WRITING to gmsg.
+            Add 'ln' bytes to the message tail.
+            Return number of written bytes.
+        """
+        data = string_to_bytearray(data)
+        ln = len(data)
+        if ln > self.freebytes():
+            logging.error("ERROR circular putchar: FULL buffer")
+            raise CircularError(
+                "ERROR full buffer, not space for %d bytes" % (ln,))
+        tail = (self.start + self.count) % self.size
+        self.data[tail:ln] = data[:]
+        self.count += ln
+        return ln
+
+    def getdata(self, ln):
+        """ READING : Pull 'ln' bytes.
+            Read from the reading current position.
+            Return the pulled data or None.
+        """
+        if ln <= 0:
+            return None
+        if ln <= self.count:
+            data = self.data[self.start:ln]
+            self.count -= ln
+            self.start = (self.start + ln) % self.size
+            return data
+
+        return None
+
+    def bytesleft(self):
+        """READING: Return number of bytes pending of reading
+        """
+        return self.count
+
+    def freebytes(self):
+        """ WRITTING: Return number of free byte space to write.
+        """
+        free_bytes = self.size - self.count
+        return free_bytes
         #  Calcula tamaño segmento
         #
         if gmsg.cur_size >= gmsg.max_size:
-            msg = "segm_create(): MAXIMUM size reached (%d over %d)" % (
+            msg = "ERROR segm_create(): MAXIMUM size reached (%d over %d)" % (
                 gmsg.cur_size, gmsg.max_size)
-            logging.warning(msg)
+            logging.error(msg)
             raise SegmError(msg)  # Alcanzado máximo tamaño
 
         data_size = gmsg.max_size - gmsg.cur_size
 
 
 class GMsg(ListItem):
-    def __init__(self, segm_size, max_size=0, circular=False):
+    def __init__(self, segm_size, max_size=0):
         """ Message Manager
         The message are organized in segments.
 
 
         If max_size is 0 then the max_size is the segm_size.
 
-        The buffer can be circular, then being restricted to only one segment.
-
         :param segm_size: Maximum size of a segment
         :param max_size:
-        :param circular:
         """
         self.segm_size = segm_size  # tamaño segmentos datos
         # tamaño máximo total, si es 0, igual a segm_size (like PACKET!!!)
     def getdata(self, ln):
         """ READING : Pull 'ln' bytes.
             Read from the reading current position.
-            If there is no enough data (ln) then return None.
             Return the pulled data or None.
         """
         if not self.rd_segm:
             segm = dl_next(segm)
         return sum_len
 
-    def free_space(self):
+    def freebytes(self):
         """ WRITTING: Return number of free byte space to write.
         """
         free_bytes = self.max_size - self.totalbytes()

ginsfsm/tests/test_circular.py

+import unittest
+from ginsfsm.circular import (
+    Circular,
+    CircularError,
+)
+
+
+class TestCircular(unittest.TestCase):
+    def setUp(self):
+        self.circular = Circular(5)
+
+    def test_getdata_putdata_without_overwrite(self):
+        data = self.circular.getdata(1)
+        self.assertEqual(data, None)
+
+        count = self.circular.freebytes()
+        self.assertEqual(count, 5)
+
+        count = self.circular.putdata("123")
+        self.assertEqual(count, 3)
+        count = self.circular.freebytes()
+        self.assertEqual(count, 2)
+
+        count = self.circular.putdata("ab")
+        self.assertEqual(count, 2)
+        count = self.circular.freebytes()
+        self.assertEqual(count, 0)
+
+        self.assertRaises(CircularError,
+            self.circular.putdata,
+            "XX")
+
+        data = self.circular.getdata(5)
+        self.assertEqual(data, b"123ab")
+
+        data = self.circular.getdata(1)
+        self.assertEqual(data, None)
+
+    def test_getdata_putdata_with_overwrite(self):
+        count = self.circular.putdata("123")
+        self.assertEqual(count, 3)
+        count = self.circular.freebytes()
+        self.assertEqual(count, 2)
+
+        self.assertRaises(CircularError,
+            self.circular.putdata,
+            "abc")
+
+        count = self.circular.freebytes()
+        self.assertEqual(count, 2)
+
+        data = self.circular.getdata(2)
+        self.assertEqual(data, b"12")
+        count = self.circular.freebytes()
+        self.assertEqual(count, 4)
+
+
+if __name__ == "__main__":
+    circular = Circular(5)
+
+    circular.putdata("123")
+    print circular
+    circular.putdata("abc")
+    print circular
+    count = circular.freebytes()
+    data = circular.getdata(5)
+    print data
+    print circular

ginsfsm/tests/test_gmsg.py

 import unittest
-from ginsfsm.gmsg import GMsg
+from ginsfsm.gmsg import (
+    GMsg,
+#    GMsgError,
+#    SegmError
+)
 
 
 class TestGMsg(unittest.TestCase):
         self.gmsg.putdata("12")
         count = self.gmsg.totalbytes()
         self.assertEqual(count, 2)
+        count = self.gmsg.freebytes()
+        self.assertEqual(count, 3)
         self.gmsg.putdata("abcd")
         count = self.gmsg.bytesleft()
         self.assertEqual(count, 5)
+        count = self.gmsg.freebytes()
+        self.assertEqual(count, 0)
+
+    def test_no_circular(self):
+        # fill the buffer
+        count = self.gmsg.putdata("123")
+        self.assertEqual(count, 3)
+
+        count = self.gmsg.totalbytes()
+        self.assertEqual(count, 3)
+        count = self.gmsg.freebytes()
+        self.assertEqual(count, 2)
+
+        count = self.gmsg.putdata("456")
+        self.assertEqual(count, 2)
+
+        count = self.gmsg.totalbytes()
+        self.assertEqual(count, 5)
+        count = self.gmsg.freebytes()
+        self.assertEqual(count, 0)
+
+        # pull all the buffer
+        data = self.gmsg.getdata(3)
+        self.assertEqual(data, bytearray(b"123"))
+        data = self.gmsg.getdata(3)
+        self.assertEqual(data, bytearray(b"45"))
+
+        count = self.gmsg.bytesleft()
+        self.assertEqual(count, 0)
+
+        count = self.gmsg.totalbytes()
+        self.assertEqual(count, 5)
+        count = self.gmsg.freebytes()
+        self.assertEqual(count, 0)
+