co.js

//让函数并发执行
/**
 * 创建一个具有并发控制和等待时间选项的函数
 * @export
 * @param {function} func - 要协作执行的函数
 * @param {number} [concurrency=1] - 允许的最大并发执行数
 * @param {number} [wt=0] - 每次函数执行后的等待时间(以毫秒为单位)
 * @return {function} - 返回一个新的函数,该函数接受原始函数的参数并返回一个promise
 */
export function co(func, concurrency=1, wt = 0) {
    let queue   = [];
    let canceled, workers = new Set();
    function finish() {
        if (!canceled && fn.complete) {
            fn.complete();
        }
        fn.finish && fn.finish(canceled);
        canceled = undefined;
    }
    /**
     * 从队列中取出任务并执行它们,直到队列为空
     *
     * @param {Array} queue - 任务队列
     * @param {function} func - 要执行的函数
     */
    async function exec(queue,func) {
        while (queue.length > 0) {
            const {args, resolve, reject} = queue.shift();
            let ret = canceled?undefined:await func(...args);
            resolve(ret);
            if(wt&&!canceled){
                await new Promise(res =>setTimeout(res, wt));
            }
        }
    }
    /**
     * 包装的函数,将参数放入队列并根据并发限制启动新的工作程序
     *
     * @param {...any} args - 传递给原函数的参数
     * @return {Promise} - 一个promise,它将在原函数的结果上解析
     */
    function fn(...args) {
        return new Promise(async (resolve, reject) => {
            queue.push({args, resolve, reject});
            if (workers.size >= concurrency) return;
            let worker=exec(queue,func).then(() =>workers.delete(worker)).finally(()=>workers.size===0&&finish());
            workers.add(worker);
        });
    }
    /**
     * 停止所有当前的和未来的执行
     */
    fn.stop = () => {
        if (queue.length) {
            canceled = true;
            fn.break && fn.break();
        }
    }
    return fn;
}

/**
 * 为Function原型添加一个新的方法,允许使用co函数轻松包装任何函数
 *
 * @param {number} [concurrency=1] - 允许的最大并发执行数
 * @param {number} [waitTime=0] - 每次函数执行后的等待时间(以毫秒为单位)
 * @return {function} - 返回一个新的函数,该函数接受原始函数的参数并返回一个promise
 */
Function.prototype.co = function (concurrency, waitTime = 0) {
    return co(this, concurrency, waitTime);
}
export default co;