Snippets

Camilo Rocha Priority queue implemented with a minimum heap in which heapify operations are done with key indexes

Created by Camilo Rocha last modified Camilo Rocha
class pqueue(object):
  """a minheap implementation of a priority queue"""

  """lowest priority"""
  INF = float('inf')

  def __init__(self,cap=100):
    """creates a priority queue with keys in the set {0..cap-1}"""
    # list of pairs (key,priority) in a minheap structure
    self.__heap = [ (i,self.INF) for i in range(cap) ]
    # index of i in h
    self.__key = [ i for i in range(cap) ]

  def __str__(self):
    """return the string representation of the priority queue"""
    return str(self.__heap)

  def __len__(self):
    """return the number of keys in the data structure"""
    return len(self.__heap)
  
  def __is_root(self,k):
    """is the given key the root of the minheap?"""
    return self.__key[k]==0

  def __parent(self,k):
    """return the key of the parent of the given key; if it does not
    exist, an error is raised
    """
    assert not(self.__is_root(k))
    return self.__heap[(self.__key[k]-1)>>1][0]

  def parent(self,k):
    return self.__parent(k)
  
  def __left(self,k):
    """return the key of the left child of k in the heap; if it does not
    exist, an error is raised
    """
    assert self.is_key(1+(self.__key[k]<<1))
    return self.__heap[1+(self.__key[k]<<1)][0]

  def __right(self,k):
    """return the key of the right child of k in the heap; if it does not
    exist, an error is raised
    """
    assert self.is_key((1+self.__key[k])<<1)
    return self.__heap[(1+self.__key[k])<<1][0]

  def __heapify_up(self,k):
    """heapify up key k in the heap"""
    if not(self.__is_root(k)):
      kp = self.__parent(k)
      if self.get_priority(kp)>self.get_priority(k):
        self.__heap[self.__key[k]],self.__heap[self.__key[kp]] = \
            self.__heap[self.__key[kp]],self.__heap[self.__key[k]]
        self.__key[k],self.__key[kp] = self.__key[kp],self.__key[k]
        self.__heapify_up(k)

  def __heapify_down(self,k):
    """heapify down key k in the heap"""
    l,r,b = None,None,k
    if self.is_key(1+(self.__key[k]<<1)):
      l = self.__left(k)
      if self.__heap[self.__key[l]][1]<self.__heap[self.__key[b]][1]:
        b = l
    if self.is_key((1+self.__key[k])<<1):
      r = self.__right(k)
      if self.__heap[self.__key[r]][1]<self.__heap[self.__key[b]][1]:
        b = r
    if b!=k:
      self.__heap[self.__key[k]],self.__heap[self.__key[b]] = \
        self.__heap[self.__key[b]],self.__heap[self.__key[k]]
      self.__key[k],self.__key[b] = self.__key[b],self.__key[k]
      self.__heapify_down(k)

  def is_key(self,k):
    """return if the given k is a key in the priority queue"""
    return 0<=k<len(self)
    
  def get_priority(self,k):
    """return the priority of the given key k"""
    assert self.is_key(k)
    return self.__heap[self.__key[k]][1]

  def query(self):
    """return the pair (key,priority) with the highest priority"""
    return self.__heap[0]

  def deque(self):
    """make the priority of the element with the highest priority the
    lowest in the queue
    """
    self.__heap[0]=(self.__heap[0][0],self.INF)
    self.__heapify_down(self.__heap[0][0])

  def decrease_key(self,k,p):
    """decrease the priority of the given key k to p; it should be the
    case that k is a valid key and p <= get_priority(k)
    """
    assert self.is_key(k)
    assert p <= self.get_priority(k)
    self.__heap[k]=(k,p)
    self.__heapify_up(k)

  def enque(self,k,p):
    """sets the priority of key k to be p, assuming that it has the
    highest priority
    """
    assert self.is_key(k)
    assert self.get_priority(k) == self.INF
    self.decrease_key(k,p)
    
  def is_empty(self):
    """is the priority queue empty in the sense that no key has a
    priority?
    """
    return self.__heap[0][1] == self.INF

  def is_full(self):
    """is the priority queue full in the sense that all keys have a
    priority?
    """
    return self.__heap[-1][1] != self.INF

  def add_key(self):
    """add the next key to the priority queue with the worst priority
    """
    self.__heap.append((len(self.__heap),self.INF))
    self.__key.append(len(self.__key))

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.