基于进程的并发

时间:2019-09-23
本文章向大家介绍基于进程的并发,主要包括基于进程的并发使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
基于进程的并发

一、什么是进程

进程指的是应用的执行实例,比如,双击桌面上的Internet浏览器图标就会开启一个运行该浏览器的进程。

二、进程与程序的区别

程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。

进程一般由程序、数据集、进程控制块三部分组成。

举个例子:一位厨师正在做蛋糕,他有做蛋糕的食谱和所需的原料,做到一半的时候这位厨师的儿子玩耍受伤了,于是这位厨师放下手头工作并记录他按照食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理伤口。当伤口处理完之后,这位厨师又回来做蛋糕,从他离开时的那一步继续做下去。

在这个例子中:做蛋糕的食谱相当于程序,厨师就是处理器cpu,而做蛋糕的所需原料就是数据集,进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。

三、并发与并行

无论是并行还是并发,在用户看来都是同时运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务。

并发是指系统具有处理多个任务(动作)的能力,是伪并行,即看起来是用时运行。单个cpu+多道技术就可以实现并发,其实并行也属于并发。

并行是指系统具有同时处理多个任务(动作)的能力,是同时运行,只有具备多个cpu才能实现并行。

单核下,可以利用多道技术。多个核中的每个核也都可以利用多道技术。

有四个核,六个任务,同一时间被执行只有四个被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4, 一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术,而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行。

多道技术概念:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行。

四、同步与异步

同步就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。

异步就是当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。

简单的来讲,同步就是当进程执行到一个IO(等待外部数据)的时候,如果等,那么就是同步。如果不等,就是异步。

同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。

五、阻塞和非阻塞

阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。

非阻塞指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程。

六、进程的状态

七、创建进程

创建进程需要用到multiprocessing模块下Process类。该类有四个参数,下面我们来进行参数介绍:

#第一个参数 group:线程组
#第二个参数 target:要执行的方法
#第三个参数 name:进程名
#第四个参数 args/kwargs:要传人方法的参数

该类的实例方法介绍:

#is_alive():返回进程是否在运行
#join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout
#start():进程准备就绪,等待cpu调度
#run():start()调用run方法,如果实例进程时未制定传入target,这start执行默认run()方法
#terminate():不管任务是否完成,立即停止工程进程

该类的属性介绍:

#daemon:和线程的setDaemon功能一样
#name:进程名字
#pid:进程号

我们要注意一点:在Windows中process()必须放在if __name__ == '__main__':下。

创建多进程有两种方式,分别是:

from multiprocessing import Process
import time

def f(name):
    time.sleep(1)
    print('hello',name,time.ctime())

if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = Process(target=f,args=('alvin',))   #必须加,号
        p_list.append(p)
        p.start()
    for i in p_list:
        i.join()
    print('end...')
方式一
from multiprocessing import Process
import time

class MyProcess(Process):

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = MyProcess()
        p_list.append(p)
        p.start()
    for i in p_list:
        i.join()
    print('end...')
方式二

我们可以利用多进程把socket通信变成并发的形式:

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start进程一定要写到这下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
服务端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客户端

这里可以开多个客户端,每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。所以我们必须解决这个问题,解决方法:进程池。

下面我们来讲一下Process类的实例join方法。首先我们来看一段代码:

from multiprocessing import Process
import time

def f(name):
    time.sleep(1)
    print('hello',name,time.ctime())

if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = Process(target=f,args=('alvin',))
        p_list.append(p)
        p.start()
    # for i in p_list:
    #     i.join()
    print('end...')

可以看出上面代码有两行是注释的,我们要讲的知识就在这里。我们先来看看注释与不注释的运行结果:

#注释
end...
hello alvin Wed Sep 18 16:31:15 2019
hello alvin Wed Sep 18 16:31:15 2019
hello alvin Wed Sep 18 16:31:15 2019
#不注释
hello alvin Wed Sep 18 16:36:22 2019
hello alvin Wed Sep 18 16:36:22 2019
hello alvin Wed Sep 18 16:36:22 2019
end...

从运行结果可以看出一个是最先执行print('end...'),一个最后执行print('end...')。很明显join方法是让主线程等待p的结束,卡住的是主线程而绝非进程p。

八、守护进程

守护进程会在主进程代码执行结束后就终止,守护进程内无法再开启子进程,否则抛出异常。

下面我们创建守护进程:

from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(5)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':

    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True  #守护进程必须要在start之前设定
    p1.start()
    p2.start()
    print("main-------")
守护进程

打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止。

九、进程锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

from multiprocessing import Process,Lock

def f(l,i):
    l.acquire()
    print('hello world %s'%i)
    l.release()

if __name__ == '__main__':
    lock = Lock()  #加锁
    for num in range(10):
        Process(target=f,args=(lock,num)).start()

 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

十、队列

进程间通信方式一:队列(推荐),创建队列语句Queue(maxsize),其中参数maxsize是队列中运行最大项数,省略则无大小限制。

下面我们来介绍队列的一些主要方法:

q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
主要方法

简单的实现队列的代码:

from multiprocessing import Process,Queue
q=Queue(3)

q.put(3)
q.put(3)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了

下面我们来讲生产者消费者模型。那么什么是生产者消费者模型?

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

基于队列实现生产者消费者模型:

from multiprocessing import Process,Queue
import time,random,os

def producer(q):   #生产者
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

def consumer(q):   #消费者
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()

    p1=Process(target=producer,args=(q,))

    c1=Process(target=consumer,args=(q,))

    p1.start()
    c1.start()
    print('主进程')
生产者消费者模型

程序中有两类角色,一类负责生产数据即生产者,一类负责处理数据即消费者。

上述代码存在一个问题,就是生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。下面我们来实现解决方法:

from multiprocessing import Process,Queue
import time,random,os

def producer(q):     #生产者
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #发送结束信号

def consumer(q):    #消费者
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    p1=Process(target=producer,args=(q,))

    c1=Process(target=consumer,args=(q,))

    p1.start()
    c1.start()
    print('')
解决问题

我们这里利用让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

十一、管道

进程间通信方式二:管道(不推荐),创建管道语句Pipe(duplex),在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道,参数duplex默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

下面我们来介绍管道的一些主要方法:

conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

基于管道实现生产者消费者模型:

from multiprocessing import Process,Pipe
import time,os

def producer(seq,p):    #生产者
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
    else:
        left.close()

def consumer(p,name):   #消费者
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break

if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()

    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主进程')
管道

十二、进程池

使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

创建进程池的语句为Pool(numprocess,initializer,initargs)。其中参数numprocess表示要创建的进程数,如果省略,将默认使用cpu_count()的值。参数initializer表示是每个工作进程启动时要执行的可调用对象,默认为None。参数initargs表示是要传给initializer的参数组。

下面我们来介绍进程池的主要方法:

p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
   
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
主要方法

我们来应用进程池实现:

from multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':

    pool = Pool(5)
    for i in range(100):
        pool.apply(func=Foo,args=(i,))   #同步调用

    pool.close()
    pool.join()
    print('end')
同步调用apply
from multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':

    pool = Pool(5)
    for i in range(100):
        pool.apply_async(func=Foo,args=(i,))

    pool.close()
    pool.join()
    print('end')
异步调用apply_async

回调函数:就是某个动作或者函数执行成功后再去执行的函数。

下面来简单实现回调函数:

from multiprocessing import Process,Pool
import time,os

def Foo(i):
    time.sleep(1)
    print(i)
    return i+100

def Bar(arg):
    print('logger:',arg)

if __name__ == '__main__':

    pool = Pool(5)

    for i in range(100):
        pool.apply_async(func=Foo,args=(i,),callback=Bar)

    pool.close()  #必须放在join前
    pool.join()   
    print('end')
回调函数callback

原文地址:https://www.cnblogs.com/lzc69/p/11543977.html