Heaps and Priority Queues

Introduction

A heap is a specialized tree-based data structure that efficiently supports finding and removing the minimum (or maximum) element. Priority queues, typically implemented with heaps, are essential for algorithms like Dijkstra's shortest path and heap sort.

Learning Objectives

By the end of this reading, you will be able to:

  • Understand heap properties and types
  • Implement a binary heap using an array
  • Use priority queues effectively
  • Apply heaps to solve problems

1. What is a Heap?

A heap is a complete binary tree where each node satisfies the heap property:

  • Min-Heap: Parent ≤ children (root is minimum)
  • Max-Heap: Parent ≥ children (root is maximum)

Min-Heap Example

          1            Parent ≤ Children
         / \
        3   2
       / \ / \
      7  4 5  6

Array representation: [1, 3, 2, 7, 4, 5, 6]

Max-Heap Example

          9            Parent ≥ Children
         / \
        7   8
       / \ / \
      3  5 6  4

Array representation: [9, 7, 8, 3, 5, 6, 4]

Key Properties

  1. Complete Binary Tree: All levels filled except possibly the last, filled left-to-right
  2. Heap Property: Parent-child relationship maintained
  3. Not Sorted: Only parent-child relationships guaranteed
  4. Efficient Operations: Insert and delete in O(log n), find min/max in O(1)
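
The "Not Sorted" property is easy to miss, so here is a minimal sketch that checks the heap property on an array and contrasts it with sortedness. The function name is_min_heap is a hypothetical helper, not part of any library, and it relies on the child-index formulas (2i + 1 and 2i + 2) covered in the next section.

```python
def is_min_heap(arr):
    """Return True if arr satisfies the min-heap property (hypothetical helper)."""
    n = len(arr)
    for i in range(n):
        left, right = 2 * i + 1, 2 * i + 2
        # A parent must not be larger than either of its children
        if left < n and arr[i] > arr[left]:
            return False
        if right < n and arr[i] > arr[right]:
            return False
    return True

heap = [1, 3, 2, 7, 4, 5, 6]
print(is_min_heap(heap))          # True
print(heap == sorted(heap))       # False: a valid heap need not be sorted
print(is_min_heap([1, 3, 2, 0]))  # False: 0 sits below its parent 3
```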

2. Array Representation

Because heaps are complete binary trees, we can efficiently store them in arrays.

Array:    [1, 3, 2, 7, 4, 5, 6]
Index:     0  1  2  3  4  5  6

Tree:
              1 (index 0)
            /   \
       3 (1)     2 (2)
       /   \     /   \
    7(3)  4(4) 5(5) 6(6)

Index Relationships

For node at index i:

  • Parent: (i - 1) // 2
  • Left Child: 2 * i + 1
  • Right Child: 2 * i + 2

def parent(i):
    return (i - 1) // 2

def left_child(i):
    return 2 * i + 1

def right_child(i):
    return 2 * i + 2
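
To see the parent formula in action, the sketch below walks from a given index up to the root of the example array. The helper name path_to_root is made up for illustration, and it assumes a non-empty array and a valid index.

```python
def path_to_root(arr, i):
    """Collect the values from index i up to the root (illustrative helper)."""
    values = []
    while i > 0:
        values.append(arr[i])
        i = (i - 1) // 2  # Move to the parent index
    values.append(arr[0])
    return values

heap = [1, 3, 2, 7, 4, 5, 6]
print(path_to_root(heap, 4))  # [4, 3, 1]
print(path_to_root(heap, 6))  # [6, 2, 1]
```

In a min-heap, every such path is non-increasing from the node to the root, which is exactly what the heap property guarantees.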

3. Heap Implementation

Min-Heap

class MinHeap:
    def __init__(self):
        self.heap = []

    def __len__(self):
        return len(self.heap)

    def is_empty(self):
        return len(self.heap) == 0

    def peek(self):
        """Return minimum element: O(1)"""
        if self.is_empty():
            raise IndexError("Heap is empty")
        return self.heap[0]

    def push(self, value):
        """Add element: O(log n)"""
        self.heap.append(value)
        self._sift_up(len(self.heap) - 1)

    def pop(self):
        """Remove and return minimum: O(log n)"""
        if self.is_empty():
            raise IndexError("Heap is empty")

        min_val = self.heap[0]
        last = self.heap.pop()

        if self.heap:
            self.heap[0] = last
            self._sift_down(0)

        return min_val

    def _sift_up(self, index):
        """Restore heap property upward"""
        parent = (index - 1) // 2

        while index > 0 and self.heap[index] < self.heap[parent]:
            self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index]
            index = parent
            parent = (index - 1) // 2

    def _sift_down(self, index):
        """Restore heap property downward"""
        size = len(self.heap)

        while True:
            smallest = index
            left = 2 * index + 1
            right = 2 * index + 2

            if left < size and self.heap[left] < self.heap[smallest]:
                smallest = left
            if right < size and self.heap[right] < self.heap[smallest]:
                smallest = right

            if smallest == index:
                break

            self.heap[index], self.heap[smallest] = self.heap[smallest], self.heap[index]
            index = smallest
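
It can help to watch sift-up move a value one level at a time. The following sketch repeats the logic of push and _sift_up on a plain list and records the array after each swap. The function push_with_trace is a hypothetical helper written only for this illustration.

```python
def push_with_trace(heap, value):
    """Append value to a min-heap list and sift it up, recording each state."""
    heap.append(value)
    index = len(heap) - 1
    states = [list(heap)]
    while index > 0:
        parent = (index - 1) // 2
        if heap[index] >= heap[parent]:
            break  # Heap property holds; stop early
        heap[index], heap[parent] = heap[parent], heap[index]
        index = parent
        states.append(list(heap))
    return states

heap = [1, 3, 2, 7, 4, 5, 6]
for state in push_with_trace(heap, 0):
    print(state)
# [1, 3, 2, 7, 4, 5, 6, 0]
# [1, 3, 2, 0, 4, 5, 6, 7]
# [1, 0, 2, 3, 4, 5, 6, 7]
# [0, 1, 2, 3, 4, 5, 6, 7]
```

Pushing a new minimum is the worst case: the value climbs the full height of the tree, which is three swaps for eight elements.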

Build Heap from Array

def heapify(arr):
    """Convert array to heap in-place: O(n)"""
    n = len(arr)

    # Start from last non-leaf node
    for i in range(n // 2 - 1, -1, -1):
        _sift_down(arr, i, n)

def _sift_down(arr, index, size):
    while True:
        smallest = index
        left = 2 * index + 1
        right = 2 * index + 2

        if left < size and arr[left] < arr[smallest]:
            smallest = left
        if right < size and arr[right] < arr[smallest]:
            smallest = right

        if smallest == index:
            break

        arr[index], arr[smallest] = arr[smallest], arr[index]
        index = smallest

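Why O(n)? Most nodes are near the bottom and sift down only a short distance: roughly half the nodes are leaves and never move, a quarter move at most one level, an eighth at most two, and so on. Summing these bounds gives a total proportional to n rather than n log n.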
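
The O(n) bound can be checked numerically. Each node sifts down at most as many levels as its own height, so the sum of all node heights bounds the number of swaps. The sketch below computes that sum for a complete tree of n nodes; total_sift_down_bound is an illustrative name, not a library function.

```python
def total_sift_down_bound(n):
    """Sum of node heights in an n-node complete binary tree.

    Each node can sift down at most its own height, so this sum is an
    upper bound on the swaps performed while building a heap.
    """
    total = 0
    for i in range(n):
        height = 0
        child = 2 * i + 1  # Following left children reaches the deepest level
        while child < n:
            height += 1
            child = 2 * child + 1
        total += height
    return total

for n in (7, 100, 1000):
    print(n, total_sift_down_bound(n))
```

For every n tried here the total stays below n, in line with the linear bound.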


4. Priority Queue

A priority queue is an abstract data type where elements have priorities, and higher-priority elements are served first.

Operations

Operation                 Description                               Heap Time
insert(item, priority)    Add with priority                         O(log n)
extract_min/max()         Remove highest priority                   O(log n)
peek()                    View highest priority                     O(1)
decrease_key()            Lower a key (raises priority, min-heap)   O(log n)
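
As a rough illustration of how the first three operations map onto a heap, here is a minimal sketch built on Python's heapq module. The class name SimplePriorityQueue and its method names are assumptions chosen to match the table, not a standard API, and lower numbers are treated as higher priority. decrease_key is left out because heapq has no direct support for it.

```python
import heapq
import itertools

class SimplePriorityQueue:
    """Minimal min-priority queue on top of heapq (illustrative sketch)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # Tie-breaker keeps insertion order

    def insert(self, item, priority):
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def extract_min(self):
        if not self._heap:
            raise IndexError("Priority queue is empty")
        _, _, item = heapq.heappop(self._heap)
        return item

    def peek(self):
        if not self._heap:
            raise IndexError("Priority queue is empty")
        return self._heap[0][2]

pq = SimplePriorityQueue()
pq.insert("write report", 2)
pq.insert("fix outage", 1)
pq.insert("reply to email", 3)
print(pq.peek())         # fix outage
print(pq.extract_min())  # fix outage
print(pq.extract_min())  # write report
```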

Python's heapq Module

import heapq

# Create heap (min-heap)
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 4)
heapq.heappush(heap, 1)
heapq.heappush(heap, 5)

print(heap)                # [1, 1, 4, 3, 5]
print(heapq.heappop(heap)) # 1
print(heapq.heappop(heap)) # 1

# Convert list to heap
nums = [3, 1, 4, 1, 5, 9, 2, 6]
heapq.heapify(nums)        # O(n)

# Peek (smallest element)
print(nums[0])

# Push and pop together
heapq.heappushpop(heap, 0) # More efficient than push then pop

# n smallest/largest
print(heapq.nsmallest(3, [3, 1, 4, 1, 5]))  # [1, 1, 3]
print(heapq.nlargest(3, [3, 1, 4, 1, 5]))   # [5, 4, 3]

Max-Heap with heapq

The core heapq functions implement a min-heap, but you can simulate a max-heap by negating values on the way in and on the way out:

import heapq

# Negate values
max_heap = []
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -3)
heapq.heappush(max_heap, -7)

max_val = -heapq.heappop(max_heap)  # 7
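
The same trick extends to records, as long as only the numeric priority is negated. A small sketch, with made-up names and scores:

```python
import heapq

# Highest score first: negate only the numeric priority, not the payload
scores = [(72, "alice"), (95, "bob"), (88, "carol")]
max_heap = [(-score, name) for score, name in scores]
heapq.heapify(max_heap)

neg_score, name = heapq.heappop(max_heap)
print(name, -neg_score)  # bob 95
```

Negation only works for values that can be negated, such as numbers; for other types a wrapper class with a reversed __lt__ is one possible alternative.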

Priority Queue with Custom Objects

import heapq

class Task:
    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):
        return self.priority < other.priority

tasks = []
heapq.heappush(tasks, Task(3, "Low priority"))
heapq.heappush(tasks, Task(1, "High priority"))
heapq.heappush(tasks, Task(2, "Medium priority"))

while tasks:
    task = heapq.heappop(tasks)
    print(f"Processing: {task.name}")

Or use tuples:

import heapq
import itertools

# Entries are (priority, counter, task); the counter breaks ties in insertion order
counter = itertools.count()
heap = []

heapq.heappush(heap, (3, next(counter), "Low"))
heapq.heappush(heap, (1, next(counter), "High"))
heapq.heappush(heap, (1, next(counter), "Also High"))

while heap:
    priority, _, task = heapq.heappop(heap)
    print(task)
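
The counter matters most when the payload itself cannot be compared. The sketch below shows what can go wrong without it: two entries with equal priority force Python to compare the payloads, and dictionaries do not support <. The function name is hypothetical.

```python
import heapq

def push_without_tiebreaker():
    """Return the error message raised when payloads must be compared."""
    heap = []
    heapq.heappush(heap, (1, {"name": "first"}))
    try:
        # Same priority, so tuple comparison falls through to the dicts
        heapq.heappush(heap, (1, {"name": "second"}))
    except TypeError as exc:
        return str(exc)
    return None

print(push_without_tiebreaker() is not None)  # True: dicts do not support <
```

With a unique counter in the second position, the comparison is always settled before the payload is reached.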

5. Heap Sort

Build a max-heap from the array, then repeatedly swap the maximum to the end of the unsorted region and shrink the heap by one.

def heap_sort(arr):
    """Sort array using heap: O(n log n)"""
    # Build max-heap
    n = len(arr)
    for i in range(n // 2 - 1, -1, -1):
        sift_down_max(arr, i, n)

    # Extract elements one by one
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]  # Move max to end
        sift_down_max(arr, 0, i)

def sift_down_max(arr, index, size):
    while True:
        largest = index
        left = 2 * index + 1
        right = 2 * index + 2

        if left < size and arr[left] > arr[largest]:
            largest = left
        if right < size and arr[right] > arr[largest]:
            largest = right

        if largest == index:
            break

        arr[index], arr[largest] = arr[largest], arr[index]
        index = largest

Heap Sort Properties:

  • Time: O(n log n) always
  • Space: O(1) - in-place
  • Not stable (relative order of equal elements may change)
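
For comparison, here is a shorter variant built on heapq. It is not the in-place algorithm above: it copies the input and returns a new list, so it uses O(n) extra space. The name heapq_sort is an assumption for this sketch.

```python
import heapq

def heapq_sort(iterable):
    """Return a new sorted list by heapifying a copy and popping in order."""
    heap = list(iterable)
    heapq.heapify(heap)  # O(n)
    # n pops at O(log n) each
    return [heapq.heappop(heap) for _ in range(len(heap))]

print(heapq_sort([5, 2, 9, 1, 5, 6]))  # [1, 2, 5, 5, 6, 9]
```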

6. Common Heap Applications

Find K Largest Elements

import heapq

def k_largest(nums, k):
    """Find k largest elements: O(n log k)"""
    if k <= 0:
        return []

    # Use min-heap of size k; its root is the smallest of the current top k
    heap = nums[:k]
    heapq.heapify(heap)

    for num in nums[k:]:
        if num > heap[0]:
            heapq.heapreplace(heap, num)

    return heap  # Heap order, not necessarily sorted

print(k_largest([3, 1, 4, 1, 5, 9, 2, 6], 3))  # [5, 6, 9]

Find K Smallest Elements

import heapq

def k_smallest(nums, k):
    """Find k smallest elements: O(n log k)"""
    if k <= 0:
        return []

    # Use max-heap (negated values) of size k
    heap = [-x for x in nums[:k]]
    heapq.heapify(heap)

    for num in nums[k:]:
        if num < -heap[0]:  # Smaller than the largest value kept so far
            heapq.heapreplace(heap, -num)

    return [-x for x in heap]  # Heap order, not necessarily sorted

Merge K Sorted Lists

import heapq

def merge_k_sorted(lists):
    """Merge k sorted lists: O(n log k)"""
    result = []
    heap = []

    # Initialize with first element from each list
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(heap, (lst[0], i, 0))

    while heap:
        val, list_idx, elem_idx = heapq.heappop(heap)
        result.append(val)

        # Add next element from same list
        if elem_idx + 1 < len(lists[list_idx]):
            next_val = lists[list_idx][elem_idx + 1]
            heapq.heappush(heap, (next_val, list_idx, elem_idx + 1))

    return result

lists = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
print(merge_k_sorted(lists))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
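
The standard library offers heapq.merge for the same task. It returns a lazy iterator rather than a list, which may be preferable when the inputs are large. A brief sketch using the same example lists:

```python
import heapq

lists = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
merged = heapq.merge(*lists)  # Lazy iterator; inputs must already be sorted
print(next(merged))           # 1
print(list(merged))           # [2, 3, 4, 5, 6, 7, 8, 9]
```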

Running Median

import heapq

class MedianFinder:
    """Track median of a stream of numbers"""

    def __init__(self):
        self.lo = []  # Max-heap (negated) for smaller half
        self.hi = []  # Min-heap for larger half

    def add_num(self, num):
        heapq.heappush(self.lo, -num)

        # Ensure lo's max ≤ hi's min
        heapq.heappush(self.hi, -heapq.heappop(self.lo))

        # Balance sizes (lo can have one more)
        if len(self.lo) < len(self.hi):
            heapq.heappush(self.lo, -heapq.heappop(self.hi))

    def find_median(self):
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2

mf = MedianFinder()
mf.add_num(1)
mf.add_num(2)
print(mf.find_median())  # 1.5
mf.add_num(3)
print(mf.find_median())  # 2

Task Scheduler

import heapq
from collections import Counter

def least_interval(tasks, n):
    """
    Minimum time to complete tasks with cooldown n between same tasks.
    """
    counts = list(Counter(tasks).values())
    max_heap = [-c for c in counts]
    heapq.heapify(max_heap)

    time = 0
    while max_heap:
        cycle = []
        for _ in range(n + 1):
            if max_heap:
                cycle.append(heapq.heappop(max_heap))

        for count in cycle:
            if count + 1 < 0:  # Still tasks remaining
                heapq.heappush(max_heap, count + 1)

        time += n + 1 if max_heap else len(cycle)

    return time

print(least_interval(['A','A','A','B','B','B'], 2))  # 8
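
As a cross-check on the simulation, this problem also has a well-known closed-form answer. The sketch below is an alternative to the heap approach, not the method used above, and the function name is chosen for illustration. The idea is that the most frequent task forces (max_count - 1) frames of length n + 1, followed by one slot for each task that shares the maximum count.

```python
from collections import Counter

def least_interval_formula(tasks, n):
    """Closed-form alternative to the heap simulation."""
    if not tasks:
        return 0
    counts = Counter(tasks).values()
    max_count = max(counts)
    num_max = sum(1 for c in counts if c == max_count)
    # (max_count - 1) full frames of length n + 1, plus the final partial frame
    return max(len(tasks), (max_count - 1) * (n + 1) + num_max)

print(least_interval_formula(['A', 'A', 'A', 'B', 'B', 'B'], 2))  # 8
print(least_interval_formula(['A', 'A', 'A', 'B', 'B', 'B'], 0))  # 6
```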

7. Heap Complexity Summary

Operation                 Time Complexity
Build Heap                O(n)
Insert (push)             O(log n)
Extract Min/Max (pop)     O(log n)
Peek (find min/max)       O(1)
Decrease Key              O(log n), given the element's position
Delete arbitrary          O(log n), given the element's position (finding it first is O(n))

Space: O(n)


8. Indexed Priority Queue (Advanced)

An indexed priority queue associates each entry with an integer index, so the priority of a given index can be updated in O(log n) without searching the heap.

class IndexedMinPQ:
    """Priority queue with O(log n) decrease_key"""

    def __init__(self, max_size):
        self.max_size = max_size
        self.n = 0
        self.keys = [None] * max_size       # keys[i] = priority of index i
        self.pq = [-1] * (max_size + 1)     # pq[i] = index at heap position i
        self.qp = [-1] * max_size           # qp[i] = heap position of index i

    def contains(self, i):
        return self.qp[i] != -1

    def insert(self, i, key):
        self.n += 1
        self.qp[i] = self.n
        self.pq[self.n] = i
        self.keys[i] = key
        self._swim(self.n)

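    def decrease_key(self, i, key):
        """Lower the key of index i; the new key must not exceed the old one."""
        if not self.contains(i):
            raise KeyError(f"Index {i} is not in the queue")
        if key > self.keys[i]:
            raise ValueError("New key is larger than the current key")
        self.keys[i] = key
        self._swim(self.qp[i])

    def del_min(self):
        """Remove and return the index with the smallest key."""
        if self.n == 0:
            raise IndexError("Priority queue is empty")
        min_idx = self.pq[1]
        self._swap(1, self.n)
        self.n -= 1
        self._sink(1)
        self.qp[min_idx] = -1
        self.keys[min_idx] = None
        self.pq[self.n + 1] = -1  # Clear the vacated heap slot
        return min_idx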

    def _swim(self, k):
        while k > 1 and self._greater(k // 2, k):
            self._swap(k, k // 2)
            k = k // 2

    def _sink(self, k):
        while 2 * k <= self.n:
            j = 2 * k
            if j < self.n and self._greater(j, j + 1):
                j += 1
            if not self._greater(k, j):
                break
            self._swap(k, j)
            k = j

    def _greater(self, i, j):
        return self.keys[self.pq[i]] > self.keys[self.pq[j]]

    def _swap(self, i, j):
        self.pq[i], self.pq[j] = self.pq[j], self.pq[i]
        self.qp[self.pq[i]] = i
        self.qp[self.pq[j]] = j

Indexed priority queues are used in Dijkstra's and Prim's algorithms, where decrease_key keeps at most one heap entry per vertex and gives O(E log V) running time.
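
When only heapq is available, a common substitute for decrease_key is to push a fresh entry and skip stale ones when they are popped. The sketch below applies that idea to Dijkstra's algorithm. It is an alternative to the indexed queue above, not a use of it; the adjacency-list format (a dict mapping each node to a list of (neighbor, weight) pairs) and non-negative weights are assumptions of this example.

```python
import heapq

def dijkstra(graph, source):
    """Shortest distances from source using heapq with stale-entry skipping."""
    dist = {source: 0}
    heap = [(0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue  # Stale entry: a shorter path was already found
        for neighbor, weight in graph.get(node, []):
            new_dist = d + weight
            if new_dist < dist.get(neighbor, float("inf")):
                dist[neighbor] = new_dist
                heapq.heappush(heap, (new_dist, neighbor))
    return dist

graph = {
    "A": [("B", 4), ("C", 1)],
    "B": [("D", 1)],
    "C": [("B", 2), ("D", 5)],
    "D": [],
}
print(dijkstra(graph, "A"))  # {'A': 0, 'B': 3, 'C': 1, 'D': 4}
```

The trade-off is that the heap can hold up to one entry per edge instead of one per vertex, in exchange for a much simpler implementation.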


Exercises

Basic

  1. Implement a max-heap with push, pop, and peek.

  2. Given an array, find the 3rd largest element.

  3. Sort an array using heap sort.

Intermediate

  1. Given a stream of integers, find the median at each point.

  2. Merge k sorted arrays into one sorted array.

  3. Given an array, find the k most frequent elements.

Advanced

  1. Implement a priority queue that supports both insert and delete by key.

  2. Given n ropes with lengths, find the minimum cost to connect all ropes (cost = sum of lengths being connected).

  3. Implement a data structure that returns the maximum element in a sliding window of size k.


Summary

  • Heaps are complete binary trees with parent-child ordering
  • Min-heap: parent ≤ children; Max-heap: parent ≥ children
  • Efficient array representation using index formulas
  • O(1) for peek, O(log n) for insert and delete
  • Priority queues are built on heaps
  • Python's heapq module provides min-heap operations
  • Applications: sorting, k-th element, merge sorted lists, scheduling

Next Reading

Graphs →