JS 实现请求调度器

作者:孟陬 时间:2024-04-22 22:37:24 

目录
  • 抽象和复用

    • 串行

    • 分段串行,段中并行

  • 总结

    前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。

    TLDR; 直接跳转『抽象和复用』章节。

    为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPromises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 PV,我们通常会这么写:


    const ids = [1001, 1002, 1003, 1004, 1005];
    const urlPrefix = 'http://opensearch.example.com/api/apps';

    // fetch 函数发送 HTTP 请求,返回 Promise
    const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);

    Promise.all(appPromises)
    // 通过 reduce 做累加
    .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
    .catch((error) => console.log(error));

    上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:


    <html>
    <head><title>502 Bad Gateway</title></head>
    <body bgcolor="white">
    <center><h1>502 Bad Gateway</h1></center>
    <hr><center>nginx/1.10.1</center>
    </body>
    </html>

    如何解决呢?

    一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunkchunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合

    难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。


    // task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求
    const task1 = () => new Promise((resolve) => {
    setTimeout(() => {
    resolve(1);
    console.log('task1 executed');
    }, 1000);
    });

    const task2 = () => new Promise((resolve) => {
    setTimeout(() => {
    resolve(2);
    console.log('task2 executed');
    }, 1000);
    });

    const task3 = () => new Promise((resolve) => {
    setTimeout(() => {
    resolve(3);
    console.log('task3 executed');
    }, 1000);
    });

    // 聚合结果
    let result = 0;

    const resultPromise = [task1, task2, task3].reduce((current, next) =>  
    current.then((number) => {
    console.log('resolved with number', number); // task2, task3 的 Promise 将在这里被 resolve
    result += number;

    return next();
    }),

    Promise.resolve(0)) // 聚合初始值

    .then(function(last) {
    console.log('The last promise resolved with number', last); // task3 的 Promise 在这里被 resolve

    result += last;

    console.log('all executed with result', result);

    return Promise.resolve(result);
    });

    运行结果如图 1:

    JS 实现请求调度器

    代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。

    难点解决了,我们看看最终代码:


    /**
    * 模拟 HTTP 请求
    * @param {String} url
    * @return {Promise}
    */
    function fetch(url) {
    console.log(`Fetching ${url}`);
    return new Promise((resolve) => {
    setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
    });
    }

    const urlPrefix = 'http://opensearch.example.com/api/apps';

    const aggregator = {
    /**
    * 入口方法,开启定时任务
    *
    * @return {Promise}
    */
    start() {
    return this.fetchAppIds()
    .then(ids => this.fetchAppsSerially(ids, 2))
    .then(apps => this.sumPv(apps))
    .catch(error => console.error(error));
    },

    /**
    * 获取所有应用的 ID
    *
    * @private
    *
    * @return {Promise}
    */
    fetchAppIds() {
    return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
    },

    promiseFactory(ids) {
    return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
    },

    /**
    * 获取所有应用的详情
    *
    * 一次并发请求 `concurrency` 个应用,称为一个 chunk
    * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕
    *
    * @private
    *
    * @param {[Number]} ids
    * @param {Number} concurrency 一次并发的请求数量
    * @return {[Object]}  所有应用的信息
    */
    fetchAppsSerially(ids, concurrency = 100) {
    // 分块
    let chunkOfIds = ids.splice(0, concurrency);
    const tasks = [];

    while (chunkOfIds.length !== 0) {
    tasks.push(this.promiseFactory(chunkOfIds));
    chunkOfIds = ids.splice(0, concurrency);
    }

    // 按块顺序执行
    const result = [];
    return tasks.reduce((current, next) => current.then((chunkOfApps) => {
    console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
    result.push(...chunkOfApps); // 拍扁数组
    return next();
    }), Promise.resolve([]))
    .then((lastchunkOfApps) => {
    console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');

    result.push(...lastchunkOfApps); // 再次拍扁它
    console.info('All chunks has been executed with result', result);
    return result;
    });
    },

    /**
    * 聚合所有应用的 PV
    *
    * @private
    *
    * @param {[]} apps
    * @return {[type]} [description]
    */
    sumPv(apps) {
    const initial = { pv: 0 };

    return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
    }
    };

    // 开始运行
    aggregator.start().then(console.log);

    运行结果如图 2:

    JS 实现请求调度器

    抽象和复用

    目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。

    串行

    先模拟一个 http get 请求。


    /**
    * mocked http get.
    * @param {string} url
    * @returns {{ url: string; delay: number; }}
    */
    function httpGet(url) {
    const delay = Math.random() * 1000;

    console.info('GET', url);

    return new Promise((resolve) => {
    setTimeout(() => {
    resolve({
    url,
    delay,
    at: Date.now()
    })
    }, delay);
    })
    }

    串行执行一批请求。


    const ids = [1, 2, 3, 4, 5, 6, 7];

    // 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的
    const httpGetters = ids.map(id =>
    () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
    );

    // 串行执行之
    const tasks = await httpGetters.reduce((acc, cur) => {
    return acc.then(cur);

    // 简写,等价于
    // return acc.then(() => cur());
    }, Promise.resolve());

    tasks.then(() => {
    console.log('done');
    });

    注意观察控制台输出,应该串行输出以下内容:


    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    GET https://jsonplaceholder.typicode.com/posts/7

    分段串行,段中并行

    重点来了。本文的请求调度器实现


    /**
    * Schedule promises.
    * @param {Array<(...arg: any[]) => Promise<any>>} factories
    * @param {number} concurrency
    */
    function schedulePromises(factories, concurrency) {
    /**
    * chunk
    * @param {any[]} arr
    * @param {number} size
    * @returns {Array<any[]>}
    */
    const chunk = (arr, size = 1) => {
    return arr.reduce((acc, cur, idx) => {
    const modulo = idx % size;

    if (modulo === 0) {
    acc[acc.length] = [cur];
    } else {
    acc[acc.length - 1].push(cur);
    }

    return acc;
    }, [])
    };

    const chunks = chunk(factories, concurrency);

    let resps = [];

    return chunks.reduce(
    (acc, cur) => {
    return acc
    .then(() => {
     console.log('---');
     return Promise.all(cur.map(f => f()));
    })
    .then((intermediateResponses) => {
     resps.push(...intermediateResponses);

    return resps;
    })
    },

    Promise.resolve()
    );
    }

    测试下,执行调度器:


    // 分段串行,段中并行
    schedulePromises(httpGetters, 3).then((resps) => {
    console.log('resps:', resps);
    });

    控制台输出:


    ---
    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    ---
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    ---
    GET https://jsonplaceholder.typicode.com/posts/7

    resps: [
    {
    "url": "https://jsonplaceholder.typicode.com/posts/1",
    "delay": 733.010980640727,
    "at": 1615131322163
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/2",
    "delay": 594.5056229848931,
    "at": 1615131322024
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/3",
    "delay": 738.8230109146299,
    "at": 1615131322168
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/4",
    "delay": 525.4604386109747,
    "at": 1615131322698
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/5",
    "delay": 29.086379722201183,
    "at": 1615131322201
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/6",
    "delay": 592.2345027398272,
    "at": 1615131322765
    },
    {
    "url": "https://jsonplaceholder.typicode.com/posts/7",
    "delay": 513.0684467560949,
    "at": 1615131323284
    }
    ]

    总结

    1. 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。

    2. 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。

    3. 本文的精髓在于使用 reduce 作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce 精通见上篇 你终于用 Reduce 了 🎉。

    来源:https://juejin.cn/post/6936859831060037668

    标签:js,请求,调度器
    0
    投稿

    猜你喜欢

  • Python中的fileinput模块的简单实用示例

    2023-06-19 01:09:27
  • Python 面向对象之类class和对象基本用法示例

    2023-01-02 20:15:01
  • Python获取对象属性的几种方式小结

    2022-05-27 18:09:42
  • sqlserver 临时表 Vs 表变量 详细介绍

    2011-11-03 17:34:10
  • javascript实现无缝上下滚动特效

    2024-05-11 09:35:08
  • Python函数高级(命名空间、作用域、装饰器)

    2022-03-15 23:31:44
  • 使用xshell实现代理功能并navicat for MySQL 进行测试

    2024-01-23 19:59:05
  • Python实现的文本简单可逆加密算法示例

    2023-06-05 07:35:11
  • HTTP状态码

    2009-09-21 12:46:00
  • OpenCV半小时掌握基本操作之模板匹配

    2022-05-03 10:36:41
  • 推荐4个原生javascript常用的函数

    2024-02-23 09:05:41
  • 如何使用json在前后台进行数据传输实例介绍

    2024-05-03 15:03:56
  • Python Opencv使用ann神经网络识别手写数字功能

    2023-11-03 02:44:52
  • 浅谈Keras的Sequential与PyTorch的Sequential的区别

    2021-08-24 07:53:35
  • 详解Python字典的操作

    2023-07-10 11:20:50
  • Python OpenCV实现3种滤镜效果实例

    2021-06-04 10:20:27
  • vue实现输入框的模糊查询的示例代码(节流函数的应用场景)

    2024-05-08 09:33:35
  • 下载golang.org/x包的操作方法

    2023-07-11 16:54:04
  • python办公自动化之excel的操作

    2023-11-20 14:01:01
  • 浅谈如何使用python抓取网页中的动态数据实现

    2021-10-19 08:47:18
  • asp之家 网络编程 m.aspxhome.com