nodejs如何高效处理并发任务?

Node可以在不新增额外线程的情况下,依然可以对任务进行并发处理 —— Node.js是单线程的。它通过事件循环(event loop)来实现并发操作,对此,我们应该要充分利用这一点 —— 尽可能的避免阻塞操作,取而代之,多使用非阻塞操作。

nodejs高效处理并发任务的原理:

1、每个Node.js进程只有一个主线程在执行程序代码,形成一个执行栈(execution context stack)。

2、主线程之外,还维护了一个"事件队列"(Event queue)。当用户的网络请求或者其它的异步操作到来时,node都会把它放到Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。

3、主线程代码执行完毕完成后,然后通过Event Loop,也就是事件循环机制,开始到Event Queue的开头取出第一个事件,从线程池中分配一个线程去执行这个事件。

接下来继续取出第二个事件,再从线程池中分配一个线程去执行,然后第三个,第四个。主线程不断的检查事件队列中是否有未执行的事件,直到事件队列中所有事件都执行完了。

此后每当有新的事件加入到事件队列中,都会通知主线程按顺序取出交EventLoop处理。当有事件执行完毕后,会通知主线程,主线程执行回调,线程归还给线程池。

4、主线程不断重复上面的第三步。

总结:

我们所看到的node.js单线程只是一个js主线程,本质上的异步操作还是由线程池完成的,node将所有的阻塞操作都交给了内部的线程池去实现,本身只负责不断的往返调度,并没有进行真正的I/O操作,从而实现异步非阻塞I/O,这便是node单线程和事件驱动的精髓之处了。

nodejs可以用eventproxy、async.mapLimit、async.queue控制并发:

1.用eventproxy实现控制并发

var EventProxy = require('eventproxy');

const most = 5;//并发数5
var urllist = [....];//待抓取url列表,100个

function foo(start){
    var ep = new EventProxy();
    ep.after('ok',most,function(){
        foo(start+most);//一个批次任务完成,递归进行下一批任务
    });
    var q=0;
    for(var i=start;i<urllist.length;i++){
        if(q>=most){
            break;//最多添加most个任务
        }
        http.get(urllist[i],function(res){
            //....
            res.on('end',function(){
                ep.emit('ok');//一个任务完成,触发一次ok事件
            });
        });
        q++;
    }
}
foo(0);

2.使用 async.mapLimit 控制并发
var async = require('async');

//模拟一组连接地址
var urls = [];
for(var i = 0; i < 30; i++) {
    urls.push('http://datasource_' + i);
}
console.log(urls);

// 并发连接数的计数器
var concurrencyCount = 0;

// 并发抓取数据的过程
var fetchUrl = function (url, callback) {
    // delay 的值在 2000 以内,是个随机的整数
    var delay = parseInt((Math.random() * 10000000) % 2000, 10);
    concurrencyCount++;
    console.log('现在的并发数是', concurrencyCount, ',正在抓取的是', url, ',耗时' + delay + '毫秒');
    setTimeout(function () {
        concurrencyCount--;
        //抓取成功,调用回调函数
        callback(null, url + ' html content');
    }, delay);
};

//使用 async.mapLimit 来 5 个并发抓取,并获取结果
async.mapLimit(urls, 5, function (url, callback) {
    fetchUrl(url, callback);
}, function (err, result) {
    //所有连接抓取成功,返回回调结果列表
    console.log('final:');
    console.log(result);
});

3.使用async.queue 控制并发

"use strict"
var http = require('http');
var cheerio = require('cheerio');
var URL = require('url');
var path = require('path');
var fs = require('fs');
var async = require('async');

var baseUrl = "http://cnodejs.org/";
var targetUrl = "http://cnodejs.org/";
var stime = new Date();

function sGet(url,callback){
  var chunks = [];
  http.get(url,(res)=>{
    if (res.statusCode != '200') {
      callback({message:"抓取失败,状态码:"+res.statusCode,url:url});
      return;
    }
    res.on('data',(chunk)=>{
      chunks.push(chunk);
    });
    res.on('end',()=>{
      callback(null,Buffer.concat(chunks).toString());
    });
  }).on('error',(e)=>{
    callback({message:"抓取失败",url:url,err:e});
  });
}

sGet(targetUrl,(err,data)=>{
  if (err) {
    console.log(err);
    return false;
  }
  var $ = cheerio.load(data);
  var anchors = $("#topic_list a.topic_title");
  console.log('共'+anchors.length+'个任务');

  const most=5;//并发数
    //创建队列并指定并发数
  var q=async.queue(function(url,callback){
    var filename = path.basename(url)+'.txt';
    sGet(url, (err, data)=> {
      if (err) {
        callback(err);
        return false;
      }
      fs.writeFile('./html/' + filename, data, function (err) {
        if (err) {
          throw err;
        }
        callback(null,filename);
      });
    });
  },most);

  q.drain = function() {
    console.log('任务全部完成,共耗时:'+(new Date()-stime)+'ms');
  }

  anchors.each(function(){
    var url = URL.resolve(baseUrl,$(this).attr('href'));
    q.push(url,function(err,filename){
      if (err) {
        console.log(err);
        return;
      }
      console.log("finished:"+filename);
    });
  });
});

以上就是nodejs如何高效处理并发任务?的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » Node.js答疑