- Introduction
- Max heap data structure
- Heapsort
- The heapq module
- k largest problems
- When to use heap
- Appendix: Implement max heap in Python
Introduction
This post is about the heap data structure and heap sort.
Max heap data structure
The heap data structure is a fusion of array and tree.
It supports push and pop in \(O(\log n)\) time and gives constant-time access to the max (or min) element.
The max (or min) heap data structure is a complete binary tree with the heap property: at each node, the parent is no smaller (or, for a min heap, no larger) than its children. (In contrast, binary search trees have a more stringent requirement, with the left child smaller than the parent and the right child bigger than the parent.)
Complete means that all levels of the tree, except possibly the last one (deepest) are fully filled, and, if the last level of the tree is not complete, the nodes of that level are filled from left to right (left-aligned).
\[\forall \text{ complete binary tree, }\exists\text{ a unique array, and vice versa.}\] For a node stored at 0-based index \(i\): \[Left(i)=2i+1\] \[Right(i)=2i+2\] \[Parent(i)=\lfloor (i-1)/2 \rfloor\] The “parent-children” relationship is fully defined by the array indices through these three functions. There is no need for pointers, nor for attributes such as left or right children.
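The parent-child relationship can be read directly from the indices of the elements. In the min heap below, for example, the children of index 1 (value 2) sit at indices 3 and 4 (values 4 and 7).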
index | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
---|---|---|---|---|---|---|---|---|---|
A | 1 | 2 | 3 | 4 | 7 | 8 | 5 | 9 | 6 |
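As a minimal sketch of the index arithmetic above, the helper names `left`, `right`, `parent`, and `is_min_heap` below are illustrative choices and not part of any library. They check the array from the table against the min heap property.

```python
def left(i):
    """Index of the left child of the node at index i."""
    return 2 * i + 1

def right(i):
    """Index of the right child of the node at index i."""
    return 2 * i + 2

def parent(i):
    """Index of the parent of the node at index i (for i > 0)."""
    return (i - 1) // 2

def is_min_heap(a):
    """Return True if every non-root element is >= its parent."""
    return all(a[parent(i)] <= a[i] for i in range(1, len(a)))

A = [1, 2, 3, 4, 7, 8, 5, 9, 6]
print(A[left(1)], A[right(1)])  # 4 7
print(A[parent(8)])             # 4
print(is_min_heap(A))           # True
```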
Heapsort
Heapsort can be thought of as an improved Selection sort. “Heapsort is nothing but an implementation of selection sort using the right data structure.”
Like Selection sort, heapsort partitions its input into a sorted and an unsorted region, and it iteratively shrinks the unsorted region by selecting the largest (or smallest) element from it and inserting it into the sorted region in order.
Unlike selection sort, heapsort does not waste time with a linear-time scan of the unsorted region; rather, it maintains the unsorted region in a max heap so that the largest element can be found more quickly in each step.
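The idea can be sketched as follows. This is one possible in-place formulation, not the only one; the names `sift_down` and `heapsort` are chosen here for illustration. The front of the list holds the max heap (the unsorted region) and the back holds the sorted region.

```python
def sift_down(a, root, end):
    """Restore the max heap property for a[root:end],
    assuming both subtrees of root are already max heaps."""
    while True:
        child = 2 * root + 1              # left child
        if child >= end:
            return                        # root is a leaf
        if child + 1 < end and a[child + 1] > a[child]:
            child += 1                    # right child is larger
        if a[root] >= a[child]:
            return                        # heap property holds
        a[root], a[child] = a[child], a[root]
        root = child

def heapsort(a):
    """Sort the list a in place in ascending order and return it."""
    n = len(a)
    # Build a max heap bottom-up, starting from the last parent.
    for root in range(n // 2 - 1, -1, -1):
        sift_down(a, root, n)
    # Repeatedly move the max to the end of the unsorted region.
    for end in range(n - 1, 0, -1):
        a[0], a[end] = a[end], a[0]
        sift_down(a, 0, end)
    return a

print(heapsort([5, 7, 3, 9, 2, 8, 1, 4, 6]))
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
```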
Cost
- Time complexity: \(O(n \log n)\), including in the worst case. In practice, on most machines heapsort is slower than a well-implemented quicksort, but its worst case is better than quicksort's \(O(n^2)\).
Heapsort is an in-place algorithm, but it is not a stable sort.
- Space complexity: \(O(1)\) auxiliary space, since the heap is built inside the input array.
Here is how heapsort compares with other well-known sorting methods.
The heapq module
The heapq module is part of the Python standard library, with well-documented source code on GitHub.
Note that it implements min heap, which means that the root node is the smallest.
- heapq.heapify rearranges the input array (list, or list of lists) into a min heap in place. The result in the example below, [1, 2, 3, 4, 7, 8, 5, 9, 6], is a min heap: it is not fully sorted, but the first element is indeed the smallest.
- heapq.nlargest(k, A) returns the k largest elements in A as a new list, and leaves A unchanged
- heapq.heappop(A) pops the smallest element out of A, and keeps A in the min heap data structure
- heapq.heappush(A, element) pushes a new element into A, and keeps the min heap data structure
- heapq.heappushpop(A, element) pushes a new element into A, pops the smallest, and keeps the min heap data structure
- minHeap[0] returns the smallest element without popping it.
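The operations listed above can be tried together in a short session. This is only an illustrative sketch; the variable names are arbitrary.

```python
import heapq

A = [5, 7, 3, 9, 2, 8, 1, 4, 6]
heapq.heapify(A)                     # A is now a min heap

smallest = A[0]                      # peek without popping
popped = heapq.heappop(A)            # removes and returns the smallest
heapq.heappush(A, 0)                 # 0 becomes the new smallest
replaced = heapq.heappushpop(A, 10)  # pushes 10, then pops the smallest
top3 = heapq.nlargest(3, A)          # new list; A is left untouched

print(smallest, popped, replaced)    # 1 1 0
print(top3)                          # [10, 9, 8]
print(A[0])                          # 2
```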
By default, heapq compares elements with <, so lists are compared lexicographically, starting with their first elements, as shown in the operations on A below.
from heapq import heapify
A = [5, 7 ,3, 9, 2, 8, 1, 4, 6]
heapify(A)
print(A)
# [1, 2, 3, 4, 7, 8, 5, 9, 6]
A = [[2, 6, 50],
[4, 7, 60],
[1, 5, 100],
[6, 3, 10]]
heapify(A)
print(A)
# [[1, 5, 100], [4, 7, 60], [2, 6, 50], [6, 3, 10]]
Sort by the nth element
If we want to sort by anything other than the first elements, there are several ways.
The examples below are adapted from a Stack Overflow post on sorting by the nth element of an array.
Customize the dunder method __lt__
We can define a custom list class with a modified __lt__ dunder (or magic) method. Our custom class MyList has only one method, __lt__, where we define how to compare 2 MyList objects:
from heapq import heapify
n = [[1, 5, 100],
     [2, 6, 50],
     [4, 7, 60],
     [6, 3, 10]]
class MyList(list):
    def __lt__(self, other):
        # compare by the 3rd element only
        return self[2] < other[2]
q = [MyList(x) for x in n]
print(q)
heapify(q)
print(q)
Convert the original list of lists into a list of tuples
We want to sort by the 3rd element of each list. We change our input data structure by mapping each list to a tuple (sort_value, list).
from heapq import heapify, heappop,heappush
A = [[2, 6, 50],
[4, 7, 60],
[1, 5, 100],
[6, 3, 10]]
# change data structure to tuple(sort_value, list)
Q = [(x[2], x) for x in A]
# [(50, [2, 6, 50]), (60, [4, 7, 60]), (100, [1, 5, 100]), (10, [6, 3, 10])]
heapify(Q)
print(Q)
# [(10, [6, 3, 10]), (50, [2, 6, 50]), (100, [1, 5, 100]), (60, [4, 7, 60])]
print(heappop(Q))
# (10, [6, 3, 10])
l = [2, 5, 1]
heappush(Q, (l[2], l))
print(heappop(Q))
# (1, [2, 5, 1])
Use heapq for max heap
To use the heapq module for a max heap, we can negate the values if the input is an array of integers or floats. For other objects, we can use a customized __lt__ dunder method.
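A minimal sketch of the negation trick, assuming numeric input: values are negated on the way in and negated again on the way out, so the min heap of negated values behaves like a max heap.

```python
import heapq

A = [5, 7, 3, 9, 2, 8, 1, 4, 6]
max_heap = [-x for x in A]      # store negated values
heapq.heapify(max_heap)

print(-max_heap[0])             # peek at the max: 9

heapq.heappush(max_heap, -10)   # push 10 as -10

# Popping repeatedly yields the values in descending order.
desc = [-heapq.heappop(max_heap) for _ in range(len(max_heap))]
print(desc)
# [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
```

Each push and pop stays \(O(\log n)\), so there is no need to heapify again after every pop.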
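The private heapq._heapify_max function converts an input to a max heap in place, in \(O(n)\) time. However, heappop and heappush maintain the min heap invariant, so after a pop the list is no longer a max heap and has to be heapified again. If we do that for each of \(n\) pops, the total cost is \(O(n^2)\). So this is not efficient although it works.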
from heapq import heappop, _heapify_max  # _heapify_max is a private, undocumented helper
A = [5, 7, 3, 9, 2, 8, 1, 4, 6]
_heapify_max(A)
print("max heap", A)
# max heap [9, 7, 8, 6, 2, 3, 1, 4, 5]
print(heappop(A))  # 9; heappop then rearranges the rest using min heap rules
print("after popping max")
print(A)
print(heappop(A))
print("after popping the second number, we see that it is no longer a max heap")
print(A)
print("need to heapify again")
_heapify_max(A)
print("max heap", A)
k largest problems
k longest strings in a stream
This problem gives us a sequence of strings as a stream, which cannot be rewound to read earlier values. It asks us to return the k longest strings; the ordering is not important as long as they are the k longest.
If we don’t want the lengths of the strings in the output, we can return [i[1] for i in minH] instead of the min heap itself.
from heapq import heapify, heappushpop
import itertools
def top_k(k, stream):
    stream = iter(stream)  # treat the input as a one-pass stream
    minH = [(len(s), s) for s in itertools.islice(stream, k)]  # initialize minH with the first k elements
    print(minH)
    heapify(minH)
    print("after heapify", minH)
    for n in stream:  # islice has consumed the first k elements, so this continues from index k
        heappushpop(minH, (len(n), n))  # pushpop is more efficient than combining push and pop per heapq documentation
    # return [p[1] for p in heapq.nsmallest(k, minH)]
    # return heapq.nlargest(k, minH)
    return minH
A = ["dadadadadadamamama", "so", "come", "a", "goto", "Spain", "wawawawa"]
print("Top k elements are", top_k(2, A))
# [(18, 'dadadadadadamamama'), (2, 'so')]
# after heapify [(2, 'so'), (18, 'dadadadadadamamama')]
# Top k elements are [(8, 'wawawawa'), (18, 'dadadadadadamamama')]
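For comparison, heapq.nlargest accepts any iterable and a key function, so the same result can probably be obtained more compactly. The sketch below assumes that returning the strings longest-first is acceptable; the function name `k_longest` is made up for this example.

```python
import heapq

def k_longest(k, stream):
    """Return the k longest strings from a one-pass iterable, longest first."""
    return heapq.nlargest(k, stream, key=len)

words = ["dadadadadadamamama", "so", "come", "a", "goto", "Spain", "wawawawa"]
result = k_longest(2, iter(words))  # iter() mimics a stream that is read once
print(result)
# ['dadadadadadamamama', 'wawawawa']
```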
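from heapq import heapify, heappop
def kthLargest(A, k):
    # the kth largest element is the (len(A) - k + 1)th smallest
    k = len(A) - k + 1
    heapify(A)  # note: rearranges A in place
    for i in range(k - 1):
        heappop(A)  # discard the k - 1 smallest elements
    return heappop(A)
nums = [3, 2, 3, 1, 2, 4, 5, 5, 6, 9, 10, 100, 200]
k = 4
print(kthLargest(nums, k))
# 9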
The method below has nothing to do with heaps but is included for reference. It is very similar to quicksort: elements are compared with a pivot, smaller ones are moved to the left, and bigger ones are moved to the right.
If the pivot index after partitioning is larger than k, then continue in the left part.
Else if the pivot index after partitioning is smaller than k, then continue in the right part.
In the lucky case that the pivot index is the same as k, return the value.
def findKthLargest(A, k):
    k = len(A) - k  # we are used to left to right (smallest first)
    def quickSelect(l, r):
        pivot, p = A[r], l
        for i in range(l, r):
            if A[i] <= pivot:
                A[p], A[i] = A[i], A[p]
                p += 1
        A[p], A[r] = A[r], A[p]
        if p > k:
            return quickSelect(l, p - 1)
        elif p < k:
            return quickSelect(p + 1, r)
        else:
            return A[p]
    return quickSelect(0, len(A) - 1)
nums = [3, 2, 3, 1, 2, 4, 5, 5, 6, 9, 10, 100, 200]
k = 4
print(findKthLargest(nums, k))
# 9
When to use heap
Use a heap when we only care about the extreme values (largest or smallest), and we don’t need fast lookup, delete or search for arbitrary elements.
For example, when we need the k largest or smallest elements.
The min heap data structure is used in Prim’s algorithm of the minimum spanning tree. See post “Minimum spanning tree 1”. The min heap is also called the “frontier”, where “all new unexplored neighbors of old continent” are kept. The minimum order is based on the distance/cost to visit. To find the next minimum connection, we pop the smallest from the min heap.
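To make the “frontier” idea concrete, here is a rough sketch of Prim’s algorithm with heapq. The graph representation (a dict mapping each node to a list of (cost, neighbor) pairs), the function name `prim_mst`, and the sample graph are assumptions made for this example; they are not taken from the referenced post.

```python
import heapq

def prim_mst(graph, start):
    """Return (total_cost, edges) of a minimum spanning tree.

    graph: dict mapping node -> list of (cost, neighbor) pairs,
    assumed to be undirected and connected.
    """
    visited = {start}
    # The frontier holds candidate edges leaving the visited set.
    frontier = [(cost, start, nbr) for cost, nbr in graph[start]]
    heapq.heapify(frontier)
    total, edges = 0, []
    while frontier and len(visited) < len(graph):
        cost, u, v = heapq.heappop(frontier)   # cheapest candidate edge
        if v in visited:
            continue                           # edge no longer leaves the tree
        visited.add(v)
        total += cost
        edges.append((u, v, cost))
        for next_cost, w in graph[v]:
            if w not in visited:
                heapq.heappush(frontier, (next_cost, v, w))
    return total, edges

graph = {
    "a": [(1, "b"), (4, "c")],
    "b": [(1, "a"), (2, "c"), (5, "d")],
    "c": [(4, "a"), (2, "b"), (3, "d")],
    "d": [(5, "b"), (3, "c")],
}
print(prim_mst(graph, "a"))
# (6, [('a', 'b', 1), ('b', 'c', 2), ('c', 'd', 3)])
```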
Appendix: Implement max heap in Python
The implementation of the maxHeap object below has 3 public functions: push, peek, and pop.
When a maxHeap object is instantiated, each input item is appended and moved into place with the __heapifyUp function, so the input is transformed into a max heap.
Both __heapifyUp and __heapifyDown are recursive functions. The former compares with the “parent” whereas the latter compares with the “children”.
The __heapifyUp function compares the value at the current index with the value at its parent index. If the value at the current index is bigger, it swaps with “its parent” so that the value at the parent index is bigger, and then it recurses from the parent index.
The __heapifyDown function compares the value at the current index with the values at its children indices. If the current value is smaller than either child, it swaps with the larger of its children so that the value at the current index is bigger than both, and then it recurses from that child index.
class maxHeap:
    """
    3 public functions:
    push
    peek
    pop
    3 private functions:
    __swap
    __heapifyUp
    __heapifyDown
    """
    def __init__(self, items=()):
        # an immutable default avoids the shared mutable default argument pitfall
        self.heap = []
        for i in items:
            self.heap.append(i)
            self.__heapifyUp(len(self.heap) - 1)
    def push(self, data):
        self.heap.append(data)
        self.__heapifyUp(len(self.heap) - 1)
    def peek(self):
        # check for emptiness rather than truthiness of the max, so 0 is a valid max
        if self.heap:
            return self.heap[0]
        else:
            return False
    def pop(self):
        if len(self.heap) > 1:
            self.__swap(0, len(self.heap) - 1)  # swap the max and last element
            largest = self.heap.pop()
            self.__heapifyDown(0)  # heapify down the last element that got swapped to the max position
        elif len(self.heap) == 1:
            largest = self.heap.pop()
        else:
            largest = False
        return largest
    def __swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
    def __heapifyUp(self, idx):
        parent = (idx - 1) // 2  # parent index
        if idx <= 0:
            return  # do nothing if already at the top
        elif self.heap[idx] > self.heap[parent]:  # if larger than parent, then swap with parent
            self.__swap(idx, parent)
            self.__heapifyUp(parent)  # recurse
    def __heapifyDown(self, idx):  # heapify down until it is not smaller than its children
        left = idx * 2 + 1  # left child index
        right = idx * 2 + 2  # right child index
        largest = idx  # assume current idx holds the largest
        if len(self.heap) > left and self.heap[largest] < self.heap[left]:
            largest = left
        if len(self.heap) > right and self.heap[largest] < self.heap[right]:
            largest = right
        if largest != idx:
            self.__swap(idx, largest)
            self.__heapifyDown(largest)  # recurse
The code below instantiates the maxHeap object we defined above, and repeatedly pops the maximum.
A = [5, 7, 3, 9, 2, 8, 1, 4, 6]
M = maxHeap(A)
print(M.heap)
# [9, 7, 8, 6, 2, 3, 1, 4, 5]
M.push(10)
print(M.heap)
# [10, 9, 8, 6, 7, 3, 1, 4, 5, 2]
from collections import deque
A = deque()
for i in range(len(M.heap)):
    A.appendleft(M.pop())  # each popped max goes in front of the previous ones
print(A)  # sorted
# deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])