跳到主要内容
预计阅读 32 分钟

谁是最重要的 —— 堆与优先队列

在医院急诊室里,不是谁先来谁先看,而是谁最危急谁先看。这种”按优先级服务”的思想,正是堆和优先队列的核心——永远让最重要的那个站在最前面。

📋 开篇自测:你已经知道多少?

  1. 普通队列是”先进先出”,那”优先队列”和它有什么区别?你能想到生活中优先队列的例子吗?
  2. 堆是一棵完全二叉树,但它却可以用数组来存储——这是怎么做到的?
  3. 从100万个数字中找出最大的10个,你觉得最高效的方法是什么?需要把所有数字排序吗?

一、从急诊室到操作系统——为什么需要优先队列

前面我们学过队列:先进先出,公平有序。但现实中很多场景不需要”公平”,而需要”高效地找到最重要的”:

  • 操作系统进程调度: 高优先级的进程应该先执行,而不是排队等着
  • 网络路由器: 视频通话的数据包比邮件的优先级更高
  • 游戏引擎: 每一帧都要处理离玩家最近的碰撞事件
  • 医院急诊室: 心脏骤停的患者不能排在感冒患者后面

如果用普通数组来实现优先队列——每次插入O(1),但取出最高优先级的元素要扫描整个数组O(n)。或者用有序数组——取出是O(1)了,但插入要找到正确位置O(n)。两种方案都不够好。

有没有一种数据结构,插入和取出都能在O(log n)时间内完成?这就是堆(Heap)


二、堆的本质——一棵有规矩的完全二叉树

堆是一棵完全二叉树,并且满足堆性质

  • 最小堆(Min-Heap): 每个节点的值都小于等于它的子节点。根节点是全局最小值。
  • 最大堆(Max-Heap): 每个节点的值都大于等于它的子节点。根节点是全局最大值。

你可以把最小堆想象成一家公司的”辈分图”——辈分最高(值最小)的祖辈在最顶上,每个人的辈分都比自己的后辈高,但同辈之间没有大小之分。

        最小堆                      最大堆
          1                          9
        /   \                      /   \
       3     2                    7     8
      / \   /                   / \   /
     7   6 5                   3   6 5

规则:父 <= 子               规则:父 >= 子

注意一个关键区别:堆和二叉搜索树(BST)不一样。BST要求”左小右大”,而堆只要求”父比子小(或大)“。堆的左右子节点之间没有大小关系。这意味着堆的中序遍历并不是有序的。

完全二叉树——为什么堆非它不可

完全二叉树是指除了最后一层外,每一层都被填满了,且最后一层的节点从左到右连续排列,中间不能有空缺。

为什么堆一定要是完全二叉树?因为完全二叉树有一个无可替代的优势——可以用数组完美存储,不需要任何指针。


三、数组里藏着一棵树——堆的存储方式

把完全二叉树按层序(从上到下、从左到右)编号,每个节点的编号就是它在数组中的索引。父子之间的索引关系非常简洁:

树形结构:
            1 (索引0)
          /     \
       3 (索引1)  2 (索引2)
       / \        /
    7(索引3) 6(索引4) 5(索引5)

数组存储:
索引:  0  1  2  3  4  5
值:  [ 1, 3, 2, 7, 6, 5 ]

父子关系公式(索引从0开始):
  父节点索引    = (i - 1) // 2
  左子节点索引  = 2 * i + 1
  右子节点索引  = 2 * i + 2

验证一下:索引1的节点值是3,它的左子节点索引 = 2×1+1 = 3,值是7;右子节点索引 = 2×1+2 = 4,值是6。它的父节点索引 = (1-1)//2 = 0,值是1。完美对应。

用数组存储堆有三个好处:

  1. 省空间: 不需要存储左右子节点的指针
  2. 缓存友好: 数组在内存中连续存放,CPU缓存命中率高
  3. 计算快: 找父子节点只需要简单的加减乘除

🤔 想一想 如果一个最小堆的数组表示是 [2, 5, 3, 8, 7, 6],请画出对应的树形结构。然后思考:数组中的第一个元素一定是最小值吗?最后一个元素一定是最大值吗?


四、堆的核心操作——上浮与下沉

堆只有两个核心操作,理解了它们,整个堆的世界就打通了。

上浮(Sift Up)——新兵入伍,找到自己的位置

当你往堆底插入一个新元素后,它可能比父节点还小(在最小堆中),违反了堆性质。怎么办?让它一路和父节点交换,直到找到合适的位置。

插入元素0到最小堆:

  Step 1: 放到末尾       Step 2: 0 < 6, 交换       Step 3: 0 < 2, 交换
        1                       1                          0
      /   \                   /   \                      /   \
     3     2                 3     0                    3     1
    / \   / \               / \   / \                  / \   / \
   7   6 5   0             7   6 5   2                7   6 5   2

   0继续上浮...             0到达根节点,停止

下沉(Sift Down)——将军退位,选出新领袖

当你移除堆顶元素后,需要用堆底最后一个元素填到堆顶,然后让它一路和较小的子节点交换,直到找到合适的位置。

移除堆顶的1:

  Step 1: 末尾填到顶部    Step 2: 5 > 2, 与2交换    Step 3: 5 > 未违反, 停止
        5                       2                          2
      /   \                   /   \                      /   \
     3     2                 3     5                    3     5
    / \                     / \                        / \
   7   6                   7   6                      7   6

五、从零实现一个最小堆

理解了上浮和下沉,实现就是水到渠成的事情。

class MinHeap:
    def __init__(self):
        self.data = []

    def size(self):
        """获取堆的大小"""
        return len(self.data)

    def peek(self):
        """查看堆顶元素(最小值)"""
        if not self.data:
            raise IndexError("堆为空")
        return self.data[0]

    def push(self, val):
        """插入元素"""
        self.data.append(val)          # 先放到末尾
        self._sift_up(len(self.data) - 1)  # 再上浮到正确位置

    def pop(self):
        """取出堆顶元素(最小值)"""
        if not self.data:
            raise IndexError("堆为空")
        top = self.data[0]
        last = self.data.pop()
        if self.data:
            self.data[0] = last        # 末尾元素填到顶部
            self._sift_down(0)         # 下沉到正确位置
        return top

    def _sift_up(self, idx):
        """上浮操作"""
        while idx > 0:
            parent_idx = (idx - 1) // 2
            if self.data[idx] >= self.data[parent_idx]:
                break  # 已满足堆性质
            # 交换
            self.data[idx], self.data[parent_idx] = self.data[parent_idx], self.data[idx]
            idx = parent_idx

    def _sift_down(self, idx):
        """下沉操作"""
        n = len(self.data)
        while True:
            smallest = idx
            left = 2 * idx + 1
            right = 2 * idx + 2
            if left < n and self.data[left] < self.data[smallest]:
                smallest = left
            if right < n and self.data[right] < self.data[smallest]:
                smallest = right
            if smallest == idx:
                break  # 当前节点已经最小,停止
            self.data[idx], self.data[smallest] = self.data[smallest], self.data[idx]
            idx = smallest

# 验证
heap = MinHeap()
for v in [5, 3, 8, 1, 2, 7]:
    heap.push(v)

sorted_result = []
while heap.size() > 0:
    sorted_result.append(heap.pop())
print(sorted_result)  # [1, 2, 3, 5, 7, 8]——从小到大弹出

每次push和pop的时间复杂度都是 O(log n),因为上浮和下沉最多经过树的高度层,而完全二叉树的高度是log₂(n)。peek是 O(1),直接读数组第一个元素。


六、堆排序——用堆的特性给数组排序

既然堆顶永远是最大(或最小)的元素,反复取出堆顶不就能得到有序序列了吗?这就是堆排序的基本思路。

堆排序分两步:

第一步:建堆。 把无序数组变成一个最大堆。从最后一个非叶节点开始,逐个执行下沉操作。为什么从后往前?因为叶子节点天然满足堆性质(没有子节点),不需要调整。

第二步:排序。 反复把堆顶(最大值)和末尾元素交换,然后缩小堆的范围,对新的堆顶执行下沉。

def heap_sort(arr):
    n = len(arr)

    def sift_down(start, end):
        """下沉操作(最大堆版本)"""
        parent = start
        while True:
            child = 2 * parent + 1
            if child > end:
                break
            # 选较大的子节点
            if child + 1 <= end and arr[child + 1] > arr[child]:
                child = child + 1
            if arr[parent] >= arr[child]:
                break
            arr[parent], arr[child] = arr[child], arr[parent]
            parent = child

    # 第一步:建最大堆(从最后一个非叶节点开始下沉)
    for i in range(n // 2 - 1, -1, -1):
        sift_down(i, n - 1)

    # 第二步:逐个取出堆顶放到末尾
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]  # 堆顶(最大值)放到末尾
        sift_down(0, i - 1)              # 缩小范围,重新调整堆

    return arr

print(heap_sort([12, 11, 13, 5, 6, 7]))  # [5, 6, 7, 11, 12, 13]

建堆的时间复杂度——没你想的那么高

很多人以为建堆是O(n log n)——n个元素各下沉log(n)层。但实际上建堆只需要 O(n)

为什么?因为大部分节点在树的底层,而底层节点下沉的距离很短。具体来说:叶子节点(约n/2个)不需要下沉,倒数第二层的节点(约n/4个)最多下沉1层,倒数第三层(约n/8个)最多下沉2层……总的操作次数是 n/4×1 + n/8×2 + n/16×3 + … ≈ n。

堆排序整体的时间复杂度:建堆O(n) + 排序O(n log n) = O(n log n)。空间复杂度是 O(1)——全程在原数组上操作,不需要额外空间。

🤔 想一想 堆排序的时间复杂度在任何情况下都是O(n log n),空间复杂度是O(1)。这两点都很优秀,但为什么在实际工程中,快速排序通常比堆排序更受欢迎?(提示:想想CPU缓存和数据访问模式。)


七、优先队列——堆最闪耀的舞台

堆的最核心应用就是优先队列(Priority Queue)——一种每次取出的都是优先级最高(或最低)元素的数据结构。

从使用者的角度看,优先队列只暴露三个操作:

操作时间复杂度说明
push(插入)O(log n)新元素入队
pop(取最值)O(log n)取出优先级最高的元素
peek(看最值)O(1)查看但不取出

对比其他数据结构实现优先队列的效率:

底层结构插入取出最值查看最值
无序数组O(1)O(n)O(n)
有序数组O(n)O(1)O(1)
O(log n)O(log n)O(1)

堆在插入和取出上取得了漂亮的平衡,这就是它成为优先队列首选实现的原因。


八、经典题目实战

题目1:Top-K问题——从海量数据中找出最大的K个

问题: 给定一个包含n个元素的数组,找出其中最大的k个元素。

初学者的想法: 先排序,再取前k个。时间复杂度O(n log n)。

更好的方法: 维护一个大小为k的最小堆。遍历数组,如果当前元素比堆顶大,就替换堆顶并调整堆。遍历完成后,堆中就是最大的k个元素。

为什么用最小堆而不是最大堆?因为最小堆的堆顶是k个候选者中最小的,新来一个元素只需要和这个”门槛”比较——比门槛高就有资格进来,踢掉当前最小的。

def top_k(arr, k):
    heap = MinHeap()

    for val in arr:
        if heap.size() < k:
            heap.push(val)      # 堆还没满,直接放进去
        elif val > heap.peek():
            heap.pop()          # 踢掉最小的
            heap.push(val)      # 放入更大的

    # 堆中就是最大的k个元素
    result = []
    while heap.size() > 0:
        result.append(heap.pop())
    return result

print(top_k([3, 1, 5, 12, 2, 11, 9, 7, 8], 4))
# [8, 9, 11, 12](最大的4个,顺序可能不同)

时间复杂度: O(n log k)——遍历n个元素,每次堆操作O(log k)。当k远小于n时,这比排序的O(n log n)快很多。 空间复杂度: O(k)——只需要存k个元素的堆。

想象你是选秀节目的评委,有10万人来报名,你只需要选出10个晋级。你不需要给10万人排名次,只需要维护一个”前10名”的名单——每来一个新选手,跟名单里最差的比一下就行。

题目2:合并K个有序链表

问题: 给定k个已排序的链表,将它们合并成一个有序链表。

暴力方法: 两两合并,第一个和第二个合并,结果再和第三个合并……时间复杂度O(Nk),其中N是所有节点总数。

堆的思路: 把k个链表的当前头节点放进最小堆。每次从堆中取出最小的节点接到结果链表上,然后把这个节点的下一个节点放进堆中。

import heapq

class ListNode:
    """链表节点"""
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def merge_k_lists(lists):
    heap = []
    dummy = ListNode(0)  # 哨兵节点
    current = dummy

    # 把每个链表的头节点放入堆
    # 用 (值, 序号) 元组避免节点之间无法比较的问题
    for i, head in enumerate(lists):
        if head:
            heapq.heappush(heap, (head.val, i, head))

    while heap:
        val, i, node = heapq.heappop(heap)  # 取出最小节点
        current.next = node
        current = current.next
        if node.next:
            heapq.heappush(heap, (node.next.val, i, node.next))  # 把下一个节点放入堆

    return dummy.next

# 辅助函数:用列表构建链表
def build_list(values):
    dummy = ListNode(0)
    curr = dummy
    for v in values:
        curr.next = ListNode(v)
        curr = curr.next
    return dummy.next

# 构造测试数据
# 链表1: 1 -> 4 -> 7
# 链表2: 2 -> 5 -> 8
# 链表3: 3 -> 6 -> 9
l1 = build_list([1, 4, 7])
l2 = build_list([2, 5, 8])
l3 = build_list([3, 6, 9])

merged = merge_k_lists([l1, l2, l3])
result = []
while merged:
    result.append(merged.val)
    merged = merged.next
print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

时间复杂度: O(N log k)——一共N个节点,每次堆操作O(log k)。 空间复杂度: O(k)——堆中始终最多k个节点。

题目3:数据流的中位数——双堆的精妙配合

问题: 设计一个数据结构,支持两个操作:(1)插入一个数字;(2)返回当前所有数字的中位数。

这是堆最精妙的应用之一。思路是用两个堆把数据分成两半:

  • 左半部分用一个最大堆,存较小的那一半数字,堆顶是左半部分最大的
  • 右半部分用一个最小堆,存较大的那一半数字,堆顶是右半部分最小的

中位数就在两个堆顶之间:如果总数是奇数,较多元素的那个堆的堆顶就是中位数;如果是偶数,两个堆顶的平均值就是中位数。

数据流: 5, 2, 8, 1, 7, 3

插入5:  左最大堆 [5]        右最小堆 []           中位数 = 5
插入2:  左最大堆 [2]        右最小堆 [5]          中位数 = (2+5)/2 = 3.5
插入8:  左最大堆 [2]        右最小堆 [5, 8]       中位数 = 5
插入1:  左最大堆 [1, 2]     右最小堆 [5, 8]       中位数 = (2+5)/2 = 3.5
插入7:  左最大堆 [1, 2]     右最小堆 [5, 7, 8]    中位数 = 5
插入3:  左最大堆 [1, 2, 3]  右最小堆 [5, 7, 8]    中位数 = (3+5)/2 = 4
import heapq

class MedianFinder:
    def __init__(self):
        self.lo = []  # 左半部分,最大堆(用取反模拟),堆顶是左边最大的
        self.hi = []  # 右半部分,最小堆,堆顶是右边最小的

    def add_num(self, num):
        # 规则:先加入左边,再平衡
        heapq.heappush(self.lo, -num)

        # 保证左边最大值 <= 右边最小值
        if self.hi and -self.lo[0] > self.hi[0]:
            heapq.heappush(self.hi, -heapq.heappop(self.lo))
        elif len(self.lo) > len(self.hi) + 1:
            # 保持两个堆的大小差不超过1(左边最多比右边多1个)
            heapq.heappush(self.hi, -heapq.heappop(self.lo))

        # 如果右边反而更多,挪一个回左边
        if len(self.hi) > len(self.lo):
            heapq.heappush(self.lo, -heapq.heappop(self.hi))

    def find_median(self):
        if len(self.lo) > len(self.hi):
            return -self.lo[0]  # 奇数个,左边多一个
        return (-self.lo[0] + self.hi[0]) / 2  # 偶数个,取平均

# 验证
mf = MedianFinder()
mf.add_num(5); print(mf.find_median())  # 5
mf.add_num(2); print(mf.find_median())  # 3.5
mf.add_num(8); print(mf.find_median())  # 5
mf.add_num(1); print(mf.find_median())  # 3.5
mf.add_num(7); print(mf.find_median())  # 5
mf.add_num(3); print(mf.find_median())  # 4

时间复杂度: 插入O(log n),查询中位数O(1)。 空间复杂度: O(n)。

这道题的精妙之处在于——你不需要维护一个完整的有序序列,只需要知道”中间那个值”在哪里。两个堆像两只手,一左一右托住数据,中间的缝隙就是中位数。


九、堆操作的时间复杂度总结

操作时间复杂度说明
建堆(heapify)O(n)从无序数组构建堆
插入(push)O(log n)放到末尾 + 上浮
取出堆顶(pop)O(log n)末尾填顶 + 下沉
查看堆顶(peek)O(1)直接读数组第0个元素
堆排序O(n log n)建堆 + n次取出堆顶

⚠️ 常见误区

  • 误区1:“堆排序比快排好,因为最坏情况也是O(n log n)” —— 理论上确实如此。但堆排序在取堆顶后需要下沉,访问的数组位置跳跃很大(父子节点索引可能相差很远),对CPU缓存极不友好。快排虽然最坏O(n²),但平均情况下数据访问局部性好,实际跑得更快。
  • 误区2:“最小堆中最大的元素一定在最后一个位置” —— 不一定。最大元素一定在叶子节点中,但叶子节点有多个,最大值可能在其中任何一个。
  • 误区3:“堆就是优先队列” —— 堆是一种数据结构,优先队列是一种抽象概念。优先队列也可以用其他结构实现(如有序数组、平衡树),只是堆是最常用的实现方式。

📝 掌握度自测

  1. 概念题: 最大堆和最小堆的区别是什么?在什么场景下应该用最大堆,什么场景下用最小堆?
  2. 手算题: 将 [4, 10, 3, 5, 1, 8] 逐个插入一个空的最小堆,画出每一步插入后的堆结构(可以画树形,也可以写数组)。
  3. 分析题: 为什么建堆操作的时间复杂度是O(n)而不是O(n log n)?请用自己的话解释。
  4. 编码题: 用最小堆实现一个函数,接收一个整数数组和一个整数k,返回数组中第k小的元素。
  5. 设计题: 如果你要设计一个任务调度系统——任务有不同的优先级,每次取出优先级最高的任务执行,新任务可以随时加入——你会怎么设计?用什么数据结构?

💡 自我评估

  • 全部答对:堆和优先队列已经是你的得力工具了,遇到Top-K和调度类问题可以信手拈来。
  • 答对3-4题:核心概念已经掌握,建议动手实现一次堆排序加深印象。
  • 答对1-2题:回到”上浮”和”下沉”部分重新阅读,这是理解堆的关键。
  • 全部答错:不要着急,建议先在纸上模拟堆的插入和删除过程,画出每一步的树形结构。可视化是理解堆最好的方式。

堆让我们能高效地管理”优先级”,但还有两个专门化的数据结构同样值得了解。下一章是本课程的最后一个知识章节——我们来认识 Trie(字典树)和并查集。Trie 为前缀搜索而生,能让自动补全和拼写检查飞快运行;并查集则擅长快速判断”两个元素是否属于同一组”,在社交网络连通性和岛屿计数等问题中表现出色。

购买课程解锁全部内容

刷题到通关:数据结构与算法面试实战

¥29.90