Commits

Kyle Schaffrick committed f70ed9a

Decouple Multiplexer and ChannelFrameQueue, and fix livelock when channel
transmit queue is full.

  • Participants
  • Parent commits 80faf19

Comments (0)

Files changed (1)

 
         self.tx_sem = semaphore()
         self.tx_enqueue_notify = self.tx_sem.release
-        self.tx_dequeue_wait = self.tx_sem.acqevent.wait
         self.tx_wait_if_empty = self.tx_sem.acquire
 
     def get_next_muxed_frame(self):
             chan_nr = chan_nrs[chan_idx]
             queue = self._channel_queues[chan_nr]
 
-            if len(queue.send_frame_queue):
-                result = queue.send_frame_queue.pop(0)
+            result = queue.get_next_tx_frame()
+            if result is not None:
                 result.channel = chan_nr
 
         self._last_channel_idx = chan_idx
         Demultiplex a received frame.
         """
         try:
-            self._channel_queues[frame.channel].recv_frame_queue.send(frame)
+            self._channel_queues[frame.channel].put_next_rx_frame(frame)
         except KeyError:
             raise errors.UnknownChannelReceivedError("Recieved on unknown"
                     " channel %i" % frame.channel)
 
     def all_channel_exception(self, exc):
-        [ cq.recv_frame_queue.send(exc=exc)
-                for cq in self._channel_queues.itervalues() ]
+        for cq in self._channel_queues.itervalues():
+            cq.raise_(exc)
 
 
 class ChannelFrameQueue(object):
     """
     def __init__(self, multiplexer, send_queue_max=10):
         self.mux = multiplexer
-        self.send_queue_max = send_queue_max
-        self.send_frame_queue = []
-        self.recv_frame_queue = queue()
+        self.send_queue_sem = semaphore(limit=send_queue_max)
+        self._send_frame_queue = []
+        self._recv_frame_queue = queue()
         self._recv_pushback = []
 
+        self._send_queue_wait = self.send_queue_sem.release
+        self._send_dequeue_notify = self.send_queue_sem.acquire
+
     def send_frame(self, frame):
         """
         Queues a frame to be sent on this channel. This call will wait if the
         send queue length exceeds `send_queue_max` frames.
         """
         # We append the frame to our send queue, and then notify the mux.
-        self.send_frame_queue.append(frame)
+        self._send_queue_wait()
+        self._send_frame_queue.append(frame)
         self.mux.tx_enqueue_notify()
 
-        if self.send_queue_max is not None:
-            self._send_queue_wait()
+    def get_next_tx_frame(self):
+        """
+        Gets the next frame in the transmit queue, or None if it is empty.
+        """
+        if len(self._send_frame_queue):
+            self._send_dequeue_notify()
+            return self._send_frame_queue.pop(0)
 
-    def _send_queue_wait(self):
+    def put_next_rx_frame(self, frame):
         """
-        If our send queue is full, repeatedly wait for transmit queue dequeues
-        until it is no longer full.
+        Puts the next frame in the receive queue.
         """
-        while len(self.send_frame_queue) >= self.send_queue_max:
-            self.mux.tx_dequeue_wait()
+        self._recv_frame_queue.send(frame)
+
+    def raise_(self, exc):
+        """
+        Raise an exception on the channel.
+        """
+        self._recv_frame_queue.send(exc=exc)
 
     def recv_frame(self):
         """
         if self._recv_pushback:
             return self._recv_pushback.pop(0)
         else:
-            return self.recv_frame_queue.wait()
+            return self._recv_frame_queue.wait()
 
     def peek_frames(self):
         """
         Calls to drop_frame() or recv_frame() will invalidate the indices
         returned by this method.
         """
-        while self.recv_frame_queue.ready():
-            self._recv_pushback.append(self.recv_frame_queue.wait())
+        while self._recv_frame_queue.ready():
+            self._recv_pushback.append(self._recv_frame_queue.wait())
 
         index = 0
         for frame in self._recv_pushback:
             index += 1
 
         while True:
-            frame = self.recv_frame_queue.wait()
+            frame = self._recv_frame_queue.wait()
             self._recv_pushback.append(frame)
             yield index, frame
             index += 1