Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

基于Node.js异步优先队列2.0来袭 #42

Open
Checkson opened this issue Mar 19, 2022 · 0 comments
Open

基于Node.js异步优先队列2.0来袭 #42

Checkson opened this issue Mar 19, 2022 · 0 comments

Comments

@Checkson
Copy link
Owner

前言

阔别Web前端3年多时间里面,今天我重新抽空整理了一下之前的一个开源库:priority-async-queue,一个基于Node.js的异步优先任务队列。不了解1.0版本的同学可以戳这里,我们今天主要围绕着paq的设计初衷和适用场景展开来说。

相比1.0,2.0改进了什么?

改进的主要有三点:

  • 1.0只支持单并发状态执行任务队列,2.0则加入了支持多并发执行状态,具体并发数,用户可以自行设置。
  • 2.0为每个任务加入了执行时间的统计,具体包括任务创建时间creatTime,任务开始执行时间startTime,和任务执行结束时间endTime。用户可以自己根据需求计算所需的任务等待时间:startTime - createTime,或者任务执行消耗的时间endTime - startTime,又或者只是用来写入日志文件,记录执行时间相关信息。
  • 1.0只支持在Node.js环境下执行,2.0则提供支持浏览器可执行的输出文件。让浏览器端遇到同样困扰的同学,能够迎刃而解。

paq设计思路

paq设计思路其实非常简单,一共就4个类组成:

  • Task 是描述每个待执行(异步/同步)任务的执行逻辑以及配置参数。
  • PriorityQueue 是控制每个待执行(异步/同步)任务的优先级队列、具有队列的基本属性和作。
  • AsyncQueue 是控制每个待执行(异步/同步)任务能严格一定顺序地执行的队列。
  • Event 模拟事件监听和事件触发的类。

2.0为了兼容浏览器环境能正常运行,去掉了对Node.js原生事件类EventEmitter的依赖,自己实现了简易的事件绑定和触发的功能。

下面是paq 2.0的程序流程图:

paq2 0

paq的设计初衷

我刚转岗到游戏开发的时候,部门迫切需要一个集群打包系统来处理庞大的打包业务。当时,我临危受命,接下了这个任务。后来,我开发的集群打包系统,其打包采用的任务调度,最核心底层代码架构近似于paq。当然,实际应用会比paq复杂很多,因为游戏打包流程是一个极其复杂而繁琐的过程,我只是抽离了最核心通用的调度思路来开源成一个通用库。

试想,如果一台打包机只能在同一时间执行单个打包任务,那么就太浪费硬件资源了。但受限于CPU核数、硬盘空间、内存容量、数据读写速度等因素,我们又不能粗暴地向打包机里加入并发执行的打包任务,所以这个时候,能控制好每台打包机的并发数显得尤为重要。既要保证效率,也要保证安全可靠。

paq适用场景

首先,我们必须明确的是,在绝大部分业务场景里面,你可能不需要paq。没设置并发数的paq,其任务默认是严格按照顺序执行,并发数始终维持为1,这在绝大部份情况已经降低执行效率。JavaScript原生支持的异步任务和事件循环,本来就是要充分发挥在单线程执行环境下,最大限度利用CPU多核的特性,从而提高程序执行效率。

可能不少同学都已经知道,在浏览器端,最大并发请求数,每个浏览器厂商都做了一定的限制,如:Chrome允许的最大并发请求数目为6,FireFox是4,每个浏览器版本之间又会存在一定的差异。总而言之,主流浏览器在网络请求方面已经帮我们做好了负载均衡的工作了。而在Node.js环境下,负载均衡问题则需要我们开发者自己来解决。

如果,在短时间内,一大批客户端产生大量的网络请求时候,服务器的承受能力肯定是有限的。这个时候,需要我们用一个像队列的数据结构容器来先存好这些请求,然后按照先进先出的原则来慢慢提供给服务端处理,压力会减少很多。说到这里,很多有服务端经验的同学,第一时间就会想到消息队列。没错,paq很像消息队列,但它没有遵守生产者消费者模式。所以paq不能单独处理分布式和集群业务的调度,它更适合放在MQ的下游。

paq特点

1.paq更小、更易用。

paq有效源码大概200行左右,Node.js环境下是非常精小的。但在浏览器端,打包压缩后的paq也有18KB,主要来自是ES6语法兼容性代码的冗余。

做前端开发者,无论是Web、移动端原生和游戏开发,最折磨的莫过于要兼容各种用户终端运行的环境和设备。

下面是paq最基础的用法,开箱即用:

const PAQ = require('priority-async-queue');
const paq = new PAQ();

paq.addTask(() => {
  console.log('Helo World!');
});

// Hello World!

接着,我们来看看字节的一道经典面试题。

class Scheduler {
  add(promiseCreator) {
    // 完善Scheduler,使其并发数为2
  }
}

const timeout = (time) => new Promise(resolve => {
  setTimeout(resolve, time);
})

const scheduler = new Scheduler();

const addTask = (time, order) => {
  scheduler.add(() => timeout(time)).then(() => console.log(order));
}

addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);

// 要求输出顺序
// 2
// 3
// 1
// 4

大家可以稍加思考一下,怎么扩展 Scheduler 类能完成需求。如果见过或者已经知道怎么做的同学不妨看看用paq怎么轻松实现这个需求。

const PAQ = require('priority-async-queue');
// 实例化paq时,使其并发数为2
const paq = new PAQ(2);

class Scheduler {
  add(promiseCreator) {
    return new Promise(resolve => {
      paq.addTask({
        completed: (ctx, res) => {
          resolve(res);
        }
      }, () => promiseCreator());
    });
  }
}

...

至于,不借助paq又怎么实现这个需求呢?有兴趣的同学,可以在评论区分享自己的实现方式。

2.paq更贴合Node.js开发习惯

const PAQ = require('priority-async-queue');
const paq = new PAQ();

// 链式调用结构
paq.addTask(() => {
  console.log('one');
}).addTask(() => {
  console.log('two');
}).addTask(() => {
  console.log('three');
});

// one
// two
// three

// 支持原生async和promise等异步操作
paq.addTask(() => {
  return new Promise(resolve => {
    paq.sleep(1000).then(() => {
      console.log('sleep 1s');
      resolve();
    });
  });
});
paq.addTask(async () => {
  await paq.sleep(1000).then(() => {
    console.log('sleep 1s too');
  });
});

// sleep 1s
// sleep 1s too

3.使用灵活

只要paq设置的并发数足够大,或者和处理业务峰值相当,那么它就能近似Promise.all那样无限制并发执行,但是paq不会等所有任务都完成后才进行下一步操作。

const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(20);

const p1 = () => paq.sleep(1000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(1000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));

paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
  console.log(result);
});

Promise.all([p1(), p2(), p3()]).then(res => {
  console.log(res);
});

// p1
// p2
// p3
// [ 'p1', 'p2', 'p3' ]

如果paq只处理第一个返回状态的任务,则它的用法接近Promise.race的用法了。

const PAQ = require('priority-async-queue');
// 并发上限设置足够大
const paq = new PAQ(100);

const p1 = () => paq.sleep(3000).then(() => Promise.resolve('p1'));
const p2 = () => paq.sleep(2000).then(() => Promise.resolve('p2'));
const p3 = () => paq.sleep(1000).then(() => Promise.resolve('p3'));

let isFirst = false;
paq.addTask(p1).addTask(p2).addTask(p3).on('completed', (opt, result) => {
  if (!isFirst) {
    // TODO 只处理第一个改变状态的任务
    console.log('paq: ' + result);
    isFirst = true;
  }
});

Promise.race([p1(), p2(), p3()]).then(res => {
  console.log('race: ' + res);
});

// paq: p3
// race: p3

paq近似Promise.allSettledPromise.any的用法我就不再展开了。个人认为,日常开发中能用原生实现的,尽量用原生实现,本文只是起介绍作用,不构成使用建议。

学海无涯,剑圣说过:“真正的大师永远都怀着一颗学徒的心”,所以被人称作:“易大师”。我们这些普通人如果坚持学习,虽然最后可能成不了大师,但起码不会摆烂吧?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant