Snippets

Camilo Rocha — Priority queue implemented with a minimum heap in which heapify operations are done with heap indexes

Created by Camilo Rocha
class pqueue(object):
  """a minheap implementation of an indexed priority queue

  The keys are the integers 0..len(self)-1 and every key is always
  stored in the heap.  A key whose priority equals INF counts as "not
  enqueued"; smaller priority values mean higher priority, so the root
  of the heap always holds the key with the highest priority.
  """

  # lowest possible priority; marks the keys that are not enqueued
  INF = float('inf')

  def __init__(self,cap=100):
    """creates a priority queue with keys in the set {0..cap-1}, all of
    them with priority INF
    """
    # list of pairs (key,priority) in a minheap structure
    self.__heap = [ (k,self.INF) for k in range(cap) ]
    # __key[k] is the index in __heap of the pair that holds key k
    self.__key  = list(range(cap))
    # number of keys whose priority is different from INF
    self.__count = 0

  def __str__(self):
    """return the string representation of the priority queue"""
    return str(self.__heap)

  def __len__(self):
    """return the number of keys in the data structure (enqueued or not)"""
    return len(self.__heap)

  def __is_root(self,i):
    """is the given index the root of the minheap?"""
    return i==0

  def __parent(self,i):
    """return the parent index of i; it might not exist"""
    return (i-1)>>1

  def __left(self,i):
    """return the left child index of i; it might not exist"""
    return 1+(i<<1)

  def __right(self,i):
    """return the right child index of i; it might not exist"""
    return (i+1)<<1

  def __swap(self,i,j):
    """exchange the pairs at heap indexes i and j, keeping the key
    index table consistent with the new positions
    """
    heap,key = self.__heap,self.__key
    heap[i],heap[j] = heap[j],heap[i]
    key[heap[i][0]],key[heap[j][0]] = i,j

  def __heapify_up(self,i):
    """heapify up index i in the heap"""
    heap = self.__heap
    while not self.__is_root(i):
      p = self.__parent(i)
      if not heap[i][1] < heap[p][1]:
        break
      self.__swap(i,p)
      i = p

  def __heapify_down(self,i):
    """heapify down index i in the heap"""
    heap,n = self.__heap,len(self.__heap)
    while True:
      l,r,b = self.__left(i),self.__right(i),i
      if l<n and heap[b][1]>heap[l][1]:
        b = l
      if r<n and heap[b][1]>heap[r][1]:
        b = r
      if b==i:
        break
      self.__swap(i,b)
      i = b

  def is_key(self,k):
    """is the given k a key in the queue?"""
    return 0<=k<len(self)

  def get_priority(self,k):
    """return the priority of the given key k; INF means that k is not
    enqueued
    """
    assert self.is_key(k)
    return self.__heap[self.__key[k]][1]

  def query(self):
    """return the pair (key,priority) with the highest priority, i.e.,
    the smallest priority value; raises IndexError if there are no keys
    """
    return self.__heap[0]

  def deque(self):
    """make the priority of the element with the highest priority the
    lowest in the queue (INF); does nothing if there are no keys
    """
    if not self.__heap:
      return
    k,p = self.__heap[0]
    if p != self.INF:
      # the root key leaves the set of enqueued keys
      self.__count -= 1
    self.__heap[0] = (k,self.INF)
    self.__heapify_down(0)

  def decrease_key(self,k,p):
    """decrease the priority of the given key k to p; it should be the
    case that k is a valid key and p <= get_priority(k)
    """
    assert self.is_key(k)
    old = self.get_priority(k)
    assert old>=p
    if old == self.INF and p != self.INF:
      # k goes from "not enqueued" to "enqueued"
      self.__count += 1
    i = self.__key[k]
    self.__heap[i] = (k,p)
    self.__heapify_up(i)

  def enque(self,k,p):
    """sets the priority of key k to be p, assuming that k currently
    has the lowest priority (INF), i.e., that it is not enqueued
    """
    assert self.is_key(k)
    assert self.get_priority(k) == self.INF
    self.decrease_key(k,p)

  def is_empty(self):
    """is the priority queue empty in the sense that no key has a
    priority?
    """
    return self.__count == 0

  def is_full(self):
    """is the priority queue full in the sense that all keys have a
    priority?
    """
    # the last heap slot is not necessarily the maximum of a minheap,
    # so the number of enqueued keys is compared with the number of keys
    return self.__count == len(self.__heap)

  def add_key(self):
    """add the next key to the priority queue with the worst priority
    """
    # an INF pair appended as the last leaf cannot break the heap order
    self.__heap.append((len(self.__heap),self.INF))
    self.__key.append(len(self.__key))

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.