python并发——进程间同步和通信(二)

时间:2020-04-09
本文章向大家介绍python并发——进程间同步和通信(二),主要包括python并发——进程间同步和通信(二)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

转自:https://github.com/chaseSpace/IPC-Inter-Process-Communication

六种进程间通信方式的Python3.6代码实现

  1. msg_queue (消息队列)
  2. pipeline for single duplex (单工管道)
  3. pipeline for half duplex (半双工管道)
  4. name pipeline (命名管道)
  5. share memory (共享内存)
  6. semaphore (信号量)
#消息队列
from
multiprocessing import Process from multiprocessing import Queue from time import sleep def write(q): for i in range(5): # sleep(1) print('put %s to queue..' % i) q.put(i) def read(q): while 1: sleep(0.5) v = q.get(True) print('get %s from queue..' % v) if __name__ == '__main__': q = Queue() p1 = Process(target=write, args=(q,)) p2 = Process(target=read, args=(q,)) p1.start() p2.start() p1.join() # 等待p1进程跑完后再往下执行 while not q.empty():# 队列不为空时阻塞在这 sleep(1) p2.terminate() # 结束p2进程
'''
#标准库提供的管道是普通管道,单工的,读写方向固定
特点:只允许具有亲缘关系的两个进程通信
'''
import os, sys

print("The child will write text to a pipe and ")
print("the parent will read the text written by child...")

# file descriptors r, w for reading and writing
r, w = os.pipe()

processid = os.fork() #fork 方法仅能在linux系统上运行,跨平台就用multiprocess

print(processid)

if processid:
    # This is the paunameent process
    # Closes file descriptor w
    os.close(w)
    r = os.fdopen(r)
    print("Parent reading")
    str = r.read()
    print("text =", str)
    sys.exit(0)
else:
    # This is the child process
    os.close(r)
    w = os.fdopen(w, 'w')
    print("Child writing")
    w.write("Text written by child...")
    w.close()
    print("Child closing")
    sys.exit(0)
'''
使用多进程中的管道,它是半双工的,读写方向不固定
特点:只允许具有亲缘关系的两个进程通信
'''
# 参考 http://www.cnblogs.com/konglinqingfeng/p/9696484.html
from multiprocessing import Pipe, Process

def func(conn1,conn2):
    conn2.close() #子进程只需使用connection1,故关闭connection2
    while True:
        try:
            msg = conn1.recv() #无数据则阻塞
            print(msg)
        except EOFError:  #对端关闭后,就无法继续接收了
            print(1111)
            conn1.close()
            break

if __name__ == '__main__':
    conn1,conn2 = Pipe()#建立一个管道,管道返回两个connection,
    p = Process(target=func, args=(conn1,conn2))
    p.daemon = True  #子进程必须和主进程一同退出,防止僵尸进程
    p.start()

    conn1.close() #主进程只需要一个connection,故关闭一个
    for i in range(20):
        conn2.send('吃了吗') #主进程发送
    conn2.close()   #主进程关闭connection2
'''
使用FIFO(命名管道)来实现任意两个进程间的通信
特点:
    1. 和无名管道一样,半双工
    2. 每个FIFO管道都与一个路径名相关联,类似一个文件
    3. 进程间通过读写FIFO来通信
**注意:这种方式对子进程的生命周期管理并不方便,不建议在python中使用**
'''
import os, time
pipe_name = 'pipe_test'

def child():
    #子进程负责写
    pipeout = os.open(pipe_name, os.O_WRONLY)  # 必须以只写模式
    counter = 0
    while counter < 10: #不要写死循环,控制不好就是僵尸进程
        print('write Number %03d\n' % counter)
        os.write(pipeout, b'Number %03d\n' % counter)   #这里是非阻塞的
        counter +=1
        time.sleep(1)

def parent():
    #父进程负责读
    # pipein = open(pipe_name, 'r') #也可以用内置函数open,和操作文件一样
    fd = os.open(pipe_name, os.O_RDONLY)  # 必须以只读模式,获取一个file descriptor
    #fd是一个非负整数,只在单个进程中有意义,单个进程中一个fd对应一个待操作文件。
    while True:
        # line = pipein.readlines()[:-1]
        line = os.read(fd, 20)  # 每次读20个字节,读出后FIFO管道中会马上清除数据
        #open和os.read都是非阻塞的
        if line:
            print('Parent %d got "%s" at %s' % (os.getpid(), line, time.time()))
        else:
            print('Parent %d no data'% os.getpid())
        time.sleep(1)

if not os.path.exists(pipe_name):
    os.mkfifo(pipe_name)  #创建FIFO管道

pid = os.fork()  #创建子进程
if pid:
    parent()
else:
    child()

'''
这种模式允许多个进程(>=2)间通信
'''
'''
进程间通信之---信号量
特点:不传输数据,用于多进程的同步
参考: https://blog.csdn.net/sinat_27864123/article/details/78490164
'''

import multiprocessing
import time

def consumer(s):
    s.acquire() #信号量的使用与互斥量类似
    print(multiprocessing.current_process().name+' 正在执行')
    time.sleep(2) #执行时会发现同一时刻只有2个进程在执行
    s.release()
    print(multiprocessing.current_process().name+' release')


if __name__ == '__main__':
    s = multiprocessing.Semaphore(5) #把信号量值设置为2,一次提供给两个消费者服务
    for i in range(5): #启5个进程
        p = multiprocessing.Process(target=consumer,args=(s,))
        p.daemon = True #跟随主进程死亡,如果生产环境使用,最好加上这个,否则主进程结束时,子进程还在运行的话,就会造成孤儿进程(内存泄露)
        p.start()

    time.sleep(3) #等一下子进程
    print('main end')
'''
进程间通信之---共享内存
特点:
    1. 最快的IPC方式
    2. 因为多个进程可以同时操作,所以需要进行同步
    3. 可通过与信号量或锁结合使用实现多个进程间同步
共享方案:
    a. 消息队列
    b. 文件映射(为了在进程间共享内存,内核专门留有一个内存区,主进程将需要共享的数据映射到这块内存中,其他进程访问这个共享内存)
'''

from multiprocessing import Process,Queue,Semaphore,current_process
from time import sleep
my_semaphore = Semaphore(2)

'''
错误的示例!
'''
global_list = list(range(5)) #进程间不共享内存,所以每个进程访问的是这个对象的副本,并不是同一个对象

def change_global_list(global_list,s):
    s.acquire()
    print('process_id:%s before change'% current_process().name, global_list)
    global_list.pop()
    sleep(1)
    s.release()
    print('process_id:%s after change'% current_process().name,global_list)


if __name__ == '__main__':
    from multiprocessing import Manager, freeze_support
    manager = Manager()
    print(dir(manager))#'address', 'connect', 'dict', 'get_server', 'join', 'list', 'register', 'shutdown', 'start'
    global_list = manager.list()

    global_list.extend(list(range(5)))

    for i in range(5):
        p = Process(target=change_global_list,args=(global_list,my_semaphore))
        p.daemon = True
        p.start()

    sleep(3)  # 等一下子进程
    print('over!')

原文地址:https://www.cnblogs.com/wangbin2188/p/12669124.html