使用concurrent.futures模块并发,实现进程池、线程池
一、关于concurrent.futures模块
Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。实现了对threading
和multiprocessing
的更高级的抽象,对编写线程池/进程池提供了直接的支持。
concurrent.futures基础模块是executor和future。
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future这个概念相信有java和nodejs下编程经验的朋友肯定不陌生了,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
Executor中定义了submit()
方法,这个方法的作用是提交一个可执行的回调task
,并返回一个future实例。future对象代表的就是给定的调用。
二、submit()
方法实现进程池/线程池
进程池
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is running' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
p=ProcessPoolExecutor() #不填则默认为cpu的个数
l=[]
start=time.time()
for i in range(10):
obj=p.submit(task,i) #submit()方法返回的是一个future实例,要得到结果需要用obj.result()
l.append(obj)
p.shutdown() #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
print('='*30)
# print([obj for obj in l])
print([obj.result() for obj in l])
print(time.time()-start)
#上面方法也可写成下面的方法
# start = time.time()
# with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
# print(time.time() - start)
线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import threading
import os,time,random
def task(n):
print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
p=ThreadPoolExecutor() #不填则默认为cpu的个数*5
l=[]
start=time.time()
for i in range(10):
obj=p.submit(task,i)
l.append(obj)
p.shutdown()
print('='*30)
print([obj.result() for obj in l])
print(time.time()-start)
#上面方法也可写成下面的方法
# start = time.time()
# with ThreadPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
# print(time.time() - start)
默认为异步执行
#p.submit(task,i).result()即同步执行
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n):
print('%s is running' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
p=ProcessPoolExecutor()
start=time.time()
for i in range(10):
res=p.submit(task,i).result()
print(res)
print('='*30)
print(time.time()-start)
三、回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
response=requests.get(url)
time.sleep(2)
return {'url':url,'text':response.text}
def parse_page(res): #此处的res是一个p.submit获得的一个future对象,不是结果
res=res.result() #res.result()拿到的才是对应的结果
print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res='url:%s size:%sn' %(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page, url).add_done_callback(parse_page) #与之前的回调函数拿到的结果不同,这里拿到的是前面submit方法执行完后返回的对象,要.result才能拿到对应的结果
p.shutdown()
print('主',os.getpid())
四、map方法
和内置函数map差不多的用法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。
以下是通过concurrent.futures模块下类ThreadPoolExecutor和ProcessPoolExecutor实例化的对象的map方法实现进程池、线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def task(n):
print('%s is running' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
start = time.time()
obj=p.map(task,range(10))
p.shutdown()
print('='*30)
print(list(obj))
print(time.time() - start)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 【一起学系列】之适配器模式:还有外观模式呢
- 【翻译】200行代码讲透RUST FUTURES (5)
- Unable to preventDefault inside passive event listener
- js 停止事件冒泡 阻止浏览器的默认行为(阻止a标签跳转 )
- EmitMapper的使用小结
- js .map方法
- 【一起学系列】之模板方法:写SSO我只要5分钟
- ConcurrentDictionary线程不安全么,你难道没疑惑,你难道弄懂了么?
- 【一起学系列】之迭代器&组合:虽然有点用不上啦
- 移动端touch事件影响click事件以及在touchmove添加preventDefault导致页面无法滚动的解决方法
- 使用ActionFilterAttribute 记录 WebApi Action 请求和返回结果记录
- scipy.stats连续分布的基本操作
- InvocationHandler中invoke方法中的第一个参数proxy的用途
- height、offsetheight、clientheight、scrollheight、innerheight、outerheight
- mysql sql-mode 解析和设置