Snippets
Camilo Rocha Priority queue implemented with a minimum heap in which heapify operations are done with key indexes
Created by
Camilo Rocha
last modified
Camilo Rocha
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | class pqueue(object):
"""a minheap implementation of a priority queue"""
"""lowest priority"""
INF = float('inf')
def __init__(self,cap=100):
"""creates a priority queue with keys in the set {0..cap-1}"""
# list of pairs (key,priority) in a minheap structure
self.__heap = [ (i,self.INF) for i in range(cap) ]
# index of i in h
self.__key = [ i for i in range(cap) ]
def __str__(self):
"""return the string representation of the priority queue"""
return str(self.__heap)
def __len__(self):
"""return the number of keys in the data structure"""
return len(self.__heap)
def __is_root(self,k):
"""is the given key the root of the minheap?"""
return self.__key[k]==0
def __parent(self,k):
"""return the key of the parent of the given key; if it does not
exist, an error is raised
"""
assert not(self.__is_root(k))
return self.__heap[(self.__key[k]-1)>>1][0]
def parent(self,k):
return self.__parent(k)
def __left(self,k):
"""return the key of the left child of k in the heap; if it does not
exist, an error is raised
"""
assert self.is_key(1+(self.__key[k]<<1))
return self.__heap[1+(self.__key[k]<<1)][0]
def __right(self,k):
"""return the key of the right child of k in the heap; if it does not
exist, an error is raised
"""
assert self.is_key((1+self.__key[k])<<1)
return self.__heap[(1+self.__key[k])<<1][0]
def __heapify_up(self,k):
"""heapify up key k in the heap"""
if not(self.__is_root(k)):
kp = self.__parent(k)
if self.get_priority(kp)>self.get_priority(k):
self.__heap[self.__key[k]],self.__heap[self.__key[kp]] = \
self.__heap[self.__key[kp]],self.__heap[self.__key[k]]
self.__key[k],self.__key[kp] = self.__key[kp],self.__key[k]
self.__heapify_up(k)
def __heapify_down(self,k):
"""heapify down key k in the heap"""
l,r,b = None,None,k
if self.is_key(1+(self.__key[k]<<1)):
l = self.__left(k)
if self.__heap[self.__key[l]][1]<self.__heap[self.__key[b]][1]:
b = l
if self.is_key((1+self.__key[k])<<1):
r = self.__right(k)
if self.__heap[self.__key[r]][1]<self.__heap[self.__key[b]][1]:
b = r
if b!=k:
self.__heap[self.__key[k]],self.__heap[self.__key[b]] = \
self.__heap[self.__key[b]],self.__heap[self.__key[k]]
self.__key[k],self.__key[b] = self.__key[b],self.__key[k]
self.__heapify_down(k)
def is_key(self,k):
"""return if the given k is a key in the priority queue"""
return 0<=k<len(self)
def get_priority(self,k):
"""return the priority of the given key k"""
assert self.is_key(k)
return self.__heap[self.__key[k]][1]
def query(self):
"""return the pair (key,priority) with the highest priority"""
return self.__heap[0]
def deque(self):
"""make the priority of the element with the highest priority the
lowest in the queue
"""
self.__heap[0]=(self.__heap[0][0],self.INF)
self.__heapify_down(self.__heap[0][0])
def decrease_key(self,k,p):
"""decrease the priority of the given key k to p; it should be the
case that k is a valid key and p <= get_priority(k)
"""
assert self.is_key(k)
assert p <= self.get_priority(k)
self.__heap[k]=(k,p)
self.__heapify_up(k)
def enque(self,k,p):
"""sets the priority of key k to be p, assuming that it has the
highest priority
"""
assert self.is_key(k)
assert self.get_priority(k) == self.INF
self.decrease_key(k,p)
def is_empty(self):
"""is the priority queue empty in the sense that no key has a
priority?
"""
return self.__heap[0][1] == self.INF
def is_full(self):
"""is the priority queue full in the sense that all keys have a
priority?
"""
return self.__heap[-1][1] != self.INF
def add_key(self):
"""add the next key to the priority queue with the worst priority
"""
self.__heap.append((len(self.__heap),self.INF))
self.__key.append(len(self.__key))
|
Comments (0)
You can clone a snippet to your computer for local editing. Learn more.