跳到主要内容
预计阅读 42 分钟

Javascript篇 | 手写p-limit

前言

面试中经常会遇到这样一道题:“假设有 100 个异步请求,如何控制并发数为 5?”

这道题看似简单,实际上涉及到队列、Promise、异步控制流等多个知识点。而 p-limit 就是解决这个问题的经典方案——npm 上每周下载量超过 1 亿次的高频工具包。

面试官考这道题,不是看你能不能”背代码”,而是看你:

  • 对异步编程的理解深度
  • 能不能设计清晰的 API
  • 代码实现是否考虑边界情况
  • 有没有工程化思维(队列、资源管理)

今天我们就从需求分析到完整实现,手把手写一个 p-limit

诊断自测

在开始之前,试着回答以下问题:

1. 以下代码有什么问题?在生产环境中会带来什么后果?

const urls = Array.from({ length: 1000 }, (_, i) => `/api/item/${i}`);
const results = await Promise.all(urls.map(url => fetch(url)));

2. p-limit 的核心 API 是什么样的?

3. 如果并发限制为 2,有 5 个任务,第 3 个任务最早什么时候开始执行?

点击查看答案

第1题: 同时发起 1000 个请求!这会导致:浏览器连接数限制(通常每个域名 6 个 TCP 连接,其余排队)、服务器压力过大可能返回 429、内存占用激增。应该控制并发数。

第2题: const limit = pLimit(5);,然后用 limit(() => asyncFn()) 包裹异步任务。limit 函数返回一个 Promise,当并发数未满时立即执行,否则排队等待。

第3题: 第 3 个任务最早在第 1 个或第 2 个任务完成后开始执行。因为并发限制为 2,前两个任务占满了”槽位”,第 3 个必须等其中一个完成后才能开始。

如果都答对了,说明你对并发控制已经有一定理解!继续阅读可以深入掌握实现细节。

并发控制的需求场景

为什么需要并发控制?

在实际项目中,我们经常需要发送大量异步请求:

// 批量上传文件
const files = getSelectedFiles(); // 可能有几十上百个

// 批量查询数据
const userIds = [1, 2, 3, /* ... 几百个 */];

// 爬虫/数据采集
const urls = getAllPageUrls(); // 可能有几千个

如果不控制并发:

  1. 浏览器限制:同一域名最多 6 个并发连接(HTTP/1.1),多余的请求只能排队
  2. 服务器压力:瞬间大量请求可能触发限流(429 Too Many Requests)
  3. 内存问题:几千个 Promise 同时创建,内存占用巨大
  4. 超时风险:排队时间过长导致请求超时

理想的解决方案

import pLimit from 'p-limit';

const limit = pLimit(5); // 并发上限 5

const urls = Array.from({ length: 100 }, (_, i) => `/api/item/${i}`);

const results = await Promise.all(
  urls.map(url => limit(() => fetch(url)))
);
// 同一时刻最多只有 5 个请求在执行

p-limit 的 API 设计

在动手实现之前,先明确 API:

// 1. 创建限制器,指定最大并发数
const limit = pLimit(concurrency);

// 2. 用 limit 包裹异步函数
const result = await limit(asyncFn, arg1, arg2);

// 3. limit 返回 Promise,行为和直接调用 asyncFn 一致
// 如果当前并发未满 → 立即执行
// 如果当前并发已满 → 排队等待

// 4. 额外属性
limit.activeCount;   // 当前正在执行的任务数
limit.pendingCount;  // 队列中等待的任务数
limit.clearQueue();  // 清空等待队列

核心实现:队列 + 计数器

实现思路

核心就是两个东西:

  • 计数器(activeCount):记录当前有多少个任务在执行
  • 队列(queue):存放等待执行的任务

逻辑:

  1. 新任务进来,先包装成一个”可控的 Promise”放入队列
  2. 检查 activeCount 是否小于并发限制
  3. 如果可以执行 → 从队列取出任务执行,activeCount++
  4. 任务完成后 → activeCount—,然后尝试从队列取下一个任务执行

第一步:骨架

function pLimit(concurrency) {
  // 参数校验
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new TypeError('Expected concurrency to be a positive integer');
  }

  const queue = [];
  let activeCount = 0;

  const next = () => {
    activeCount--;

    if (queue.length > 0) {
      const fn = queue.shift();
      fn();
    }
  };

  const run = async (fn, resolve, args) => {
    activeCount++;

    const result = (async () => fn(...args))();

    resolve(result);

    try {
      await result;
    } catch {}

    next();
  };

  const enqueue = (fn, resolve, args) => {
    queue.push(run.bind(undefined, fn, resolve, args));

    if (activeCount < concurrency && queue.length > 0) {
      const fn = queue.shift();
      fn();
    }
  };

  const generator = (fn, ...args) =>
    new Promise(resolve => {
      enqueue(fn, resolve, args);
    });

  Object.defineProperties(generator, {
    activeCount: {
      get: () => activeCount,
    },
    pendingCount: {
      get: () => queue.length,
    },
    clearQueue: {
      value: () => {
        queue.length = 0;
      },
    },
  });

  return generator;
}

逐行解析

让我们拆开来看每个部分:

generator 函数(入口):

const generator = (fn, ...args) =>
  new Promise(resolve => {
    enqueue(fn, resolve, args);
  });

这就是 limit() 的本体。它返回一个 Promise,把任务和 resolve 一起传给 enqueue。调用者拿到的 Promise 会在任务执行完成后被 resolve。

enqueue 函数(入队 + 调度):

const enqueue = (fn, resolve, args) => {
  // 把 run 函数放入队列
  queue.push(run.bind(undefined, fn, resolve, args));

  // 如果还有空闲槽位,立即执行队列中的任务
  if (activeCount < concurrency && queue.length > 0) {
    const fn = queue.shift();
    fn();
  }
};

注意这里用了 queue.push 然后马上 queue.shift,看起来多此一举?其实不是——统一走队列可以确保 FIFO 顺序,代码更简洁。

run 函数(执行任务):

const run = async (fn, resolve, args) => {
  activeCount++;

  // 包一层 async IIFE,确保即使 fn 不是 async 函数也能正常处理
  const result = (async () => fn(...args))();

  // 立即 resolve 外层 Promise(传递内部 Promise)
  resolve(result);

  try {
    await result;
  } catch {}

  // 无论成功失败,都调用 next
  next();
};

这里有个巧妙的设计:resolve(result) 不是 resolve 一个值,而是 resolve 一个 Promise。根据 Promise 规范,当你 resolve(anotherPromise) 时,外层 Promise 会”跟随”这个内部 Promise 的状态。

next 函数(调度下一个):

const next = () => {
  activeCount--;

  if (queue.length > 0) {
    const fn = queue.shift();
    fn();
  }
};

任务完成后,减少计数器,然后从队列取出下一个任务执行。

完整版(可直接使用)

function pLimit(concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new TypeError('Expected concurrency to be a positive integer');
  }

  const queue = [];
  let activeCount = 0;

  const next = () => {
    activeCount--;
    if (queue.length > 0) {
      queue.shift()();
    }
  };

  const run = async (fn, resolve, args) => {
    activeCount++;
    const result = (async () => fn(...args))();
    resolve(result);
    try {
      await result;
    } catch {}
    next();
  };

  const enqueue = (fn, resolve, args) => {
    queue.push(run.bind(undefined, fn, resolve, args));

    // 使用 queueMicrotask 确保 activeCount 已经更新
    (async () => {
      await Promise.resolve();
      if (activeCount < concurrency && queue.length > 0) {
        queue.shift()();
      }
    })();
  };

  const generator = (fn, ...args) =>
    new Promise(resolve => {
      enqueue(fn, resolve, args);
    });

  Object.defineProperties(generator, {
    activeCount: { get: () => activeCount },
    pendingCount: { get: () => queue.length },
    clearQueue: {
      value: () => { queue.length = 0; },
    },
  });

  return generator;
}

注意完整版中 enqueue 多了一个 await Promise.resolve(),这是为了确保同步循环中 push 的多个任务都先入队,然后再统一调度。否则可能出现并发数没有充分利用的情况。

使用示例

批量请求

const limit = pLimit(5);

async function fetchAllUsers(userIds) {
  const promises = userIds.map(id =>
    limit(() => fetch(`/api/users/${id}`).then(r => r.json()))
  );

  return Promise.all(promises);
}

const users = await fetchAllUsers([1, 2, 3, /* ... 100个 */]);
// 同一时刻最多 5 个请求在飞

文件上传

const limit = pLimit(3);

async function uploadFiles(files) {
  const results = await Promise.all(
    files.map(file =>
      limit(async () => {
        const formData = new FormData();
        formData.append('file', file);
        const res = await fetch('/api/upload', {
          method: 'POST',
          body: formData,
        });
        return res.json();
      })
    )
  );

  console.log(`上传完成: ${results.length} 个文件`);
  return results;
}

带进度的批量操作

const limit = pLimit(10);

async function batchProcess(items, onProgress) {
  let completed = 0;

  const results = await Promise.all(
    items.map(item =>
      limit(async () => {
        const result = await processItem(item);
        completed++;
        onProgress({
          completed,
          total: items.length,
          active: limit.activeCount,
          pending: limit.pendingCount,
        });
        return result;
      })
    )
  );

  return results;
}

await batchProcess(items, (progress) => {
  console.log(`进度: ${progress.completed}/${progress.total}`);
  console.log(`活跃: ${progress.active}, 等待: ${progress.pending}`);
});

扩展:相关工具

p-queue:更强大的队列

p-limit 只做并发控制,p-queue 提供了更丰富的功能:

import PQueue from 'p-queue';

const queue = new PQueue({
  concurrency: 5,
  interval: 1000,       // 每 1000ms 的间隔内
  intervalCap: 10,      // 最多启动 10 个任务(限速)
  timeout: 30000,       // 单个任务超时时间
});

queue.on('active', () => {
  console.log(`活跃: ${queue.pending} 个等待中`);
});

// 添加任务
await queue.add(() => fetchData());

// 添加优先级更高的任务
await queue.add(() => urgentTask(), { priority: 1 });

// 等待所有任务完成
await queue.onIdle();

// 暂停/恢复
queue.pause();
queue.start();

p-retry:自动重试

import pRetry from 'p-retry';

const result = await pRetry(
  async () => {
    const res = await fetch('/api/data');
    if (!res.ok) throw new Error(`HTTP ${res.status}`);
    return res.json();
  },
  {
    retries: 3,
    onFailedAttempt: (error) => {
      console.log(`第 ${error.attemptNumber} 次失败,还剩 ${error.retriesLeft} 次重试`);
    },
  }
);

p-limit + p-retry 组合使用

const limit = pLimit(5);

async function fetchWithRetry(url) {
  return limit(() =>
    pRetry(() => fetch(url).then(r => r.json()), { retries: 3 })
  );
}

const results = await Promise.all(
  urls.map(url => fetchWithRetry(url))
);

常见误区

误区1:用 for 循环 + await 实现”并发控制”

// ❌ 这不是并发控制,这是串行执行!
async function fetchAll(urls) {
  const results = [];
  for (const url of urls) {
    const res = await fetch(url);        // 一个一个地等
    results.push(await res.json());
  }
  return results;
}

这是串行执行,100 个请求一个接一个,速度极慢。并发控制是让多个请求同时进行,但限制”同时”的数量。

误区2:用 Promise.all 分批执行就够了

// ⚠️ 分批执行,但有问题
async function fetchInBatches(urls, batchSize) {
  const results = [];
  for (let i = 0; i < urls.length; i += batchSize) {
    const batch = urls.slice(i, i + batchSize);
    const batchResults = await Promise.all(batch.map(url => fetch(url)));
    results.push(...batchResults);
  }
  return results;
}

这种方式的问题是:每一批中最慢的那个请求会”拖后腿”。比如一批 5 个请求,4 个 100ms 就完成了,1 个要 2 秒——那 4 个完成的”槽位”就白白浪费了 1.9 秒。

p-limit 的做法是:一个任务完成就立即启动下一个,没有”等一批”的空窗期。

误区3:activeCount 不需要在 catch 里减少

// ❌ 如果 fn 抛出错误,activeCount 永远不会减少
const run = async (fn, resolve, args) => {
  activeCount++;
  const result = await fn(...args);
  activeCount--;   // 如果 fn 抛错,这行不会执行
  next();
};

正确做法是用 try/catch/finally 或像 p-limit 源码那样 try { await result } catch {} next(),确保无论成功失败都会调用 next()

误区4:并发数设置得越大越好

并发数不是越大越好:

  • 太小(如 1):退化为串行,速度慢
  • 太大(如 100):接近不限制,失去控制意义
  • 合理范围:HTTP 请求通常 5-20,文件 I/O 通常 3-10,CPU 密集型通常等于 CPU 核心数

小结

p-limit 看似简单,但它背后的设计思想——队列调度 + 信号量机制——是并发编程的基础模式。

核心要点

  1. 并发控制 ≠ 串行执行:并发控制是限制”同时执行的数量”,不是一个一个排队
  2. 核心组件:计数器(activeCount)+ 队列(queue)+ 调度器(next)
  3. resolve(promise):利用 Promise 的”跟随”特性,把内部 Promise 的状态传递给外部
  4. 分批 vs 池化:p-limit 用的是池化模型,一个完成就补一个,没有”等一批”的空窗
  5. 错误处理:必须确保无论任务成功还是失败,都要 activeCount— 并调度下一个

本章思维导图

手写 p-limit
  • 需求场景
    • 批量请求(避免并发过高)
    • 文件上传(服务器限流)
    • 爬虫采集(控制连接数)
  • API 设计
    • pLimit(concurrency) → limit 函数
    • limit(fn, ...args) → Promise
    • limit.activeCount(当前执行数)
    • limit.pendingCount(队列长度)
    • limit.clearQueue()(清空队列)
  • 核心实现
    • 计数器 activeCount
    • 队列 queue(FIFO)
    • enqueue:入队 + 判断是否启动
    • run:执行 + resolve + next
    • next:activeCount-- + 取下一个
  • 关键技巧
    • resolve(promise) 状态跟随
    • async IIFE 包装非 async 函数
    • queueMicrotask 确保顺序
  • 相关工具
    • p-queue(优先级、限速、暂停)
    • p-retry(自动重试)
    • p-all(简化批量用法)

练习挑战

挑战一:基础(⭐)

p-limit 的思路,实现一个简化版的并发控制函数。只需要支持基本功能:

const limit = createLimit(2);

limit(() => delay(1000).then(() => console.log('A'))); // 立即开始
limit(() => delay(500).then(() => console.log('B')));  // 立即开始
limit(() => delay(300).then(() => console.log('C')));  // 排队等待
答案与解析
function createLimit(concurrency) {
  const queue = [];
  let active = 0;

  function next() {
    active--;
    if (queue.length > 0) {
      queue.shift()();
    }
  }

  return function(fn) {
    return new Promise((resolve, reject) => {
      const run = async () => {
        active++;
        try {
          const result = await fn();
          resolve(result);
        } catch (err) {
          reject(err);
        }
        next();
      };

      if (active < concurrency) {
        run();
      } else {
        queue.push(run);
      }
    });
  };
}

function delay(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

这是最简版本,只有 20 行左右。核心就是:能执行就执行,不能执行就放队列,执行完调 next

挑战二:进阶(⭐⭐)

给上面的实现添加 activeCountpendingCountclearQueue 功能。然后实现一个带进度回调的批量执行函数:

async function batchRun(tasks, concurrency, onProgress) {
  // tasks: Array<() => Promise<any>>
  // concurrency: 并发数
  // onProgress: ({ completed, total }) => void
}
答案与解析
function createLimit(concurrency) {
  const queue = [];
  let active = 0;

  function next() {
    active--;
    if (queue.length > 0) {
      queue.shift()();
    }
  }

  const limit = function(fn) {
    return new Promise((resolve, reject) => {
      const run = async () => {
        active++;
        try {
          resolve(await fn());
        } catch (err) {
          reject(err);
        }
        next();
      };

      if (active < concurrency) {
        run();
      } else {
        queue.push(run);
      }
    });
  };

  Object.defineProperties(limit, {
    activeCount: { get: () => active },
    pendingCount: { get: () => queue.length },
    clearQueue: { value: () => { queue.length = 0; } },
  });

  return limit;
}

async function batchRun(tasks, concurrency, onProgress) {
  const limit = createLimit(concurrency);
  let completed = 0;

  return Promise.all(
    tasks.map(task =>
      limit(async () => {
        const result = await task();
        completed++;
        onProgress?.({ completed, total: tasks.length });
        return result;
      })
    )
  );
}

// 使用
await batchRun(
  urls.map(url => () => fetch(url)),
  5,
  ({ completed, total }) => console.log(`${completed}/${total}`)
);

挑战三:综合(⭐⭐⭐)

实现一个支持超时和重试的增强版 p-limit

const limit = createAdvancedLimit({
  concurrency: 3,
  timeout: 5000,      // 单个任务超时 5s
  retries: 2,         // 失败重试 2 次
  onTaskComplete: (result) => console.log('完成:', result),
  onTaskError: (error) => console.error('失败:', error),
});
答案与解析
function createAdvancedLimit(options) {
  const {
    concurrency,
    timeout = 0,
    retries = 0,
    onTaskComplete,
    onTaskError,
  } = options;

  const queue = [];
  let active = 0;

  function next() {
    active--;
    if (queue.length > 0) {
      queue.shift()();
    }
  }

  async function executeWithTimeout(fn, ms) {
    if (!ms) return fn();

    return Promise.race([
      fn(),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Task timeout')), ms)
      ),
    ]);
  }

  async function executeWithRetry(fn, retriesLeft) {
    try {
      return await executeWithTimeout(fn, timeout);
    } catch (err) {
      if (retriesLeft > 0) {
        return executeWithRetry(fn, retriesLeft - 1);
      }
      throw err;
    }
  }

  const limit = function(fn) {
    return new Promise((resolve, reject) => {
      const run = async () => {
        active++;
        try {
          const result = await executeWithRetry(fn, retries);
          onTaskComplete?.(result);
          resolve(result);
        } catch (err) {
          onTaskError?.(err);
          reject(err);
        }
        next();
      };

      if (active < concurrency) {
        run();
      } else {
        queue.push(run);
      }
    });
  };

  Object.defineProperties(limit, {
    activeCount: { get: () => active },
    pendingCount: { get: () => queue.length },
    clearQueue: { value: () => { queue.length = 0; } },
  });

  return limit;
}

核心是把 timeout 和 retry 分成两个独立的函数组合起来。executeWithTimeoutPromise.race 实现超时,executeWithRetry 用递归实现重试。这种组合设计是工程中的常见模式。

自我检测

读完本章后,确认你能回答以下问题:

  • 能说出为什么需要并发控制(浏览器限制、服务器限流、内存占用)
  • 能描述 p-limit 的 API 设计(limit 函数、activeCount、pendingCount)
  • 能手写简化版 p-limit(队列 + 计数器 + next 调度)
  • 能解释 resolve(promise) 的”状态跟随”机制
  • 能解释为什么”分批执行”不如”池化模型”高效
  • 能说出 for-await 串行和 Promise.all 全并发的区别
  • 知道并发数应该怎么合理设置(5-20 for HTTP, 3-10 for I/O)
  • 了解 p-queue、p-retry 等相关工具的用途

购买课程解锁全部内容

大厂前端面试通关:71 篇构建完整知识体系

¥89.90