跳到主要内容
预计阅读 22 分钟

并发编程 —— 让CPU的每一个核心都忙起来

现代CPU动辄8核、16核,但如果你的程序只用一个核,那剩下的核心就在”摸鱼”。并发编程就是让多个任务同时运行的技术。在C/C++中,并发编程是出了名的”雷区”——数据竞争、死锁、竞态条件,个个都是噩梦。Rust用它的类型系统在编译期就帮你排除了大部分并发bug,让你可以放心地写并发代码。

📋 开篇自测:你已经知道多少?

  1. 你能区分”并发”和”并行”的概念吗?
  2. 什么是数据竞争(data race)?它为什么危险?
  3. async/await和线程的区别是什么?各适合什么场景?

一、并发与并行——先把概念理清

1.1 并发 vs 并行

这两个词经常被混用,但含义不同:

  • 并发(Concurrency):多个任务在同一时间段内交替执行。就像一个厨师同时做三道菜——切一会儿菜,去翻一下锅,再回来继续切。同一时刻只做一件事,但通过快速切换让三道菜都在”推进”。
  • 并行(Parallelism):多个任务真正地同时执行。就像三个厨师各做一道菜——同一时刻真的有三件事在同时发生。

在有多个CPU核心的计算机上,并发和并行可以同时存在。Rust对两者都提供了强大的支持。

1.2 为什么并发编程难?

并发编程最大的敌人是数据竞争——两个线程同时访问同一块数据,且至少一个在写。这会导致不可预测的行为:

线程A: 读取 count = 10
线程B: 读取 count = 10
线程A: count = 10 + 1 → 写回 11
线程B: count = 10 + 1 → 写回 11
结果: count = 11(应该是12!)

在C/C++中,数据竞争导致的bug极其难复现和调试。Rust的所有权和借用规则天然地防止了大部分数据竞争——如果你的代码有潜在的数据竞争,它根本编译不过。


二、线程——最基本的并发单元

2.1 创建线程

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程打印:{}", i);
            thread::sleep(Duration::from_millis(200));
        }
    });

    for i in 1..=3 {
        println!("主线程打印:{}", i);
        thread::sleep(Duration::from_millis(300));
    }

    // 等待子线程结束
    handle.join().unwrap();
    println!("所有线程执行完毕");
}

thread::spawn接收一个闭包(匿名函数),在新线程中执行。.join()会阻塞当前线程,直到子线程执行完毕。

2.2 move闭包——把数据”搬”进线程

线程可能比创建它的函数活得更久,所以你不能让线程借用局部变量——那变量可能已经被释放了。Rust的解决方案是move关键字:

use std::thread;

fn main() {
    let greeting = String::from("你好,来自主线程!");

    let handle = thread::spawn(move || {
        // greeting的所有权被move到了这个线程中
        println!("{}", greeting);
    });

    // println!("{}", greeting);  // 编译错误!所有权已转移
    handle.join().unwrap();
}

move把闭包用到的变量的所有权转移给了新线程。这保证了线程中的数据在任何时候都是有效的。

2.3 多线程实战:并行计算

use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // 把数据分成两半,分别在两个线程中计算
    let mid = data.len() / 2;
    let first_half = data[..mid].to_vec();
    let second_half = data[mid..].to_vec();

    let handle1 = thread::spawn(move || {
        let sum: i32 = first_half.iter().sum();
        println!("前半部分的和:{}", sum);
        sum
    });

    let handle2 = thread::spawn(move || {
        let sum: i32 = second_half.iter().sum();
        println!("后半部分的和:{}", sum);
        sum
    });

    let sum1 = handle1.join().unwrap();
    let sum2 = handle2.join().unwrap();
    println!("总和:{}", sum1 + sum2);
}

🤔 想一想 如果不用move,而是让两个线程直接借用data的不同部分,行不行?(提示:理论上不同部分没有冲突,但Rust的编译器无法保证线程不会比data活得更久,所以不允许。)


三、消息传递——“不要通过共享内存来通信”

3.1 Channel(通道)

Go语言有一句名言:“不要通过共享内存来通信,而要通过通信来共享数据。“Rust也支持这种模式——通过Channel在线程间传递消息。

use std::sync::mpsc;  // mpsc = Multiple Producer, Single Consumer
use std::thread;

fn main() {
    // 创建通道:tx(发送端)和rx(接收端)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec![
            String::from("今天"),
            String::from("天气"),
            String::from("真好"),
        ];

        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(std::time::Duration::from_millis(500));
        }
    });

    // 在主线程中接收消息
    for received in rx {
        println!("收到消息:{}", received);
    }
}

Channel就像一条传送带——一头放东西(send),另一头取东西(recv)。当发送端关闭时(被drop),接收端的迭代器会自动结束。

3.2 多个生产者

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 克隆发送端,让多个线程都能发送
    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            let msg = format!("来自线程{}的消息", i);
            tx_clone.send(msg).unwrap();
        });
    }

    // 原始的tx需要被drop,否则rx永远不会结束
    drop(tx);

    for msg in rx {
        println!("{}", msg);
    }
}

四、共享状态——当你确实需要共享数据时

4.1 Mutex——互斥锁

有时候多个线程确实需要访问同一块数据。Rust提供了Mutex(互斥锁)来保护共享数据:

use std::sync::Mutex;

fn main() {
    let counter = Mutex::new(0);

    {
        let mut num = counter.lock().unwrap();  // 获取锁
        *num += 1;
    }  // 锁在这里自动释放

    println!("计数器:{}", *counter.lock().unwrap());
}

Mutex就像一间只有一把钥匙的更衣室——一次只能一个人进去。你必须先lock()拿到钥匙才能访问里面的数据,用完后钥匙自动归还(锁自动释放)。

4.2 Arc——原子引用计数

要在多个线程间共享Mutex,需要用Arc(Atomic Reference Counting)——它是Rc的线程安全版本:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter_clone.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("最终计数:{}", *counter.lock().unwrap());  // 10
}

Arc让多个线程可以共享同一个Mutex的所有权,每个clone增加引用计数,当最后一个Arc被drop时数据才会被释放。

4.3 Send和Sync——编译器的并发安全卫士

Rust有两个特殊的trait,它们是并发安全的守门人:

  • Send:类型的值可以安全地在线程间转移所有权
  • Sync:类型的引用可以安全地在线程间共享

几乎所有基本类型都是Send + Sync的。但有些类型不是——比如Rc<T>不是Send也不是Sync(因为它的引用计数不是原子操作),所以你没法在线程间共享Rc,编译器会阻止你。

这就是Rust所谓的”无畏并发”(fearless concurrency)——不是说并发不会出问题,而是说那些最危险的并发bug在编译期就被挡住了。

🤔 想一想 如果你尝试用Rc<Mutex<i32>>替代Arc<Mutex<i32>>在多线程中使用,编译器会给出什么错误?试试看!


五、异步编程——async/await

5.1 为什么需要异步?

线程适合CPU密集型任务。但对于IO密集型任务(等待网络响应、读写文件),线程太”重”了——每个线程要占几MB的栈空间,创建和切换都有开销。

异步编程让你可以在一个线程上同时”等待”多个IO操作——当一个任务在等待网络响应时,运行时会切换去执行另一个任务。就像一个服务员同时照顾多张桌子——当一桌客人在看菜单时,去给另一桌上菜。

5.2 async/await语法

// Cargo.toml中需要添加:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// 注意:features = ["full"] 会启用 tokio 的所有功能模块。
// 在生产项目中可以按需选择,如只用 ["rt-multi-thread", "macros", "time"] 来减小编译体积。

use tokio::time::{sleep, Duration};

async fn fetch_data(source: &str) -> String {
    println!("开始从{}获取数据...", source);
    sleep(Duration::from_secs(1)).await;  // 模拟IO等待
    format!("来自{}的数据", source)
}

#[tokio::main]
async fn main() {
    // 串行执行:总共2秒
    let data1 = fetch_data("数据库").await;
    let data2 = fetch_data("API").await;
    println!("串行结果:{}, {}", data1, data2);
}

5.3 并发执行异步任务

use tokio::time::{sleep, Duration};

async fn fetch_data(source: &str, delay_ms: u64) -> String {
    println!("开始从{}获取数据...", source);
    sleep(Duration::from_millis(delay_ms)).await;
    println!("{}的数据获取完毕!", source);
    format!("{}的结果", source)
}

#[tokio::main]
async fn main() {
    // 并发执行:总共只需要最长的那个时间
    let (r1, r2, r3) = tokio::join!(
        fetch_data("数据库", 1000),
        fetch_data("缓存", 200),
        fetch_data("外部API", 1500),
    );

    println!("结果:{}, {}, {}", r1, r2, r3);
}

tokio::join!让三个异步任务并发运行(在异步运行时上并发执行),总时间约1.5秒而不是2.7秒。

5.4 用tokio::spawn创建异步任务

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let task1 = tokio::spawn(async {
        sleep(Duration::from_millis(500)).await;
        println!("任务1完成");
        42
    });

    let task2 = tokio::spawn(async {
        sleep(Duration::from_millis(300)).await;
        println!("任务2完成");
        "hello"
    });

    let result1 = task1.await.unwrap();
    let result2 = task2.await.unwrap();
    println!("结果:{}, {}", result1, result2);
}

5.5 线程 vs 异步:怎么选?

场景推荐方式原因
CPU密集计算多线程需要真正的并行计算能力
大量IO操作async/await轻量级,一个线程可处理数万并发连接
少量简单并发多线程简单直观,不需要异步运行时(但如果项目已引入tokio,用async保持一致性也可以)
Web服务器async/await连接数多,大部分时间在等IO
数据处理pipeline多线程+channel各阶段CPU密集,适合线程间传递数据

⚠️ 常见误区

  1. 在async函数中做CPU密集运算——async的核心是让出执行权给其他任务。如果你在async函数中做大量计算不await,会阻塞整个异步运行时。CPU密集的工作应该用tokio::task::spawn_blocking
    // 错误:在async中直接做CPU密集计算,会阻塞运行时
    async fn bad_hash(data: Vec<u8>) -> String {
        expensive_hash(&data)  // 没有await点,其他任务无法执行
    }
    
    // 正确:用spawn_blocking把CPU密集工作交给专用线程池
    async fn good_hash(data: Vec<u8>) -> String {
        tokio::task::spawn_blocking(move || expensive_hash(&data))
            .await
            .unwrap()
    }
  2. 忘了drop原始的发送端——用channel时,如果原始的tx没有被drop,rx的迭代器永远不会结束,程序会卡住。
  3. 死锁——一个线程持有锁A等待锁B,另一个线程持有锁B等待锁A。Rust无法在编译期完全防止死锁,这需要你自己注意锁的获取顺序。
  4. 过度使用Mutex——如果数据只是从一个线程传到另一个线程,用channel更简单也更安全。Mutex适合”多个线程需要读写同一块数据”的场景。
  5. 混淆async和多线程——async默认不创建新线程。tokio的多线程运行时会使用线程池,但你写的async代码本身是在事件循环中调度的。

📝 掌握度自测

  1. 基础概念:用自己的话解释”并发”和”并行”的区别。一个单核CPU能实现并行吗?
  2. 线程安全:为什么Rc<T>不能在线程间共享?需要用什么替代?
  3. 消息传递:写一段代码,用channel实现”3个生产者线程各发送一条消息,主线程接收并打印”。
  4. 共享状态:用Arc<Mutex<Vec<i32>>>实现”5个线程各往Vec中push一个数字,最后打印Vec”。
  5. 异步编程:解释tokio::join!和依次.await的区别。什么场景下前者更高效?

💡 自我评估

  • 答对5题:并发编程的核心概念已经掌握,你可以自信地编写多线程和异步代码了。
  • 答对3-4题:基础概念已掌握,建议多实践async/await或者线程间共享数据的场景。
  • 答对0-2题:并发编程确实是个进阶话题。建议先从简单的thread::spawn开始,逐步增加channel和Mutex的使用,最后再接触async。

购买课程解锁全部内容

内存安全 + 零成本抽象:Rust 系统编程实战

¥29.90