浅谈 python multiprocessing(多进程)下如何共享变量

时间:2022-04-28
本文章向大家介绍浅谈 python multiprocessing(多进程)下如何共享变量,主要内容包括1、问题:、2、python 多进程共享变量的几种方式:、(2)Server process:、3、多进程的问题远不止这么多:数据的同步、4、最后的建议:、5、Refer:、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

1、问题:

群中有同学贴了如下一段代码,问为何 list 最后打印的是空值?

from multiprocessing import Process, Manager
import os

manager = Manager()
vip_list = []
#vip_list = manager.list()

def testFunc(cc):
    vip_list.append(cc)
    print 'process id:', os.getpid()

if __name__ == '__main__':
    threads = []

    for ll in range(10):
        t = Process(target=testFunc, args=(ll,))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print vip_list

其实如果你了解 python 的多线程模型,GIL 问题,然后了解多线程、多进程原理,上述问题不难回答,不过如果你不知道也没关系,跑一下上面的代码你就知道是什么问题了。

python aa.py
process id: 632
process id: 635
process id: 637
process id: 633
process id: 636
process id: 634
process id: 639
process id: 638
process id: 641
process id: 640
------------------------
process id: 619
[]

将第 6 行注释开启,你会看到如下结果:

process id: 32074
process id: 32073
process id: 32072
process id: 32078
process id: 32076
process id: 32071
process id: 32077
process id: 32079
process id: 32075
process id: 32080
------------------------
process id: 32066
[3, 2, 1, 7, 5, 0, 6, 8, 4, 9]

2、python 多进程共享变量的几种方式:

(1)Shared memory:

Data can be stored in a shared memory map using Value or Array. For example, the following code

http://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

结果:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

(2)Server process:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array. 代码见开头的例子。

http://docs.python.org/2/library/multiprocessing.html#managers

3、多进程的问题远不止这么多:数据的同步

看段简单的代码:一个简单的计数器:

from multiprocessing import Process, Manager
import os

manager = Manager()
sum = manager.Value('tmp', 0)

def testFunc(cc):
    sum.value += cc

if __name__ == '__main__':
    threads = []

    for ll in range(100):
        t = Process(target=testFunc, args=(1,))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print sum.value

结果:

------------------------
process id: 17378
97

也许你会问:WTF?其实这个问题在多线程时代就存在了,只是在多进程时代又杯具重演了而已:Lock!

from multiprocessing import Process, Manager, Lock
import os

lock = Lock()
manager = Manager()
sum = manager.Value('tmp', 0)


def testFunc(cc, lock):
    with lock:
        sum.value += cc


if __name__ == '__main__':
    threads = []

    for ll in range(100):
        t = Process(target=testFunc, args=(1, lock))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print sum.value

这段代码性能如何呢?跑跑看,或者加大循环次数试一下。。。

再来看个多进程共享变量的例子:该脚本可以在集群中批量执行任意命令并返回结果。

#!/usr/bin/env python
# coding=utf-8
import sys

reload(sys)
sys.setdefaultencoding('utf-8')

import rpyc
from pyUtil import *
from multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

hostDict = {
    '192.168.1.10': 11111
}

manager = Manager()
localResultDict = manager.dict()


def rpc_client(host_port_cmd):
    host = host_port_cmd[0]
    port = host_port_cmd[1]
    cmd = host_port_cmd[2]
    c = rpyc.connect(host, port)
    result = c.root.exposed_execCmd(cmd)
    localResultDict[host] = result
    c.close()


def exec_cmd(cmd_str):
    host_port_list = []
    for (host, port) in hostDict.items():
        host_port_list.append((host, port, cmd_str))

    pool = ProcessPool(len(hostDict))
    results = pool.map(rpc_client, host_port_list)
    pool.close()
    pool.join()
    for ip in localResultDict.keys():
        print ip + ":t" + localResultDict[ip]

if __name__ == "__main__":

    if len(sys.argv) == 2 and sys.argv[1] != "-h":
        print "======================"
        print "    Your command is:t" + sys.argv[1]
        print "======================"
        cmd_str = sys.argv[1]
    else:
        print "Cmd Error!"
        sys.exit(1)

    exec_cmd(cmd_str)

需要注意的是 manager.dict() 在遍历时一定要使用 .keys() 方法,否则会抛异常:

Traceback (most recent call last):
  File "test.py", line 83, in <module>
    exec_cmd(cmd_str)
  File "test.py", line 57, in exec_cmd
    for ip in localResultDict:
  File "<string>", line 2, in __getitem__
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/managers.py", line 774, in _callmethod
    raise convert_to_error(kind, result)
KeyError: 0

4、最后的建议:

Note that usually sharing data between processes may not be the best choice, because of all the synchronization issues; an approach involving actors exchanging messages is usually seen as a better choice. See also Python documentation: As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes. However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

5、Refer:

[0] 理解Python并发编程一篇就够了 - 线程篇

http://www.dongwm.com/archives/%E4%BD%BF%E7%94%A8Python%E8%BF%9B%E8%A1%8C%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B-%E7%BA%BF%E7%A8%8B%E7%AF%87/

[1] multiprocessing — Process-based “threading” interface

https://docs.python.org/2/library/multiprocessing.html

[2] Manager()出错,不知道为什么

http://m.newsmth.net/article/Python/100915

[3] Can't iterate over multiprocessing.managers.DictProxy

http://bugs.python.org/issue9733