Javascript篇 | 手写p-limit
前言
面试中经常会遇到这样一道题:“假设有 100 个异步请求,如何控制并发数为 5?”
这道题看似简单,实际上涉及到队列、Promise、异步控制流等多个知识点。而 p-limit 就是解决这个问题的经典方案——npm 上每周下载量超过 1 亿次的高频工具包。
面试官考这道题,不是看你能不能”背代码”,而是看你:
- 对异步编程的理解深度
- 能不能设计清晰的 API
- 代码实现是否考虑边界情况
- 有没有工程化思维(队列、资源管理)
今天我们就从需求分析到完整实现,手把手写一个 p-limit。
诊断自测
在开始之前,试着回答以下问题:
1. 以下代码有什么问题?在生产环境中会带来什么后果?
const urls = Array.from({ length: 1000 }, (_, i) => `/api/item/${i}`);
const results = await Promise.all(urls.map(url => fetch(url)));
2. p-limit 的核心 API 是什么样的?
3. 如果并发限制为 2,有 5 个任务,第 3 个任务最早什么时候开始执行?
点击查看答案
第1题: 同时发起 1000 个请求!这会导致:浏览器连接数限制(通常每个域名 6 个 TCP 连接,其余排队)、服务器压力过大可能返回 429、内存占用激增。应该控制并发数。
第2题: const limit = pLimit(5);,然后用 limit(() => asyncFn()) 包裹异步任务。limit 函数返回一个 Promise,当并发数未满时立即执行,否则排队等待。
第3题: 第 3 个任务最早在第 1 个或第 2 个任务完成后开始执行。因为并发限制为 2,前两个任务占满了”槽位”,第 3 个必须等其中一个完成后才能开始。
如果都答对了,说明你对并发控制已经有一定理解!继续阅读可以深入掌握实现细节。
并发控制的需求场景
为什么需要并发控制?
在实际项目中,我们经常需要发送大量异步请求:
// 批量上传文件
const files = getSelectedFiles(); // 可能有几十上百个
// 批量查询数据
const userIds = [1, 2, 3, /* ... 几百个 */];
// 爬虫/数据采集
const urls = getAllPageUrls(); // 可能有几千个
如果不控制并发:
- 浏览器限制:同一域名最多 6 个并发连接(HTTP/1.1),多余的请求只能排队
- 服务器压力:瞬间大量请求可能触发限流(429 Too Many Requests)
- 内存问题:几千个 Promise 同时创建,内存占用巨大
- 超时风险:排队时间过长导致请求超时
理想的解决方案
import pLimit from 'p-limit';
const limit = pLimit(5); // 并发上限 5
const urls = Array.from({ length: 100 }, (_, i) => `/api/item/${i}`);
const results = await Promise.all(
urls.map(url => limit(() => fetch(url)))
);
// 同一时刻最多只有 5 个请求在执行
p-limit 的 API 设计
在动手实现之前,先明确 API:
// 1. 创建限制器,指定最大并发数
const limit = pLimit(concurrency);
// 2. 用 limit 包裹异步函数
const result = await limit(asyncFn, arg1, arg2);
// 3. limit 返回 Promise,行为和直接调用 asyncFn 一致
// 如果当前并发未满 → 立即执行
// 如果当前并发已满 → 排队等待
// 4. 额外属性
limit.activeCount; // 当前正在执行的任务数
limit.pendingCount; // 队列中等待的任务数
limit.clearQueue(); // 清空等待队列
核心实现:队列 + 计数器
实现思路
核心就是两个东西:
- 计数器(activeCount):记录当前有多少个任务在执行
- 队列(queue):存放等待执行的任务
逻辑:
- 新任务进来,先包装成一个”可控的 Promise”放入队列
- 检查 activeCount 是否小于并发限制
- 如果可以执行 → 从队列取出任务执行,activeCount++
- 任务完成后 → activeCount—,然后尝试从队列取下一个任务执行
第一步:骨架
function pLimit(concurrency) {
// 参数校验
if (!Number.isInteger(concurrency) || concurrency < 1) {
throw new TypeError('Expected concurrency to be a positive integer');
}
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
const fn = queue.shift();
fn();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, args) => {
queue.push(run.bind(undefined, fn, resolve, args));
if (activeCount < concurrency && queue.length > 0) {
const fn = queue.shift();
fn();
}
};
const generator = (fn, ...args) =>
new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.length,
},
clearQueue: {
value: () => {
queue.length = 0;
},
},
});
return generator;
}
逐行解析
让我们拆开来看每个部分:
generator 函数(入口):
const generator = (fn, ...args) =>
new Promise(resolve => {
enqueue(fn, resolve, args);
});
这就是 limit() 的本体。它返回一个 Promise,把任务和 resolve 一起传给 enqueue。调用者拿到的 Promise 会在任务执行完成后被 resolve。
enqueue 函数(入队 + 调度):
const enqueue = (fn, resolve, args) => {
// 把 run 函数放入队列
queue.push(run.bind(undefined, fn, resolve, args));
// 如果还有空闲槽位,立即执行队列中的任务
if (activeCount < concurrency && queue.length > 0) {
const fn = queue.shift();
fn();
}
};
注意这里用了 queue.push 然后马上 queue.shift,看起来多此一举?其实不是——统一走队列可以确保 FIFO 顺序,代码更简洁。
run 函数(执行任务):
const run = async (fn, resolve, args) => {
activeCount++;
// 包一层 async IIFE,确保即使 fn 不是 async 函数也能正常处理
const result = (async () => fn(...args))();
// 立即 resolve 外层 Promise(传递内部 Promise)
resolve(result);
try {
await result;
} catch {}
// 无论成功失败,都调用 next
next();
};
这里有个巧妙的设计:resolve(result) 不是 resolve 一个值,而是 resolve 一个 Promise。根据 Promise 规范,当你 resolve(anotherPromise) 时,外层 Promise 会”跟随”这个内部 Promise 的状态。
next 函数(调度下一个):
const next = () => {
activeCount--;
if (queue.length > 0) {
const fn = queue.shift();
fn();
}
};
任务完成后,减少计数器,然后从队列取出下一个任务执行。
完整版(可直接使用)
function pLimit(concurrency) {
if (!Number.isInteger(concurrency) || concurrency < 1) {
throw new TypeError('Expected concurrency to be a positive integer');
}
const queue = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
queue.shift()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
const enqueue = (fn, resolve, args) => {
queue.push(run.bind(undefined, fn, resolve, args));
// 使用 queueMicrotask 确保 activeCount 已经更新
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};
const generator = (fn, ...args) =>
new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: { get: () => activeCount },
pendingCount: { get: () => queue.length },
clearQueue: {
value: () => { queue.length = 0; },
},
});
return generator;
}
注意完整版中
enqueue多了一个await Promise.resolve(),这是为了确保同步循环中 push 的多个任务都先入队,然后再统一调度。否则可能出现并发数没有充分利用的情况。
使用示例
批量请求
const limit = pLimit(5);
async function fetchAllUsers(userIds) {
const promises = userIds.map(id =>
limit(() => fetch(`/api/users/${id}`).then(r => r.json()))
);
return Promise.all(promises);
}
const users = await fetchAllUsers([1, 2, 3, /* ... 100个 */]);
// 同一时刻最多 5 个请求在飞
文件上传
const limit = pLimit(3);
async function uploadFiles(files) {
const results = await Promise.all(
files.map(file =>
limit(async () => {
const formData = new FormData();
formData.append('file', file);
const res = await fetch('/api/upload', {
method: 'POST',
body: formData,
});
return res.json();
})
)
);
console.log(`上传完成: ${results.length} 个文件`);
return results;
}
带进度的批量操作
const limit = pLimit(10);
async function batchProcess(items, onProgress) {
let completed = 0;
const results = await Promise.all(
items.map(item =>
limit(async () => {
const result = await processItem(item);
completed++;
onProgress({
completed,
total: items.length,
active: limit.activeCount,
pending: limit.pendingCount,
});
return result;
})
)
);
return results;
}
await batchProcess(items, (progress) => {
console.log(`进度: ${progress.completed}/${progress.total}`);
console.log(`活跃: ${progress.active}, 等待: ${progress.pending}`);
});
扩展:相关工具
p-queue:更强大的队列
p-limit 只做并发控制,p-queue 提供了更丰富的功能:
import PQueue from 'p-queue';
const queue = new PQueue({
concurrency: 5,
interval: 1000, // 每 1000ms 的间隔内
intervalCap: 10, // 最多启动 10 个任务(限速)
timeout: 30000, // 单个任务超时时间
});
queue.on('active', () => {
console.log(`活跃: ${queue.pending} 个等待中`);
});
// 添加任务
await queue.add(() => fetchData());
// 添加优先级更高的任务
await queue.add(() => urgentTask(), { priority: 1 });
// 等待所有任务完成
await queue.onIdle();
// 暂停/恢复
queue.pause();
queue.start();
p-retry:自动重试
import pRetry from 'p-retry';
const result = await pRetry(
async () => {
const res = await fetch('/api/data');
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{
retries: 3,
onFailedAttempt: (error) => {
console.log(`第 ${error.attemptNumber} 次失败,还剩 ${error.retriesLeft} 次重试`);
},
}
);
p-limit + p-retry 组合使用
const limit = pLimit(5);
async function fetchWithRetry(url) {
return limit(() =>
pRetry(() => fetch(url).then(r => r.json()), { retries: 3 })
);
}
const results = await Promise.all(
urls.map(url => fetchWithRetry(url))
);
常见误区
误区1:用 for 循环 + await 实现”并发控制”
// ❌ 这不是并发控制,这是串行执行!
async function fetchAll(urls) {
const results = [];
for (const url of urls) {
const res = await fetch(url); // 一个一个地等
results.push(await res.json());
}
return results;
}
这是串行执行,100 个请求一个接一个,速度极慢。并发控制是让多个请求同时进行,但限制”同时”的数量。
误区2:用 Promise.all 分批执行就够了
// ⚠️ 分批执行,但有问题
async function fetchInBatches(urls, batchSize) {
const results = [];
for (let i = 0; i < urls.length; i += batchSize) {
const batch = urls.slice(i, i + batchSize);
const batchResults = await Promise.all(batch.map(url => fetch(url)));
results.push(...batchResults);
}
return results;
}
这种方式的问题是:每一批中最慢的那个请求会”拖后腿”。比如一批 5 个请求,4 个 100ms 就完成了,1 个要 2 秒——那 4 个完成的”槽位”就白白浪费了 1.9 秒。
p-limit 的做法是:一个任务完成就立即启动下一个,没有”等一批”的空窗期。
误区3:activeCount 不需要在 catch 里减少
// ❌ 如果 fn 抛出错误,activeCount 永远不会减少
const run = async (fn, resolve, args) => {
activeCount++;
const result = await fn(...args);
activeCount--; // 如果 fn 抛错,这行不会执行
next();
};
正确做法是用 try/catch/finally 或像 p-limit 源码那样 try { await result } catch {} next(),确保无论成功失败都会调用 next()。
误区4:并发数设置得越大越好
并发数不是越大越好:
- 太小(如 1):退化为串行,速度慢
- 太大(如 100):接近不限制,失去控制意义
- 合理范围:HTTP 请求通常 5-20,文件 I/O 通常 3-10,CPU 密集型通常等于 CPU 核心数
小结
p-limit 看似简单,但它背后的设计思想——队列调度 + 信号量机制——是并发编程的基础模式。
核心要点
- 并发控制 ≠ 串行执行:并发控制是限制”同时执行的数量”,不是一个一个排队
- 核心组件:计数器(activeCount)+ 队列(queue)+ 调度器(next)
- resolve(promise):利用 Promise 的”跟随”特性,把内部 Promise 的状态传递给外部
- 分批 vs 池化:p-limit 用的是池化模型,一个完成就补一个,没有”等一批”的空窗
- 错误处理:必须确保无论任务成功还是失败,都要 activeCount— 并调度下一个
本章思维导图
- 需求场景
- 批量请求(避免并发过高)
- 文件上传(服务器限流)
- 爬虫采集(控制连接数)
- API 设计
- pLimit(concurrency) → limit 函数
- limit(fn, ...args) → Promise
- limit.activeCount(当前执行数)
- limit.pendingCount(队列长度)
- limit.clearQueue()(清空队列)
- 核心实现
- 计数器 activeCount
- 队列 queue(FIFO)
- enqueue:入队 + 判断是否启动
- run:执行 + resolve + next
- next:activeCount-- + 取下一个
- 关键技巧
- resolve(promise) 状态跟随
- async IIFE 包装非 async 函数
- queueMicrotask 确保顺序
- 相关工具
- p-queue(优先级、限速、暂停)
- p-retry(自动重试)
- p-all(简化批量用法)
练习挑战
挑战一:基础(⭐)
用 p-limit 的思路,实现一个简化版的并发控制函数。只需要支持基本功能:
const limit = createLimit(2);
limit(() => delay(1000).then(() => console.log('A'))); // 立即开始
limit(() => delay(500).then(() => console.log('B'))); // 立即开始
limit(() => delay(300).then(() => console.log('C'))); // 排队等待
答案与解析
function createLimit(concurrency) {
const queue = [];
let active = 0;
function next() {
active--;
if (queue.length > 0) {
queue.shift()();
}
}
return function(fn) {
return new Promise((resolve, reject) => {
const run = async () => {
active++;
try {
const result = await fn();
resolve(result);
} catch (err) {
reject(err);
}
next();
};
if (active < concurrency) {
run();
} else {
queue.push(run);
}
});
};
}
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
这是最简版本,只有 20 行左右。核心就是:能执行就执行,不能执行就放队列,执行完调 next。
挑战二:进阶(⭐⭐)
给上面的实现添加 activeCount、pendingCount 和 clearQueue 功能。然后实现一个带进度回调的批量执行函数:
async function batchRun(tasks, concurrency, onProgress) {
// tasks: Array<() => Promise<any>>
// concurrency: 并发数
// onProgress: ({ completed, total }) => void
}
答案与解析
function createLimit(concurrency) {
const queue = [];
let active = 0;
function next() {
active--;
if (queue.length > 0) {
queue.shift()();
}
}
const limit = function(fn) {
return new Promise((resolve, reject) => {
const run = async () => {
active++;
try {
resolve(await fn());
} catch (err) {
reject(err);
}
next();
};
if (active < concurrency) {
run();
} else {
queue.push(run);
}
});
};
Object.defineProperties(limit, {
activeCount: { get: () => active },
pendingCount: { get: () => queue.length },
clearQueue: { value: () => { queue.length = 0; } },
});
return limit;
}
async function batchRun(tasks, concurrency, onProgress) {
const limit = createLimit(concurrency);
let completed = 0;
return Promise.all(
tasks.map(task =>
limit(async () => {
const result = await task();
completed++;
onProgress?.({ completed, total: tasks.length });
return result;
})
)
);
}
// 使用
await batchRun(
urls.map(url => () => fetch(url)),
5,
({ completed, total }) => console.log(`${completed}/${total}`)
);
挑战三:综合(⭐⭐⭐)
实现一个支持超时和重试的增强版 p-limit:
const limit = createAdvancedLimit({
concurrency: 3,
timeout: 5000, // 单个任务超时 5s
retries: 2, // 失败重试 2 次
onTaskComplete: (result) => console.log('完成:', result),
onTaskError: (error) => console.error('失败:', error),
});
答案与解析
function createAdvancedLimit(options) {
const {
concurrency,
timeout = 0,
retries = 0,
onTaskComplete,
onTaskError,
} = options;
const queue = [];
let active = 0;
function next() {
active--;
if (queue.length > 0) {
queue.shift()();
}
}
async function executeWithTimeout(fn, ms) {
if (!ms) return fn();
return Promise.race([
fn(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Task timeout')), ms)
),
]);
}
async function executeWithRetry(fn, retriesLeft) {
try {
return await executeWithTimeout(fn, timeout);
} catch (err) {
if (retriesLeft > 0) {
return executeWithRetry(fn, retriesLeft - 1);
}
throw err;
}
}
const limit = function(fn) {
return new Promise((resolve, reject) => {
const run = async () => {
active++;
try {
const result = await executeWithRetry(fn, retries);
onTaskComplete?.(result);
resolve(result);
} catch (err) {
onTaskError?.(err);
reject(err);
}
next();
};
if (active < concurrency) {
run();
} else {
queue.push(run);
}
});
};
Object.defineProperties(limit, {
activeCount: { get: () => active },
pendingCount: { get: () => queue.length },
clearQueue: { value: () => { queue.length = 0; } },
});
return limit;
}
核心是把 timeout 和 retry 分成两个独立的函数组合起来。executeWithTimeout 用 Promise.race 实现超时,executeWithRetry 用递归实现重试。这种组合设计是工程中的常见模式。
自我检测
读完本章后,确认你能回答以下问题:
- 能说出为什么需要并发控制(浏览器限制、服务器限流、内存占用)
- 能描述 p-limit 的 API 设计(limit 函数、activeCount、pendingCount)
- 能手写简化版 p-limit(队列 + 计数器 + next 调度)
- 能解释 resolve(promise) 的”状态跟随”机制
- 能解释为什么”分批执行”不如”池化模型”高效
- 能说出 for-await 串行和 Promise.all 全并发的区别
- 知道并发数应该怎么合理设置(5-20 for HTTP, 3-10 for I/O)
- 了解 p-queue、p-retry 等相关工具的用途
购买课程解锁全部内容
大厂前端面试通关:71 篇构建完整知识体系
¥89.90