Commits

Ginés Martínez Sánchez  committed 9813ddd Draft

rename circular.py to circular_fifo.py

  • Participants
  • Parent commits 18cc4b1

Comments (0)

Files changed (4)

File ginsfsm/circular.py

-# -*- coding: utf-8 -*-
-""" Porting of GSMG.C
-"""
-import logging
-
-from ginsfsm.utils import string_to_bytearray
-
-DEFAULT_CIRCULAR_SIZE = 8 * 1021
-
-
-class CircularFullBufferError(Exception):
-    """ Raised when Circular have problems."""
-
-
-class Circular(object):
-    def __init__(self, size=DEFAULT_CIRCULAR_SIZE):
-        """ Circular buffer.
-        """
-        self.size = size
-        self.start = 0
-        self.count = 0
-        self.data = bytearray(size)
-
-    def __str__(self):
-        return "size %d, count %d, start %d, data %s" % (
-            self.size,
-            self.count,
-            self.start,
-            self.data,
-        )
-
-    def putdata(self, data):
-        """ WRITING to gmsg.
-            Add 'ln' bytes to the message tail.
-            Return number of written bytes.
-        """
-        data = string_to_bytearray(data)
-        length = ln = len(data)
-        if ln > self.free_space():
-            raise CircularFullBufferError(
-                "ERROR full buffer, not space for %d bytes" % (ln,))
-
-        tail = (self.start + self.count) % self.size  # write pointer
-        right_len = self.size - tail  # right zone
-        self.count += ln
-        if ln <= right_len:
-            # enough with the right zone
-            self.data[tail:tail + ln] = data[:]
-        else:
-            # put maximum at the right zone
-            self.data[tail:tail + right_len] = data[:right_len]
-            ln -= right_len
-
-            # put the rest at the left zone
-            self.data[0:ln] = data[right_len:]
-
-        return length
-
-    def getdata(self, ln=0):
-        """ READING : Pull 'ln' bytes.
-            Return the pulled data.
-            If ln is <= 0 then all data is pulled.
-            If there is no enough 'ln' bytes, it return all the remaining.
-        """
-        if ln <= 0 or ln > self.count:
-            ln = self.count
-        right_len = self.size - self.start  # right zone (from start to end)
-        if ln <= right_len:
-            # enough with the right zone
-            data = self.data[self.start:ln]
-            self.count -= ln
-            self.start += ln
-        else:
-            # get all the right zone
-            data = self.data[self.start:self.size]
-            self.count -= ln  # decrement now!
-            ln -= self.size - self.start
-
-            # add the rest of the left zone
-            data += self.data[0:ln]
-            self.start = ln
-
-        return data
-
-    def busy_space(self):
-        """READING: Return number of bytes pending of reading
-        """
-        return self.count
-
-    def free_space(self):
-        """ WRITTING: Return number of free byte space to write.
-        """
-        free_bytes = self.size - self.count
-        return free_bytes

File ginsfsm/circular_fifo.py

+# -*- coding: utf-8 -*-
+""" FIFO queue implemented with a Circular buffer.
+    It's not thread-safe.
+"""
+from ginsfsm.utils import string_to_bytearray
+
+DEFAULT_CIRCULAR_SIZE = 8192
+
+
+class CircularFullBufferError(Exception):
+    """ Raised when Circular have problems."""
+
+
+class Circular(object):
+    def __init__(self, size=DEFAULT_CIRCULAR_SIZE):
+        """ Circular buffer.
+        """
+        self.size = size
+        self.start = 0
+        self.count = 0
+        self.data = bytearray(size)
+
+    def __str__(self):
+        return "size %d, count %d, start %d, data %s" % (
+            self.size,
+            self.count,
+            self.start,
+            self.data,
+        )
+
+    def putdata(self, data):
+        """ WRITING to gmsg.
+            Add 'ln' bytes to the message tail.
+            Return number of written bytes.
+        """
+        data = string_to_bytearray(data)
+        length = ln = len(data)
+        if ln > self.free_space():
+            raise CircularFullBufferError(
+                "ERROR full buffer, not space for %d bytes" % (ln,))
+
+        tail = (self.start + self.count) % self.size  # write pointer
+        right_len = self.size - tail  # right zone
+        self.count += ln
+        if ln <= right_len:
+            # enough with the right zone
+            self.data[tail:tail + ln] = data[:]
+        else:
+            # put maximum at the right zone
+            self.data[tail:tail + right_len] = data[:right_len]
+            ln -= right_len
+
+            # put the rest at the left zone
+            self.data[0:ln] = data[right_len:]
+
+        return length
+
+    def getdata(self, ln=0):
+        """ READING : Pull 'ln' bytes.
+            Return the pulled data.
+            If ln is <= 0 then all data is pulled.
+            If there is no enough 'ln' bytes, it return all the remaining.
+        """
+        if ln <= 0 or ln > self.count:
+            ln = self.count
+        right_len = self.size - self.start  # right zone (from start to end)
+        if ln <= right_len:
+            # enough with the right zone
+            data = self.data[self.start:ln]
+            self.count -= ln
+            self.start += ln
+        else:
+            # get all the right zone
+            data = self.data[self.start:self.size]
+            self.count -= ln  # decrement now!
+            ln -= self.size - self.start
+
+            # add the rest of the left zone
+            data += self.data[0:ln]
+            self.start = ln
+
+        return data
+
+    def busy_space(self):
+        """READING: Return number of bytes pending of reading
+        """
+        return self.count
+
+    def free_space(self):
+        """ WRITTING: Return number of free byte space to write.
+        """
+        free_bytes = self.size - self.count
+        return free_bytes

File ginsfsm/tests/test_circular.py

-import unittest
-from ginsfsm.circular import (
-    Circular,
-    CircularFullBufferError,
-)
-
-
-class TestCircular(unittest.TestCase):
-    def setUp(self):
-        self.circular = Circular(5)
-
-    def test_getdata_putdata_without_overwrite(self):
-        data = self.circular.getdata(1)
-        self.assertEqual(data, b'')
-
-        count = self.circular.free_space()
-        self.assertEqual(count, 5)
-
-        count = self.circular.putdata("123")
-        self.assertEqual(count, 3)
-        count = self.circular.free_space()
-        self.assertEqual(count, 2)
-
-        count = self.circular.putdata("ab")
-        self.assertEqual(count, 2)
-        count = self.circular.free_space()
-        self.assertEqual(count, 0)
-
-        self.assertRaises(CircularFullBufferError,
-            self.circular.putdata,
-            "XX")
-
-        data = self.circular.getdata(5)
-        self.assertEqual(data, b"123ab")
-
-        data = self.circular.getdata(1)
-        self.assertEqual(data, b'')
-
-        count = self.circular.putdata("qw")
-        self.assertEqual(count, 2)
-        data = self.circular.getdata()
-        self.assertEqual(data, b'qw')
-
-    def test_getdata_putdata_with_overwrite(self):
-        count = self.circular.putdata("123")
-        self.assertEqual(count, 3)
-        count = self.circular.free_space()
-        self.assertEqual(count, 2)
-
-        self.assertRaises(CircularFullBufferError,
-            self.circular.putdata,
-            "abc")
-
-        data = self.circular.getdata(2)
-        self.assertEqual(data, b"12")
-        count = self.circular.free_space()
-        self.assertEqual(count, 4)
-        count = self.circular.busy_space()
-        self.assertEqual(count, 1)
-
-        count = self.circular.putdata("abcd")
-        self.assertEqual(count, 4)
-
-        data = self.circular.getdata(0)
-        self.assertEqual(data, b"3abcd")
-        count = self.circular.free_space()
-        self.assertEqual(count, 5)
-
-        count = self.circular.putdata("zxcvb")
-        self.assertEqual(count, 5)
-        count = self.circular.free_space()
-        self.assertEqual(count, 0)
-
-        data = self.circular.getdata(4)
-        self.assertEqual(data, b"zxcv")
-        count = self.circular.free_space()
-        self.assertEqual(count, 4)
-
-        data = self.circular.getdata(3)
-        self.assertEqual(data, b"")
-        count = self.circular.free_space()
-        self.assertEqual(count, 5)
-
-if __name__ == "__main__":
-    circular = Circular(5)
-
-    circular.putdata("123")
-    print circular
-    circular.getdata(2)
-    print circular
-
-    circular.putdata("abcd")
-    print circular
-
-    circular.getdata(0)
-    print circular

File ginsfsm/tests/test_circular_fifo.py

+import unittest
+from ginsfsm.circular_fifo import (
+    Circular,
+    CircularFullBufferError,
+)
+
+
+class TestCircular(unittest.TestCase):
+    def setUp(self):
+        self.circular = Circular(5)
+
+    def test_getdata_putdata_without_overwrite(self):
+        data = self.circular.getdata(1)
+        self.assertEqual(data, b'')
+
+        count = self.circular.free_space()
+        self.assertEqual(count, 5)
+
+        count = self.circular.putdata("123")
+        self.assertEqual(count, 3)
+        count = self.circular.free_space()
+        self.assertEqual(count, 2)
+
+        count = self.circular.putdata("ab")
+        self.assertEqual(count, 2)
+        count = self.circular.free_space()
+        self.assertEqual(count, 0)
+
+        self.assertRaises(CircularFullBufferError,
+            self.circular.putdata,
+            "XX")
+
+        data = self.circular.getdata(5)
+        self.assertEqual(data, b"123ab")
+
+        data = self.circular.getdata(1)
+        self.assertEqual(data, b'')
+
+        count = self.circular.putdata("qw")
+        self.assertEqual(count, 2)
+        data = self.circular.getdata()
+        self.assertEqual(data, b'qw')
+
+    def test_getdata_putdata_with_overwrite(self):
+        count = self.circular.putdata("123")
+        self.assertEqual(count, 3)
+        count = self.circular.free_space()
+        self.assertEqual(count, 2)
+
+        self.assertRaises(CircularFullBufferError,
+            self.circular.putdata,
+            "abc")
+
+        data = self.circular.getdata(2)
+        self.assertEqual(data, b"12")
+        count = self.circular.free_space()
+        self.assertEqual(count, 4)
+        count = self.circular.busy_space()
+        self.assertEqual(count, 1)
+
+        count = self.circular.putdata("abcd")
+        self.assertEqual(count, 4)
+
+        data = self.circular.getdata(0)
+        self.assertEqual(data, b"3abcd")
+        count = self.circular.free_space()
+        self.assertEqual(count, 5)
+
+        count = self.circular.putdata("zxcvb")
+        self.assertEqual(count, 5)
+        count = self.circular.free_space()
+        self.assertEqual(count, 0)
+
+        data = self.circular.getdata(4)
+        self.assertEqual(data, b"zxcv")
+        count = self.circular.free_space()
+        self.assertEqual(count, 4)
+
+        data = self.circular.getdata(3)
+        self.assertEqual(data, b"")
+        count = self.circular.free_space()
+        self.assertEqual(count, 5)
+
+if __name__ == "__main__":
+    circular = Circular(5)
+
+    circular.putdata("123")
+    print circular
+    circular.getdata(2)
+    print circular
+
+    circular.putdata("abcd")
+    print circular
+
+    circular.getdata(0)
+    print circular