并发编程 —— 让CPU的每一个核心都忙起来
现代CPU动辄8核、16核,但如果你的程序只用一个核,那剩下的核心就在”摸鱼”。并发编程就是让多个任务同时运行的技术。在C/C++中,并发编程是出了名的”雷区”——数据竞争、死锁、竞态条件,个个都是噩梦。Rust用它的类型系统在编译期就帮你排除了大部分并发bug,让你可以放心地写并发代码。
📋 开篇自测:你已经知道多少?
- 你能区分”并发”和”并行”的概念吗?
- 什么是数据竞争(data race)?它为什么危险?
- async/await和线程的区别是什么?各适合什么场景?
一、并发与并行——先把概念理清
1.1 并发 vs 并行
这两个词经常被混用,但含义不同:
- 并发(Concurrency):多个任务在同一时间段内交替执行。就像一个厨师同时做三道菜——切一会儿菜,去翻一下锅,再回来继续切。同一时刻只做一件事,但通过快速切换让三道菜都在”推进”。
- 并行(Parallelism):多个任务真正地同时执行。就像三个厨师各做一道菜——同一时刻真的有三件事在同时发生。
在有多个CPU核心的计算机上,并发和并行可以同时存在。Rust对两者都提供了强大的支持。
1.2 为什么并发编程难?
并发编程最大的敌人是数据竞争——两个线程同时访问同一块数据,且至少一个在写。这会导致不可预测的行为:
线程A: 读取 count = 10
线程B: 读取 count = 10
线程A: count = 10 + 1 → 写回 11
线程B: count = 10 + 1 → 写回 11
结果: count = 11(应该是12!)
在C/C++中,数据竞争导致的bug极其难复现和调试。Rust的所有权和借用规则天然地防止了大部分数据竞争——如果你的代码有潜在的数据竞争,它根本编译不过。
二、线程——最基本的并发单元
2.1 创建线程
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子线程打印:{}", i);
thread::sleep(Duration::from_millis(200));
}
});
for i in 1..=3 {
println!("主线程打印:{}", i);
thread::sleep(Duration::from_millis(300));
}
// 等待子线程结束
handle.join().unwrap();
println!("所有线程执行完毕");
}
thread::spawn接收一个闭包(匿名函数),在新线程中执行。.join()会阻塞当前线程,直到子线程执行完毕。
2.2 move闭包——把数据”搬”进线程
线程可能比创建它的函数活得更久,所以你不能让线程借用局部变量——那变量可能已经被释放了。Rust的解决方案是move关键字:
use std::thread;
fn main() {
let greeting = String::from("你好,来自主线程!");
let handle = thread::spawn(move || {
// greeting的所有权被move到了这个线程中
println!("{}", greeting);
});
// println!("{}", greeting); // 编译错误!所有权已转移
handle.join().unwrap();
}
move把闭包用到的变量的所有权转移给了新线程。这保证了线程中的数据在任何时候都是有效的。
2.3 多线程实战:并行计算
use std::thread;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// 把数据分成两半,分别在两个线程中计算
let mid = data.len() / 2;
let first_half = data[..mid].to_vec();
let second_half = data[mid..].to_vec();
let handle1 = thread::spawn(move || {
let sum: i32 = first_half.iter().sum();
println!("前半部分的和:{}", sum);
sum
});
let handle2 = thread::spawn(move || {
let sum: i32 = second_half.iter().sum();
println!("后半部分的和:{}", sum);
sum
});
let sum1 = handle1.join().unwrap();
let sum2 = handle2.join().unwrap();
println!("总和:{}", sum1 + sum2);
}
🤔 想一想 如果不用
move,而是让两个线程直接借用data的不同部分,行不行?(提示:理论上不同部分没有冲突,但Rust的编译器无法保证线程不会比data活得更久,所以不允许。)
三、消息传递——“不要通过共享内存来通信”
3.1 Channel(通道)
Go语言有一句名言:“不要通过共享内存来通信,而要通过通信来共享数据。“Rust也支持这种模式——通过Channel在线程间传递消息。
use std::sync::mpsc; // mpsc = Multiple Producer, Single Consumer
use std::thread;
fn main() {
// 创建通道:tx(发送端)和rx(接收端)
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
String::from("今天"),
String::from("天气"),
String::from("真好"),
];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(std::time::Duration::from_millis(500));
}
});
// 在主线程中接收消息
for received in rx {
println!("收到消息:{}", received);
}
}
Channel就像一条传送带——一头放东西(send),另一头取东西(recv)。当发送端关闭时(被drop),接收端的迭代器会自动结束。
3.2 多个生产者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 克隆发送端,让多个线程都能发送
for i in 0..3 {
let tx_clone = tx.clone();
thread::spawn(move || {
let msg = format!("来自线程{}的消息", i);
tx_clone.send(msg).unwrap();
});
}
// 原始的tx需要被drop,否则rx永远不会结束
drop(tx);
for msg in rx {
println!("{}", msg);
}
}
四、共享状态——当你确实需要共享数据时
4.1 Mutex——互斥锁
有时候多个线程确实需要访问同一块数据。Rust提供了Mutex(互斥锁)来保护共享数据:
use std::sync::Mutex;
fn main() {
let counter = Mutex::new(0);
{
let mut num = counter.lock().unwrap(); // 获取锁
*num += 1;
} // 锁在这里自动释放
println!("计数器:{}", *counter.lock().unwrap());
}
Mutex就像一间只有一把钥匙的更衣室——一次只能一个人进去。你必须先lock()拿到钥匙才能访问里面的数据,用完后钥匙自动归还(锁自动释放)。
4.2 Arc——原子引用计数
要在多个线程间共享Mutex,需要用Arc(Atomic Reference Counting)——它是Rc的线程安全版本:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter_clone.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("最终计数:{}", *counter.lock().unwrap()); // 10
}
Arc让多个线程可以共享同一个Mutex的所有权,每个clone增加引用计数,当最后一个Arc被drop时数据才会被释放。
4.3 Send和Sync——编译器的并发安全卫士
Rust有两个特殊的trait,它们是并发安全的守门人:
- Send:类型的值可以安全地在线程间转移所有权
- Sync:类型的引用可以安全地在线程间共享
几乎所有基本类型都是Send + Sync的。但有些类型不是——比如Rc<T>不是Send也不是Sync(因为它的引用计数不是原子操作),所以你没法在线程间共享Rc,编译器会阻止你。
这就是Rust所谓的”无畏并发”(fearless concurrency)——不是说并发不会出问题,而是说那些最危险的并发bug在编译期就被挡住了。
🤔 想一想 如果你尝试用
Rc<Mutex<i32>>替代Arc<Mutex<i32>>在多线程中使用,编译器会给出什么错误?试试看!
五、异步编程——async/await
5.1 为什么需要异步?
线程适合CPU密集型任务。但对于IO密集型任务(等待网络响应、读写文件),线程太”重”了——每个线程要占几MB的栈空间,创建和切换都有开销。
异步编程让你可以在一个线程上同时”等待”多个IO操作——当一个任务在等待网络响应时,运行时会切换去执行另一个任务。就像一个服务员同时照顾多张桌子——当一桌客人在看菜单时,去给另一桌上菜。
5.2 async/await语法
// Cargo.toml中需要添加:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// 注意:features = ["full"] 会启用 tokio 的所有功能模块。
// 在生产项目中可以按需选择,如只用 ["rt-multi-thread", "macros", "time"] 来减小编译体积。
use tokio::time::{sleep, Duration};
async fn fetch_data(source: &str) -> String {
println!("开始从{}获取数据...", source);
sleep(Duration::from_secs(1)).await; // 模拟IO等待
format!("来自{}的数据", source)
}
#[tokio::main]
async fn main() {
// 串行执行:总共2秒
let data1 = fetch_data("数据库").await;
let data2 = fetch_data("API").await;
println!("串行结果:{}, {}", data1, data2);
}
5.3 并发执行异步任务
use tokio::time::{sleep, Duration};
async fn fetch_data(source: &str, delay_ms: u64) -> String {
println!("开始从{}获取数据...", source);
sleep(Duration::from_millis(delay_ms)).await;
println!("{}的数据获取完毕!", source);
format!("{}的结果", source)
}
#[tokio::main]
async fn main() {
// 并发执行:总共只需要最长的那个时间
let (r1, r2, r3) = tokio::join!(
fetch_data("数据库", 1000),
fetch_data("缓存", 200),
fetch_data("外部API", 1500),
);
println!("结果:{}, {}, {}", r1, r2, r3);
}
tokio::join!让三个异步任务并发运行(在异步运行时上并发执行),总时间约1.5秒而不是2.7秒。
5.4 用tokio::spawn创建异步任务
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let task1 = tokio::spawn(async {
sleep(Duration::from_millis(500)).await;
println!("任务1完成");
42
});
let task2 = tokio::spawn(async {
sleep(Duration::from_millis(300)).await;
println!("任务2完成");
"hello"
});
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
println!("结果:{}, {}", result1, result2);
}
5.5 线程 vs 异步:怎么选?
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| CPU密集计算 | 多线程 | 需要真正的并行计算能力 |
| 大量IO操作 | async/await | 轻量级,一个线程可处理数万并发连接 |
| 少量简单并发 | 多线程 | 简单直观,不需要异步运行时(但如果项目已引入tokio,用async保持一致性也可以) |
| Web服务器 | async/await | 连接数多,大部分时间在等IO |
| 数据处理pipeline | 多线程+channel | 各阶段CPU密集,适合线程间传递数据 |
⚠️ 常见误区
- 在async函数中做CPU密集运算——async的核心是让出执行权给其他任务。如果你在async函数中做大量计算不await,会阻塞整个异步运行时。CPU密集的工作应该用
tokio::task::spawn_blocking:// 错误:在async中直接做CPU密集计算,会阻塞运行时 async fn bad_hash(data: Vec<u8>) -> String { expensive_hash(&data) // 没有await点,其他任务无法执行 } // 正确:用spawn_blocking把CPU密集工作交给专用线程池 async fn good_hash(data: Vec<u8>) -> String { tokio::task::spawn_blocking(move || expensive_hash(&data)) .await .unwrap() }- 忘了drop原始的发送端——用channel时,如果原始的
tx没有被drop,rx的迭代器永远不会结束,程序会卡住。- 死锁——一个线程持有锁A等待锁B,另一个线程持有锁B等待锁A。Rust无法在编译期完全防止死锁,这需要你自己注意锁的获取顺序。
- 过度使用Mutex——如果数据只是从一个线程传到另一个线程,用channel更简单也更安全。Mutex适合”多个线程需要读写同一块数据”的场景。
- 混淆async和多线程——
async默认不创建新线程。tokio的多线程运行时会使用线程池,但你写的async代码本身是在事件循环中调度的。
📝 掌握度自测
- 基础概念:用自己的话解释”并发”和”并行”的区别。一个单核CPU能实现并行吗?
- 线程安全:为什么
Rc<T>不能在线程间共享?需要用什么替代? - 消息传递:写一段代码,用channel实现”3个生产者线程各发送一条消息,主线程接收并打印”。
- 共享状态:用
Arc<Mutex<Vec<i32>>>实现”5个线程各往Vec中push一个数字,最后打印Vec”。 - 异步编程:解释
tokio::join!和依次.await的区别。什么场景下前者更高效?
💡 自我评估
- 答对5题:并发编程的核心概念已经掌握,你可以自信地编写多线程和异步代码了。
- 答对3-4题:基础概念已掌握,建议多实践async/await或者线程间共享数据的场景。
- 答对0-2题:并发编程确实是个进阶话题。建议先从简单的thread::spawn开始,逐步增加channel和Mutex的使用,最后再接触async。
购买课程解锁全部内容
内存安全 + 零成本抽象:Rust 系统编程实战
¥29.90