Python3 与 C# 并发编程之~ 进程实战篇
1.6.进程间状态共享
应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:
1.共享内存( Value
or Array
)
之前说过 Queue
:在 Process
之间使用没问题,用到 Pool
,就使用 Manager().xxx
, Value
和 Array
,就不太一样了:
看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.pyclass Value(object): def __init__(self, typecode, value, lock=True): self._typecode = typecode self._value = value def get(self): return self._value def set(self, value): self._value = value def __repr__(self): return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value) value = property(get, set) # 给value设置get和set方法(和value的属性装饰器一样效果)def Array(typecode, sequence, lock=True): return array.array(typecode, sequence)
以 Process
为例看看怎么用:
from multiprocessing import Process, Value, Arraydef proc_test1(value, array): print("子进程1", value.value) array[0] = 10 print("子进程1", array[:])def proc_test2(value, array): print("子进程2", value.value) array[1] = 10 print("子进程2", array[:])def main(): try: value = Value("d", 3.14) # d 类型,相当于C里面的double array = Array("i", range(10)) # i 类型,相当于C里面的int print(type(value)) print(type(array)) p1 = Process(target=proc_test1, args=(value, array)) p2 = Process(target=proc_test2, args=(value, array)) p1.start() p2.start() p1.join() p2.join() print("父进程", value.value) # 获取值 print("父进程", array[:]) # 获取值 except Exception as ex: print(ex) else: print("No Except")if __name__ == '__main__': main()
输出:( Value
和 Array
是 进程|线程
安全的)
<class 'multiprocessing.sharedctypes.Synchronized'><class 'multiprocessing.sharedctypes.SynchronizedArray'>子进程1 3.14子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]子进程2 3.14子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]父进程 3.14父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]No Except
类型方面的对应关系:
typecode_to_type = { 'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong, 'f': ctypes.c_float, 'd': ctypes.c_double}
这两个类型其实是 ctypes
类型,更多的类型可以去 multiprocessing.sharedctypes
查看,来张图:
回头解决 GIL
的时候会用到 C
系列或者 Go
系列的共享库(讲线程的时候会说)
关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下 i+=1
并不是原子性操作:
from multiprocessing import Process, Valuedef proc_test1(value): for i in range(1000): value.value += 1def main(): value = Value("i", 0) p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)] # 批量启动 for i in p_list: i.start() # 批量资源回收 for i in p_list: i.join() print(value.value)if __name__ == '__main__': main()
输出:(理论上应该是:5×1000=5000)
2153
稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了)
# 通用方法def proc_test1(value): for i in range(1000): if value.acquire(): value.value += 1 value.release()# 官方案例:(Lock可以使用with托管)def proc_test1(value): for i in range(1000): with value.get_lock(): value.value += 1# 更多可以查看:`sharedctypes.SynchronizedBase` 源码
输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)
5000
看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了 Value
)
def Value(typecode_or_type, *args, lock=True, ctx=None): '''返回Value的同步包装器''' obj = RawValue(typecode_or_type, *args) if lock is False: return obj # 默认支持Lock if lock in (True, None): ctx = ctx or get_context() # 获取上下文 lock = ctx.RLock() # 获取递归锁 if not hasattr(lock, 'acquire'): raise AttributeError("%r has no method 'acquire'" % lock) # 一系列处理 return synchronized(obj, lock, ctx=ctx)def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None): '''返回RawArray的同步包装器''' obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj # 默认是支持Lock的 if lock in (True, None): ctx = ctx or get_context() # 获取上下文 lock = ctx.RLock() # 递归锁属性 # 查看是否有acquire属性 if not hasattr(lock, 'acquire'): raise AttributeError("%r has no method 'acquire'" % lock) return synchronized(obj, lock, ctx=ctx)
扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807
2.服务器进程( Manager
)
官方文档:https://docs.python.org/3/library/multiprocessing.html#managers
有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象
通过返回的经理 Manager()
将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue
举个简单例子(后面还会再说):(本质其实就是 多个进程通过代理,共同操作服务端内容
)
from multiprocessing import Pool, Managerdef test1(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse()def test2(d, l): print(d) print(l)def main(): with Manager() as manager: dict_test = manager.dict() list_test = manager.list(range(10)) pool = Pool() pool.apply_async(test1, args=(dict_test, list_test)) pool.apply_async(test2, args=(dict_test, list_test)) pool.close() pool.join()if __name__ == '__main__': main()
输出:
{1: '1', '2': 2, 0.25: None}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了 “中介”
)
同步问题依然需要注意一下,举个例子体会一下:
from multiprocessing import Manager, Process, Lockdef test(dict1, lock): for i in range(100): with lock: # 你可以把这句话注释掉,然后就知道为什么加了 dict1["year"] += 1def main(): with Manager() as m: lock = Lock() dict1 = m.dict({"year": 2000}) p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(dict1)if __name__ == '__main__': main()
扩展补充:
-
multiprocessing.Lock
是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。 - 大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用
Manager
- 多进程模式的缺点是创建进程的代价大,在
Unix/Linux
系统下,用fork
调用还行,在Windows
下创建进程开销巨大。
Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers
WinServer
的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)
扩展补充
还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:
- 使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。
- 使用初始化程序在池创建时传递常规
multiprocessing.Queue()
这将使Queue
实例在所有子进程中全局共享
再看一下Pool的 __init__
方法:
# processes:进程数# initializer,initargs 初始化进行的操作# maxtaskperchild:每个进程执行task的最大数目# contex:上下文对象def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None):
第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)
import osimport timefrom multiprocessing import Pool, Queuedef error_callback(msg): print(msg)def pro_test1(): print("[子进程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) q.put("[子进程1]小明,今晚撸串不?") # 设置一个简版的重试机制(三次重试) for i in range(3): if not q.empty(): print(q.get()) break else: time.sleep((i + 1) * 2) # 第一次1s,第二次4s,第三次6sdef pro_test2(): print("[子进程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) print(q.get()) time.sleep(4) # 模拟一下网络延迟 q.put("[子进程2]不去,我今天约了妹子")def init(queue): global q q = queuedef main(): print("[父进程]PPID=%d,PID=%d" % (os.getppid(), os.getpid())) queue = Queue() p = Pool(initializer=init, initargs=(queue, )) p.apply_async(pro_test1, error_callback=error_callback) p.apply_async(pro_test2, error_callback=error_callback) p.close() p.join()if __name__ == '__main__': main()
输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数: alizer=init,initargs=(queue,))
)
[父进程]PPID=13157,PID=24864[子进程1]PPID=24864,PID=24865[子进程2]PPID=24864,PID=24866[子进程1]小明,今晚撸串不?[子进程2]不去,我今天约了妹子real 0m6.105suser 0m0.071ssys 0m0.042s
Win下亦通用(win下没有 os.getgid
)
1.7.分布式进程的案例
有了 1.6
的基础,咱们来个例子练练:
BaseManager
的缩略图:
服务器端代码:
from multiprocessing import Queuefrom multiprocessing.managers import BaseManagerdef main(): # 用来身份验证的 key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" get_zhang_queue = Queue() # 小张消息队列 get_ming_queue = Queue() # 小明消息队列 # 把Queue注册到网络上, callable参数关联了Queue对象 BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue) BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue) # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 manager = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 运行serve manager.get_server().serve_forever()if __name__ == '__main__': main()
客户端代码1:
from multiprocessing.managers import BaseManagerdef main(): """客户端1""" key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" # 注册对应方法的名字(从网络上获取Queue) BaseManager.register("get_ming_queue") BaseManager.register("get_zhang_queue") # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 m = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 连接到服务器 m.connect() q1 = m.get_zhang_queue() # 在自己队列里面留言 q1.put("[小张]小明,老大明天是不是去外地办事啊?") q2 = m.get_ming_queue() # 获取小明说的话 print(q2.get())if __name__ == '__main__': main()
客户端代码2:
from multiprocessing.managers import BaseManagerdef main(): """客户端2""" key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92" # 注册对应方法的名字(从网络上获取Queue) BaseManager.register("get_ming_queue") BaseManager.register("get_zhang_queue") # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥 m = BaseManager(address=("192.168.36.235", 5438), authkey=key) # 连接到服务器 m.connect() q1 = m.get_zhang_queue() # 获取小张说的话 print(q1.get()) q2 = m.get_ming_queue() # 在自己队列里面留言 q2.put("[小明]这几天咱们终于可以不加班了(>_<)")if __name__ == '__main__': main()
输出图示:
服务器运行在Linux的测试:
其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下
参考文章:
进程共享的探讨:python-sharing-a-lock-between-processes
多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error
JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html
Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html
有深度但需要辩证看的两篇文章:
跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue
关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process
- 如何实现两台服务器间无密码的传输数据和操作
- 一步到位Linux中安装配置MySQL及补坑
- 我是如何处理大并发量订单处理的 KafKa部署总结
- 一步到位分布式开发Zookeeper实现集群管理
- 备胎的养成记KeepAlived实现热备负载
- 0基础搭建Hadoop大数据处理-初识
- 入坑系列之HAProxy负载均衡
- 如何开发自己的搜索帝国之Elasticsearch
- NET中解决KafKa多线程发送多主题的问题
- mysql数据与Hadoop之间导入导出之Sqoop实例
- 如何将mysql数据导入Hadoop之Sqoop安装
- 常见的几种Flume日志收集场景实战
- 教你一步搭建Flume分布式日志系统
- 几十条业务线日志系统如何收集处理?
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 万字图解Java多线程
- 金九银十要来了?不要慌,这些Android BAT高级面试题刷一刷
- 【SpringBoot DB 系列】Jooq 初体验
- Android轻量级APM性能监测方案
- 保持 Go 模块兼容
- Go 模块:v2 及更高版本
- 发布 Go Modules
- SRA toolkit下载数据
- 【测试开发-1】基于Springboot+layui实现接口自动化平台
- 【SpringBoot-2】SLF4J+logback进行日志记录
- 【JMeter-3】JMeter参数化4种实现方式
- 【JMeter-1】JMeter安装与接口测试入门
- 【JMeter-2】JMeter接口测试之断言实现
- 【UI自动化-1】UI自动化环境搭建与简单示例
- 【UI自动化-2】UI自动化元素定位专题