Snippets

Camilo Rocha Priority queue implemented with a minimum heap in which heapify operations are done with heap indexes

Created by Former user

File pqueue.py Added

  • Ignore whitespace
  • Hide word diff
+class pqueue(object):
+  """a minheap implementation of a priority queue"""
+
+  """lowest priority"""
+  INF = float('inf')
+  
+  def __init__(self,cap=100):
+    """creates a priority queue with keys in the set {0..cap-1}"""
+    # list of pairs (key,priority) in a minheap structure
+    self.__heap = [ (k,self.INF) for k in range(cap) ]
+    # index of i in h
+    self.__key  = [ k for k in range(cap) ]
+
+  def __str__(self):
+    """return the string representation of the priority queue"""
+    return str(self.__heap)
+
+  def __len__(self):
+    """return the number of keys in the data structure"""
+    return len(self.__heap)
+
+  def __is_root(self,i):
+    """is the given index the root of the minheap?"""
+    return i==0
+  
+  def __parent(self,i):
+    """return the parent index of i; it might not exist"""
+    return (i-1)>>1
+
+  def __left(self,i):
+    """return the left child index of i; it might not exist"""
+    return 1+(i<<1)
+
+  def __right(self,i):
+    """return the right child index of i; it might not exist"""
+    return (i+1)<<1
+
+  def __heapify_up(self,i):
+    """heapify up index i in the heap"""
+    if i!=0:
+      p = self.__parent(i)
+      if self.__heap[i][1] < self.__heap[p][1]:
+        ki,kp = self.__heap[i][0],self.__heap[p][0]
+        self.__heap[i],self.__heap[p] = self.__heap[p],self.__heap[i]
+        self.__key[ki],self.__key[kp] = p,i
+        self.__heapify_up(p)
+
+  def __heapify_down(self,i):
+    """heapify down index i in the heap"""
+    l,r,b = self.__left(i),self.__right(i),i
+    if l<len(self) and self.__heap[b][1]>self.__heap[l][1]:
+      b = l
+    if r<len(self) and self.__heap[b][1]>self.__heap[r][1]:
+      b = r
+    if i!=b:
+      ki,kb = self.__heap[i][0],self.__heap[b][0]
+      self.__heap[i],self.__heap[b] = self.__heap[b],self.__heap[i]
+      self.__key[ki],self.__key[kb] = b,i
+      self.__heapify_down(b)
+
+  def is_key(self,k):
+    """is the given k a key in the queue?"""
+    return 0<=k<len(self)
+
+  def get_priority(self,k):
+    """return the priority of the given key k"""
+    assert self.is_key(k)
+    return self.__heap[self.__key[k]][1]
+  
+  def query(self):
+    """return the pair (key,priority) with the highest priority"""
+    return self.__heap[0]
+
+  def deque(self):
+    """make the priority of the element with the highest priority the
+    lowest in the queue
+    """
+    self.__heap[0] = (self.__heap[0][0],self.INF)
+    self.__heapify_down(0)
+
+  def decrease_key(self,k,p):
+    """decrease the priority of the given key k to p; it should be the
+    case that k is a valid key and p <= get_priority(k)
+    """
+    assert self.is_key(k)
+    assert self.get_priority(k)>=p
+    self.__heap[self.__key[k]] = (k,p)
+    self.__heapify_up(self.__key[k])
+
+  def enque(self,k,p):
+    """sets the priority of key k to be p, assuming that it has the
+    highest priority
+    """
+    assert self.is_key(k)
+    assert self.get_priority(k) == self.INF
+    self.decrease_key(k,p)
+
+  def is_empty(self):
+    """is the priority queue empty in the sense that no key has a
+    priority?
+    """
+    return self.__heap[0][1] == self.INF
+
+  def is_full(self):
+    """is the priority queue full in the sense that all keys have a
+    priority?
+    """
+    return self.__heap[-1][1] != self.INF
+
+  def add_key(self):
+    """add the next key to the priority queue with the worst priority
+    """
+    self.__heap.append((len(self.__heap),self.INF))
+    self.__key.append(len(self.__key))
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.