Commits

Anonymous committed efc56af

Removed numpy dependency and started work on ArrayQueue2 class

  • Participants
  • Parent commits dd33d29

Comments (0)

Files changed (1)

File src/array_queue.py

 import weakref
-import numpy
 from multiprocessing.sharedctypes import RawArray
-from multiprocessing import Pipe, Lock
+from multiprocessing import Pipe, Lock, RawValue, Condition
 from Queue import Empty, Full
 
 
+class ArrayQueue2(object):
+    """Multiprocess queue using shared memory and no pipes
+    """
+    def __init__(self, struct, size=20):
+        size = int(size)
+        self.buffer = RawArray(struct, size)
+        self.stock = RawArray('I', size)
+        self.queue = RawArray('I', size)
+        
+        self.stock[:] = xrange(size)
+        
+        self.stock_write = RawValue('I',0)
+        self.stock_read = RawValue('I',0)
+        self.queue_write = RawValue('I',0)
+        self.queue_read = RawValue('I',0)
+        
+        self.put_cond = Condition(Lock())
+        self.get_cond = Condition(Lock())
+        
+    def put(self, scalar):
+        with self.put_cond:
+            ### not finished ###
+
+
 class ArrayQueue(object):
     """
     Multiprocess queue using shared memory
         size - number of slots in the buffer
         """
         buf = RawArray(struct, int(size))
-        self._buffer = buf
-        self.buffer = numpy.frombuffer(buf, dtype=numpy.dtype(buf._type_))
+        self.buffer = buf
         stock_out, stock_in = Pipe(duplex=False)
         queue_out, queue_in = Pipe(duplex=False)
         
         queue_out_lock = Lock()
         queue_in_lock = Lock()
         
+        self.stock_closed = RawValue('h', 0)
+        self.queue_closed = RawValue('h', 0)
+        
         for i in xrange(size):
             stock_in.send(i)
         self.map={}
             
         
     def __getstate__(self):
-        d = super(ArrayQueue, self).__getstate__()
-        d.pop("_map", None)
-        d.pop("buffer", None)
+        d = self.__dict__.copy()
+        d.pop("map", None)
         return d
     
     def __setstate__(self, d):
-        super(ArrayQueue,self).__setstate__(d)
         d['map']={}
-        buffer = self._buffer
-        dt = buffer._type_
-        d['buffer']=numpy.frombuffer(buffer, 
-                                     dtype=numpy.dtype(dt))
+        self.__dict__.update(d)
         
     def put(self, scalar, block=True, timeout=None):
         stock_out,stock_out_lock , queue_in, queue_in_lock = self._put_obj
         
         with stock_out_lock:
+            if self.stock_closed.value:
+                raise EOFError
             if block:
                 if timeout is None:
                     idx = stock_out.recv()
                     idx = stock_out.recv()
                 else:
                     raise Full
+                
+            if idx is None:
+                self.stock_closed.value = 1
+                raise EOFError
         
         self.buffer[idx] = scalar
         #print "sending", idx
             stock_in.send(idx)
         
     def close(self):
-        put_obj = self._put_obj
-        with put_obj[3]:
-            put_obj[2].send(None)
+        stock_out, stock_out_lock , queue_in, queue_in_lock = self._put_obj
+        stock_in, stock_in_lock = self._ret_obj
+        with queue_in_lock:
+            queue_in.send(None)
+        with stock_in_lock:
+            stock_in.send(None)
         
-    def get(self, block=True, timeout=None):
+    def get(self, block=True, timeout=None):        
         queue_out, lock = self._get_obj
         
         with lock:
+            if self.queue_closed.value:
+                raise EOFError
+            
             if block:
                 if timeout is None:
                     idx = queue_out.recv()
                     idx = queue_out.recv()
                 else:
                     raise Empty
-            
-        if idx is None:
-            raise EOFError
+                
+            if idx is None:
+                self.queue_closed.value = 1
+                raise EOFError
+        
         value = self.buffer[idx]
         #print "ID", id(value), sys.getrefcount(value)
         r = weakref.ref(value, self._finalise)
         self.map[id(r)]=(idx, r)
-        return value
+        return value
+