Python自定义进程池实例分析【生产者、消费者模型问题】

这篇文章主要介绍了Python自定义进程池,结合实例分析了Python使用自定义进程池实现的生产者、消费者模型问题,需要的朋友可以参考下

本文实例分析了Python自定义进程池。分享给大家供大家参考,具体如下:

代码说明一切:

 #encoding=utf-8 #author: walker #date: 2014-05-21 #function: 自定义进程池遍历目录下文件 from multiprocessing import Process, Queue, Lock import time, os #消费者 class Consumer(Process): def __init__(self, queue, ioLock): super(Consumer, self).__init__() self.queue = queue self.ioLock = ioLock def run(self): while True: task = self.queue.get()  #队列中无任务时,会阻塞进程 if isinstance(task, str) and task == 'quit': break; time.sleep(1)  #假定任务处理需要1秒钟 self.ioLock.acquire() print( str(os.getpid()) + ' ' + task) self.ioLock.release() self.ioLock.acquire() print 'Bye-bye' self.ioLock.release() #生产者 def Producer(): queue = Queue()  #这个队列是进程/线程安全的 ioLock = Lock() subNum = 4  #子进程数量 workers = build_worker_pool(queue, ioLock, subNum) start_time = time.time() for parent, dirnames, filenames in os.walk(r'D:\test'): for filename in filenames: queue.put(filename) ioLock.acquire() print('qsize:' + str(queue.qsize())) ioLock.release() while queue.qsize() > subNum * 10: #控制队列中任务数量 time.sleep(1) for worker in workers: queue.put('quit') for worker in workers: worker.join() ioLock.acquire() print('Done! Time taken: {}'.format(time.time() - start_time)) ioLock.release() #创建进程池 def build_worker_pool(queue, ioLock, size): workers = [] for _ in range(size): worker = Consumer(queue, ioLock) worker.start() workers.append(worker) return workers if __name__ == '__main__': Producer() 

ps:

 self.ioLock.acquire() ... self.ioLock.release() 

可用:

 with self.ioLock: ... 

替代。

再来一个好玩的例子:

 #encoding=utf-8 #author: walker #date: 2016-01-06 #function: 一个多进程的好玩例子 import os, sys, time from multiprocessing import Pool cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) g_List = ['a'] #修改全局变量g_List def ModifyDict_1(): global g_List g_List.append('b') #修改全局变量g_List def ModifyDict_2(): global g_List g_List.append('c') #处理一个 def ProcOne(num): print('ProcOne ' + str(num) + ', g_List:' + repr(g_List)) #处理所有 def ProcAll(): pool = Pool(processes = 4) for i in range(1, 20): #ProcOne(i) #pool.apply(ProcOne, (i,)) pool.apply_async(ProcOne, (i,)) pool.close() pool.join() ModifyDict_1() #修改全局变量g_List if __name__ == '__main__': ModifyDict_2() #修改全局变量g_List print('In main g_List :' + repr(g_List)) ProcAll() 

Windows7 下运行的结果:

 λ python3 demo.py In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b'] ProcOne 2, g_List:['a', 'b'] ProcOne 3, g_List:['a', 'b'] ProcOne 4, g_List:['a', 'b'] ProcOne 5, g_List:['a', 'b'] ProcOne 6, g_List:['a', 'b'] ProcOne 7, g_List:['a', 'b'] ProcOne 8, g_List:['a', 'b'] ProcOne 9, g_List:['a', 'b'] ProcOne 10, g_List:['a', 'b'] ProcOne 11, g_List:['a', 'b'] ProcOne 12, g_List:['a', 'b'] ProcOne 13, g_List:['a', 'b'] ProcOne 14, g_List:['a', 'b'] ProcOne 15, g_List:['a', 'b'] ProcOne 16, g_List:['a', 'b'] ProcOne 17, g_List:['a', 'b'] ProcOne 18, g_List:['a', 'b'] ProcOne 19, g_List:['a', 'b'] 

Ubuntu 14.04下运行的结果:

 In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b', 'c'] ProcOne 2, g_List:['a', 'b', 'c'] ProcOne 3, g_List:['a', 'b', 'c'] ProcOne 5, g_List:['a', 'b', 'c'] ProcOne 4, g_List:['a', 'b', 'c'] ProcOne 8, g_List:['a', 'b', 'c'] ProcOne 9, g_List:['a', 'b', 'c'] ProcOne 7, g_List:['a', 'b', 'c'] ProcOne 11, g_List:['a', 'b', 'c'] ProcOne 6, g_List:['a', 'b', 'c'] ProcOne 12, g_List:['a', 'b', 'c'] ProcOne 13, g_List:['a', 'b', 'c'] ProcOne 10, g_List:['a', 'b', 'c'] ProcOne 14, g_List:['a', 'b', 'c'] ProcOne 15, g_List:['a', 'b', 'c'] ProcOne 16, g_List:['a', 'b', 'c'] ProcOne 17, g_List:['a', 'b', 'c'] ProcOne 18, g_List:['a', 'b', 'c'] ProcOne 19, g_List:['a', 'b', 'c'] 

可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python URL操作技巧总结》、《Python图片操作技巧总结》、《Python数据结构与算法教程》、《Python Socket编程技巧总结》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总

希望本文所述对大家Python程序设计有所帮助。

以上就是Python自定义进程池实例分析【生产者、消费者模型问题】的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » python