Heaps and Priority Queues
Introduction
A heap is a specialized tree-based data structure that efficiently supports finding and removing the minimum (or maximum) element. Priority queues, typically implemented with heaps, are essential for algorithms like Dijkstra's shortest path and heap sort.
Learning Objectives
By the end of this reading, you will be able to:
- Understand heap properties and types
- Implement a binary heap using an array
- Use priority queues effectively
- Apply heaps to solve problems
1. What is a Heap?
A heap is a complete binary tree where each node satisfies the heap property:
- Min-Heap: Parent ≤ children (root is minimum)
- Max-Heap: Parent ≥ children (root is maximum)
Min-Heap Example
1 Parent ≤ Children
/ \
3 2
/ \ / \
7 4 5 6
Array representation: [1, 3, 2, 7, 4, 5, 6]
Max-Heap Example
9 Parent ≥ Children
/ \
7 8
/ \ / \
3 5 6 4
Array representation: [9, 7, 8, 3, 5, 6, 4]
Key Properties
- Complete Binary Tree: All levels filled except possibly the last, filled left-to-right
- Heap Property: Parent-child relationship maintained
- Not Sorted: Only parent-child relationships guaranteed
- Efficient Operations: Insert and delete in O(log n), find min/max in O(1)
2. Array Representation
Because heaps are complete binary trees, we can efficiently store them in arrays.
Array: [1, 3, 2, 7, 4, 5, 6]
Index: 0 1 2 3 4 5 6
Tree:
1 (index 0)
/ \
3 (1) 2 (2)
/ \ / \
7(3) 4(4) 5(5) 6(6)
Index Relationships
For node at index i:
- Parent:
(i - 1) // 2 - Left Child:
2 * i + 1 - Right Child:
2 * i + 2
def parent(i):
return (i - 1) // 2
def left_child(i):
return 2 * i + 1
def right_child(i):
return 2 * i + 2
3. Heap Implementation
Min-Heap
class MinHeap:
def __init__(self):
self.heap = []
def __len__(self):
return len(self.heap)
def is_empty(self):
return len(self.heap) == 0
def peek(self):
"""Return minimum element: O(1)"""
if self.is_empty():
raise IndexError("Heap is empty")
return self.heap[0]
def push(self, value):
"""Add element: O(log n)"""
self.heap.append(value)
self._sift_up(len(self.heap) - 1)
def pop(self):
"""Remove and return minimum: O(log n)"""
if self.is_empty():
raise IndexError("Heap is empty")
min_val = self.heap[0]
last = self.heap.pop()
if self.heap:
self.heap[0] = last
self._sift_down(0)
return min_val
def _sift_up(self, index):
"""Restore heap property upward"""
parent = (index - 1) // 2
while index > 0 and self.heap[index] < self.heap[parent]:
self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index]
index = parent
parent = (index - 1) // 2
def _sift_down(self, index):
"""Restore heap property downward"""
size = len(self.heap)
while True:
smallest = index
left = 2 * index + 1
right = 2 * index + 2
if left < size and self.heap[left] < self.heap[smallest]:
smallest = left
if right < size and self.heap[right] < self.heap[smallest]:
smallest = right
if smallest == index:
break
self.heap[index], self.heap[smallest] = self.heap[smallest], self.heap[index]
index = smallest
Build Heap from Array
def heapify(arr):
"""Convert array to heap in-place: O(n)"""
n = len(arr)
# Start from last non-leaf node
for i in range(n // 2 - 1, -1, -1):
_sift_down(arr, i, n)
def _sift_down(arr, index, size):
while True:
smallest = index
left = 2 * index + 1
right = 2 * index + 2
if left < size and arr[left] < arr[smallest]:
smallest = left
if right < size and arr[right] < arr[smallest]:
smallest = right
if smallest == index:
break
arr[index], arr[smallest] = arr[smallest], arr[index]
index = smallest
Why O(n)? Most nodes are near the bottom and sift down a short distance.
4. Priority Queue
A priority queue is an abstract data type where elements have priorities, and higher-priority elements are served first.
Operations
| Operation | Description | Heap Time |
|---|---|---|
| insert(item, priority) | Add with priority | O(log n) |
| extract_min/max() | Remove highest priority | O(log n) |
| peek() | View highest priority | O(1) |
| decrease_key() | Increase priority | O(log n) |
Python's heapq Module
import heapq
# Create heap (min-heap)
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 4)
heapq.heappush(heap, 1)
heapq.heappush(heap, 5)
print(heap) # [1, 1, 4, 3, 5]
print(heapq.heappop(heap)) # 1
print(heapq.heappop(heap)) # 1
# Convert list to heap
nums = [3, 1, 4, 1, 5, 9, 2, 6]
heapq.heapify(nums) # O(n)
# Peek (smallest element)
print(nums[0])
# Push and pop together
heapq.heappushpop(heap, 0) # More efficient than push then pop
# n smallest/largest
print(heapq.nsmallest(3, [3, 1, 4, 1, 5])) # [1, 1, 3]
print(heapq.nlargest(3, [3, 1, 4, 1, 5])) # [5, 4, 3]
Max-Heap with heapq
Python only provides min-heap, but you can simulate max-heap:
import heapq
# Negate values
max_heap = []
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -3)
heapq.heappush(max_heap, -7)
max_val = -heapq.heappop(max_heap) # 7
Priority Queue with Custom Objects
import heapq
class Task:
def __init__(self, priority, name):
self.priority = priority
self.name = name
def __lt__(self, other):
return self.priority < other.priority
tasks = []
heapq.heappush(tasks, Task(3, "Low priority"))
heapq.heappush(tasks, Task(1, "High priority"))
heapq.heappush(tasks, Task(2, "Medium priority"))
while tasks:
task = heapq.heappop(tasks)
print(f"Processing: {task.name}")
Or use tuples:
# (priority, counter, task) - counter breaks ties
import itertools
counter = itertools.count()
heap = []
heapq.heappush(heap, (3, next(counter), "Low"))
heapq.heappush(heap, (1, next(counter), "High"))
heapq.heappush(heap, (1, next(counter), "Also High"))
while heap:
priority, _, task = heapq.heappop(heap)
print(task)
5. Heap Sort
Sort by repeatedly extracting the min/max element.
def heap_sort(arr):
"""Sort array using heap: O(n log n)"""
# Build max-heap
n = len(arr)
for i in range(n // 2 - 1, -1, -1):
sift_down_max(arr, i, n)
# Extract elements one by one
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0] # Move max to end
sift_down_max(arr, 0, i)
def sift_down_max(arr, index, size):
while True:
largest = index
left = 2 * index + 1
right = 2 * index + 2
if left < size and arr[left] > arr[largest]:
largest = left
if right < size and arr[right] > arr[largest]:
largest = right
if largest == index:
break
arr[index], arr[largest] = arr[largest], arr[index]
index = largest
Heap Sort Properties:
- Time: O(n log n) always
- Space: O(1) - in-place
- Not stable (relative order of equal elements may change)
6. Common Heap Applications
Find K Largest Elements
import heapq
def k_largest(nums, k):
"""Find k largest elements: O(n log k)"""
# Use min-heap of size k
heap = nums[:k]
heapq.heapify(heap)
for num in nums[k:]:
if num > heap[0]:
heapq.heapreplace(heap, num)
return heap
print(k_largest([3, 1, 4, 1, 5, 9, 2, 6], 3)) # [5, 6, 9]
Find K Smallest Elements
def k_smallest(nums, k):
"""Find k smallest elements: O(n log k)"""
# Use max-heap (negated values) of size k
heap = [-x for x in nums[:k]]
heapq.heapify(heap)
for num in nums[k:]:
if num < -heap[0]:
heapq.heapreplace(heap, -num)
return [-x for x in heap]
Merge K Sorted Lists
import heapq
def merge_k_sorted(lists):
"""Merge k sorted lists: O(n log k)"""
result = []
heap = []
# Initialize with first element from each list
for i, lst in enumerate(lists):
if lst:
heapq.heappush(heap, (lst[0], i, 0))
while heap:
val, list_idx, elem_idx = heapq.heappop(heap)
result.append(val)
# Add next element from same list
if elem_idx + 1 < len(lists[list_idx]):
next_val = lists[list_idx][elem_idx + 1]
heapq.heappush(heap, (next_val, list_idx, elem_idx + 1))
return result
lists = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
print(merge_k_sorted(lists)) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
Running Median
import heapq
class MedianFinder:
"""Track median of a stream of numbers"""
def __init__(self):
self.lo = [] # Max-heap (negated) for smaller half
self.hi = [] # Min-heap for larger half
def add_num(self, num):
heapq.heappush(self.lo, -num)
# Ensure lo's max ≤ hi's min
heapq.heappush(self.hi, -heapq.heappop(self.lo))
# Balance sizes (lo can have one more)
if len(self.lo) < len(self.hi):
heapq.heappush(self.lo, -heapq.heappop(self.hi))
def find_median(self):
if len(self.lo) > len(self.hi):
return -self.lo[0]
return (-self.lo[0] + self.hi[0]) / 2
mf = MedianFinder()
mf.add_num(1)
mf.add_num(2)
print(mf.find_median()) # 1.5
mf.add_num(3)
print(mf.find_median()) # 2
Task Scheduler
import heapq
from collections import Counter
def least_interval(tasks, n):
"""
Minimum time to complete tasks with cooldown n between same tasks.
"""
counts = list(Counter(tasks).values())
max_heap = [-c for c in counts]
heapq.heapify(max_heap)
time = 0
while max_heap:
cycle = []
for _ in range(n + 1):
if max_heap:
cycle.append(heapq.heappop(max_heap))
for count in cycle:
if count + 1 < 0: # Still tasks remaining
heapq.heappush(max_heap, count + 1)
time += n + 1 if max_heap else len(cycle)
return time
print(least_interval(['A','A','A','B','B','B'], 2)) # 8
7. Heap Complexity Summary
| Operation | Time Complexity |
|---|---|
| Build Heap | O(n) |
| Insert (push) | O(log n) |
| Extract Min/Max (pop) | O(log n) |
| Peek (find min/max) | O(1) |
| Decrease Key | O(log n) |
| Delete arbitrary | O(log n) |
Space: O(n)
8. Indexed Priority Queue (Advanced)
An indexed priority queue allows updating priorities by key.
class IndexedMinPQ:
"""Priority queue with O(log n) decrease_key"""
def __init__(self, max_size):
self.max_size = max_size
self.n = 0
self.keys = [None] * max_size # key[i] = priority of i
self.pq = [-1] * (max_size + 1) # pq[i] = index at heap position i
self.qp = [-1] * max_size # qp[i] = heap position of index i
def contains(self, i):
return self.qp[i] != -1
def insert(self, i, key):
self.n += 1
self.qp[i] = self.n
self.pq[self.n] = i
self.keys[i] = key
self._swim(self.n)
def decrease_key(self, i, key):
self.keys[i] = key
self._swim(self.qp[i])
def del_min(self):
min_idx = self.pq[1]
self._swap(1, self.n)
self.n -= 1
self._sink(1)
self.qp[min_idx] = -1
self.keys[min_idx] = None
return min_idx
def _swim(self, k):
while k > 1 and self._greater(k // 2, k):
self._swap(k, k // 2)
k = k // 2
def _sink(self, k):
while 2 * k <= self.n:
j = 2 * k
if j < self.n and self._greater(j, j + 1):
j += 1
if not self._greater(k, j):
break
self._swap(k, j)
k = j
def _greater(self, i, j):
return self.keys[self.pq[i]] > self.keys[self.pq[j]]
def _swap(self, i, j):
self.pq[i], self.pq[j] = self.pq[j], self.pq[i]
self.qp[self.pq[i]] = i
self.qp[self.pq[j]] = j
Used in Dijkstra's and Prim's algorithms for O(E log V) complexity.
Exercises
Basic
Implement a max-heap with push, pop, and peek.
Given an array, find the 3rd largest element.
Sort an array using heap sort.
Intermediate
Given a stream of integers, find the median at each point.
Merge k sorted arrays into one sorted array.
Given an array, find the k most frequent elements.
Advanced
Implement a priority queue that supports both insert and delete by key.
Given n ropes with lengths, find the minimum cost to connect all ropes (cost = sum of lengths being connected).
Implement a data structure that returns the maximum element in a sliding window of size k.
Summary
- Heaps are complete binary trees with parent-child ordering
- Min-heap: parent ≤ children; Max-heap: parent ≥ children
- Efficient array representation using index formulas
- O(1) for peek, O(log n) for insert and delete
- Priority queues are built on heaps
- Python's heapq module provides min-heap operations
- Applications: sorting, k-th element, merge sorted lists, scheduling