nodejs高效处理并发任务的原理:
1、每个Node.js进程只有一个主线程在执行程序代码,形成一个执行栈(execution context stack)。
2、主线程之外,还维护了一个"事件队列"(Event queue)。当用户的网络请求或者其它的异步操作到来时,node都会把它放到Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。
3、主线程代码执行完毕完成后,然后通过Event Loop,也就是事件循环机制,开始到Event Queue的开头取出第一个事件,从线程池中分配一个线程去执行这个事件。
接下来继续取出第二个事件,再从线程池中分配一个线程去执行,然后第三个,第四个。主线程不断的检查事件队列中是否有未执行的事件,直到事件队列中所有事件都执行完了。
此后每当有新的事件加入到事件队列中,都会通知主线程按顺序取出交EventLoop处理。当有事件执行完毕后,会通知主线程,主线程执行回调,线程归还给线程池。
4、主线程不断重复上面的第三步。
总结:
我们所看到的node.js单线程只是一个js主线程,本质上的异步操作还是由线程池完成的,node将所有的阻塞操作都交给了内部的线程池去实现,本身只负责不断的往返调度,并没有进行真正的I/O操作,从而实现异步非阻塞I/O,这便是node单线程和事件驱动的精髓之处了。
nodejs可以用eventproxy、async.mapLimit、async.queue控制并发:
1.用eventproxy实现控制并发 var EventProxy = require('eventproxy'); const most = 5;//并发数5 var urllist = [....];//待抓取url列表,100个 function foo(start){ var ep = new EventProxy(); ep.after('ok',most,function(){ foo(start+most);//一个批次任务完成,递归进行下一批任务 }); var q=0; for(var i=start;i<urllist.length;i++){ if(q>=most){ break;//最多添加most个任务 } http.get(urllist[i],function(res){ //.... res.on('end',function(){ ep.emit('ok');//一个任务完成,触发一次ok事件 }); }); q++; } } foo(0); 2.使用 async.mapLimit 控制并发 var async = require('async'); //模拟一组连接地址 var urls = []; for(var i = 0; i < 30; i++) { urls.push('http://datasource_' + i); } console.log(urls); // 并发连接数的计数器 var concurrencyCount = 0; // 并发抓取数据的过程 var fetchUrl = function (url, callback) { // delay 的值在 2000 以内,是个随机的整数 var delay = parseInt((Math.random() * 10000000) % 2000, 10); concurrencyCount++; console.log('现在的并发数是', concurrencyCount, ',正在抓取的是', url, ',耗时' + delay + '毫秒'); setTimeout(function () { concurrencyCount--; //抓取成功,调用回调函数 callback(null, url + ' html content'); }, delay); }; //使用 async.mapLimit 来 5 个并发抓取,并获取结果 async.mapLimit(urls, 5, function (url, callback) { fetchUrl(url, callback); }, function (err, result) { //所有连接抓取成功,返回回调结果列表 console.log('final:'); console.log(result); }); 3.使用async.queue 控制并发 "use strict" var http = require('http'); var cheerio = require('cheerio'); var URL = require('url'); var path = require('path'); var fs = require('fs'); var async = require('async'); var baseUrl = "http://cnodejs.org/"; var targetUrl = "http://cnodejs.org/"; var stime = new Date(); function sGet(url,callback){ var chunks = []; http.get(url,(res)=>{ if (res.statusCode != '200') { callback({message:"抓取失败,状态码:"+res.statusCode,url:url}); return; } res.on('data',(chunk)=>{ chunks.push(chunk); }); res.on('end',()=>{ callback(null,Buffer.concat(chunks).toString()); }); }).on('error',(e)=>{ callback({message:"抓取失败",url:url,err:e}); }); } sGet(targetUrl,(err,data)=>{ if (err) { console.log(err); return false; } var $ = cheerio.load(data); var anchors = $("#topic_list a.topic_title"); console.log('共'+anchors.length+'个任务'); const most=5;//并发数 //创建队列并指定并发数 var q=async.queue(function(url,callback){ var filename = path.basename(url)+'.txt'; sGet(url, (err, data)=> { if (err) { callback(err); return false; } fs.writeFile('./html/' + filename, data, function (err) { if (err) { throw err; } callback(null,filename); }); }); },most); q.drain = function() { console.log('任务全部完成,共耗时:'+(new Date()-stime)+'ms'); } anchors.each(function(){ var url = URL.resolve(baseUrl,$(this).attr('href')); q.push(url,function(err,filename){ if (err) { console.log(err); return; } console.log("finished:"+filename); }); }); });
以上就是nodejs如何高效处理并发任务?的详细内容,更多请关注0133技术站其它相关文章!