并发编程知识内容汇总

时间:2019-10-25
本文章向大家介绍并发编程知识内容汇总,主要包括并发编程知识内容汇总使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

并发编程知识总结

关于操作系统的发展史

多道技术(基于单核情况下研究)

空间上的复用:一CPU可以提供给多个用户使用。

时间上的复用:切换 + 保持状态。

若CPU遇到IO操作,会立即将当前执行程序CPU使用权断开。

若一个程序使用CPU的时间长,会立即将当前执行程序CPU使用权断开。

并发

看起来像同时运行,是通过切换 + 保持状态进程切换。运行也是在一个CPU中运行

并行

真正的同时运行,在多核CPU的情况下,同时执行多个程序。

进程

进程的产生必须是通过父进程的调用。

进程的三种状态

就绪态

运行态

阻塞态

进程的调度

现代操作系统调度:时间片轮转法 + 分级反馈队列

时间片轮转法:将时间切分成时间片,然后让程序在规定的时间内使用,如果时间到了,程序还没有执行完,就返回到就绪态排队,如果执行完了,就直接结束此进程。如果遇到阻塞,会直接进入阻塞态,让后边的程序先执行,当程序结束阻塞态的时候,程序会进入就绪态进行排队。

分级反馈队列:将进程分等级,首先执行的程序放在进程的第一级,按重要性一次向后排。

进程

同步异步,阻塞非阻塞

同步:一个程序结束才能执行另外一个程序

异步:两个程序可以同时执行

阻塞:阻塞态

非阻塞:就绪态,运行态

创建进程的两种方式

from multiprocessing import Process
第一种:
def f(name):
    pass

if __name__ == '__main__':
    p = Process(target=f,args=('wang',))
    p.start()

第二种:
class MyProcess(Process):
    def __init__(self,name):
        self.name = name

    def run(self):    # 通过类来定义的时候,必须写run()
        pass
    
if __name__ == '__main__':
    p = MyProcess('wang')    # 调用类实例化成一个对象必须要写在main下,不然会报错
    p.start()

进程的执行顺序

进程的启动顺序:

1.先执行主进程中的任务

2.若主进程中没有需要等待的任务,会执行子进程中的任务,若子进程有多个,会随机执行

3.若主进程中有需要等待执行的任务,时间很短会直接执行,如果等待时间很长,会先执行子进程中的任务,

4.若多个子进程中都有需要等待的任务,会再一个进程进入阻塞区的时候执行另外一个进程。

简单的说:进程的执行就是先主再子,之后的执行是按照执行时间的长短及由先度来进行执行的。

from multiprocessing import Process
from multiprocessing import current_process
import os
import time
def f(name):
    print('子进程开始')
    time.sleep(3)
    print('子进程已结束')

def m(name):
    print('第二个子进程开始')
    time.sleep(3)
    print('第二个子进程已结束')

if __name__ == '__main__':
    p = Process(target=f,args=('wang',))
    p1 = Process(target=m,args=('wang',))
    # p.daemon = True
    p.start()
    p1.start()
    print('主进程开启')
    time.sleep(1)
    print('主进程二号')
    
# 输出结果:
主进程开启
子进程开始
第二个子进程开始
主进程二号
子进程已结束
第二个子进程已结束

join

告诉操作系统,让子进程结束后父进程再结束。一定要写在 p.start() 下面

进程间的数据是相互隔离的

主进程与子进程会产生各自的名称空间,创建一个子进程就会产生一个子进程的名称空间,与主进程不在同一个名称空间内,而且产生不同的子进程也不会在同一个名称空间内,所以数据是没有办法仅从相互调用的。

from multiprocessing import Process
x = 200       # 如果这里的x注释掉,主进程中的x打印会报错
def f(name):
    x = 100
    print(x,1111)

if __name__ == '__main__':
    p = Process(target=f,args=('wang',))
    p.start()
    print(x,2222)     # 如果上面的x=200注释掉,这里的打印会报错
    
# 输出结果:
200 2222
100 1111

进程对象的属性

from multiprocessing import Process
from multiprocessing import current_process
import os
def f(name):
    x = 100
    print(x)

if __name__ == '__main__':
    p = Process(target=f,args=('wang',))
    # 告诉操作系统,开启子进程
    p.start()
    
    # 判断子进程是否还在运行
    print(p.is_alive())
    
    print(1200)
    
    # 打印进程号   作用与os.getpid()一样
    print(current_process().pid)  
    
    # 直接告诉操作系统终止子进程
    print(p.terminate())

回收进程号的两种方式

1.join,可以回收子进程与主进程

2.主进程正常结束,子进程与主进程也会被回收

僵尸进程与孤儿进程

僵尸进程:指的是子进程已经解释,但是pid号还存在,没有被销毁

​ 缺点:占用pid号,占用操作系统资源

孤儿进程:指的是子进程还在运行,但是父进程意外结束

​ 操作系统机制:提供一个福利院,帮你回收没有父进程的子进程

守护进程

指的是主进程结束后,该主进程产生的子进程跟着结束并回收。子进程将不在进行执行。

from multiprocessing import Process
def f(name):
    x = 100
    print(x)

if __name__ == '__main__':
    p = Process(target=f,args=('wang',))
    # 守护进程参数
    p.daemon = True   # True表示此进程为守护进程,必须写在start之前
    p.start()

    print(1200)
  
# 输出结果: 1200

进程互斥锁

让并发变成串行,牺牲了执行效率,保证了数据的安全。

在程序并发执行时,需要修改数据时使用。例如抢票系统。

mutex = Lock()

加锁 : mutex.acquire()

解锁 : mutex.release()

import json
import time
from multiprocessing import Process
from multiprocessing import Lock

# 查看余票
def search(user):
    # 打开data文件查看余票
    with open('data.txt', 'r', encoding='utf-8') as f:
        dic = json.load(f)
    print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}...')

# 开始抢票
def buy(user):
    # 先打开获取车票数据
    with open('data.txt', 'r', encoding='utf-8') as f:
        dic = json.load(f)

    # 模拟网络延时
    time.sleep(1)

    # 若有票,修改data数据
    if dic.get("ticket_num") > 0:
        dic['ticket_num'] -= 1
        with open('data.txt', 'w', encoding='utf-8') as f:
            json.dump(dic, f)
        print(f'用户: {user}抢票成功!')

    else:
        print(f'用户: {user}抢票失败!')


# 开始抢票
def run(user, mutex):
    # 并发: 异步执行
    search(user)

    # 串行: 同步执行
    mutex.acquire()   # 加锁
    buy(user)
    mutex.release()  # 解锁

if __name__ == '__main__':
    # 调用Lock()类得到一个锁对象
    mutex = Lock()

    # 同时来10个用户抢票
    for i in range(10):
        # 并发开启10个子进程
        p = Process(target=run, args=(f'用户{i}', mutex))
        p.start()

队列

遵循先进先出的原则,相当于在内存中产生一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排前面,取得时候也是先取后边的。

from mulitprocessing import Queue
# 调用队列类,实例化队列对象
q = Queue(5)  # 表示队列中只能放五个数据,也可以限制,

# 添加数据
q.put(1)    # 将数据1添加到队列中

# 查看队列中数据是否已经满了
print(q.full())    # 返回为True,表示已经满了。  

# 添加数据,若队列满了,则会报错
q.put_nowait(6)   # 括号内添加数据.
print(666)        # 此时可以将内容打印出啦,如果已经满了,会进行报错

# 判断队列是否为空
print(q.empty())  # 返回True为空。

# 数据取出
print(q.get())

# 写队列需要注意的地方
1.当队列中的数据已经写满了,此时再添加数据进入,会直接停住,
2.如果取出的数据已经取完,再进行取值的时候也会今停挺住。

IPC进程间通信

进程间数据是像胡隔离的,若想实现进程间的通信,可以利用队列。

from multiprocessing import Process
from multiprocessing import Queue


def test1(q):
    data = '数据hello'
    q.put(data)
    print('进程1开始添加数据到队列中..')


def test2(q):
    data = q.get()

    print(f'进程2从队列中获取数据{data}')


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=test1, args=(q, ))
    p2 = Process(target=test2, args=(q, ))

    p1.start()
    p2.start()

    print('主')

生产者与消费者

生产者:生产数据的

消费者:使用数据的

'''
生产者与消费者:
    生产者: 生产数据的
    消费者: 使用数据的

    生活中:
        比如: 卖油条, 一边生产油条, 一边卖油条, 供需不平衡.

    程序中:
        通过队列,生产者把数据添加队列中,消费者从队列中获取数据.
'''
from multiprocessing import Queue, Process
import time


# 生产者
def producer(name, food, q):  # 生产名, 食物, 队列
    for i in range(9):
        data = food, i
        msg = f'用户{name}开始制作{data}'
        print(msg)
        q.put(data)
        time.sleep(0.1)


# 消费者
def consumer(name, q):
    while True:
        data = q.get()
        if not data:
            break
        print(f'用户{name}开始吃{data}')


if __name__ == '__main__':

    q = Queue()

    # 创造生产者
    p1 = Process(target=producer, args=('tank', '油条', q))
    p2 = Process(target=producer, args=('华农兄弟', '竹鼠', q))

    # 生产消费者
    c1 = Process(target=consumer, args=('egon', q))
    c2 = Process(target=consumer, args=('jason', q))

    p1.start()
    p2.start()

    # c1.daemon = True
    # c2.daemon = True

    c1.start()
    c2.start()

    p2.join()
    print('主')

线程

开启一个进程,一定会有一个线程,线程才是真正的执行者。

使用线程的目的:节省内存资源,

开启进程:

1.开辟一个名称空间,每开启一个进程都会占用一份资源

2.每个进程的开启都会自带一个线程

开启线程:

1.一个进程可以开启多个线程。

2.线程的开销远远小于进程

注意:只要开启一个进程就会有一个线程(主线程),主线程会在进程结束的时候一起销毁。

在python中线程不能实现并行,一个进程下多个进程只能实现并发,不能实现并行

线程的两种创建方式

创建线程的函数是:from threading import Thread

线程的使用方法与进程一样,而且调用的方法也是一样的,线程也有子线程。

内存就像一个工厂,子进程就像一个工厂车间,线程就像车间内的流水线 。

from threading import Thread
import time

# 开启线程方式1:
def task():
    print('线程开启')
    time.sleep(1)
    print('线程结束')


# t = Thread()
if __name__ == '__main__':
    # 调用Thread线程类实例化得到线程对象
    t = Thread(target=task)
    t.start()


# 开启线程方式2:
class MyThread(Thread):
    def run(self):
        print('线程开启')
        time.sleep(1)
        print('线程结束')


t = MyThread()  # 线程可以直接写在外边,而进程不可以。
t.start() 
# 或者
if __name__ == '__main__':
    t = MyThread()
    t.start()   

线程的执行顺序

当子进程创建完成的时候,先执行子进程中的任务,遇到等待就执行下一个任务,等待系统的调度

from threading import Thread
import time


def task():
    print(f'线程一开启')
    time.sleep(3)
    print(f'线程一结束')


def task1():
    print(f'线程二开启')
    time.sleep(3)
    print(f'线程二结束')

if __name__ == '__main__':
   
    t = Thread(target=task)
    t1 = Thread(target=task1)

    t.start()
    t1.start()
    print('主')
    
# 输出:
线程一开启
线程二开启
主
线程一结束
线程二结束

线程对象的属性

创建线程的函数是:from threading import Thread
current_thread().name  # 获取当前线程对象的名字
# 返回一个列表,列表中包含当前执行的所有线程对象
print(enumerate())
# 获取当前执行线程的个数
print(activeCount())
is_alive()  # 判断线程是否存活

守护线程

from threading import Thread
import time

def say(name):
    time.sleep(1)
    print(name)

if __name__ == '__main__':
    t = Thread(target=say,args=('wang',))
    t.setDaemon(True)
    t.start()

    print('主进程')
    print(t.is_alive())

线程互斥锁

线程之间的数据是共享的

from threading import Thread, Lock
import time

mutex = Lock()
n = 100

def task(i):
    print(f'线程{i}启动...')
    global n
    mutex.acquire()   # 对线程进行加锁
    temp = n
    time.sleep(0.1)  # 一共等待10秒
    n = temp-1
    print(n) 
    mutex.release()    # 对系统进行解锁

if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t = Thread(target=task, args=(i, ))
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    # 100个线程都是在100-1
    print(n)
# 输出:
输出结果为:100---0,一次打印出来,内容太多不进行列举

GIL全局解释器锁(不清楚)

基于Cpython研究来研究全局解释器,这个GIL并不是python的特性,他是只在Cpython解释器里引入的一个概念。

1.本质上是一个互斥锁

2.为了阻止同一个进程内多个线程同时执行(单个进程下的多个进程无法实现并行,但能实现并发)

3.因为Cpython的内存管理不是 “线程安全” 的

GIL的存在就是为了保证线程的安全

验证多线程的作用

多线程的作用:

在计算密集型的情况下:使用多进程

在IO密集型的情况下:使用多线程

在高效执行多个进程,内有多个IO密集的程序:使用 多线程 + 多进程

死锁现象

死锁:指两个或两个以上的进程或线程在执行过程中,因争夺资源而在成的一种互相等待的现象,即两个线程在争夺资源的时候,a需要的资源在b这里,b需要的资源在a那里,这样a跟b就没有办法拿到自己需要的资源,就会卡住,此时就是死锁。

线程锁调用方式

from threading import Lock
例:
from threading import Lock, Thread, current_thread
import time

mutex_a = Lock()
mutex_b = Lock()

class MyThread(Thread):

    # 线程执行任务
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        # print(f'用户{current_thread().name}抢到锁a')
        print(f'用户{self.name}抢到锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到锁b')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')
        mutex_a.release()
        print(f'用户{self.name}释放锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到锁b')
        # IO操作
        time.sleep(1)

        mutex_a.acquire()
        print(f'用户{self.name}抢到锁a')
        mutex_a.release()
        print(f'用户{self.name}释放锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')


for line in range(10):
    t = MyThread()
    t.start()

递归锁

使用递归锁可以讲死锁解开。

原理:两个锁指向统一个名称空间,使用递归锁的时候,会发生以下情况,只要有线程在调用这个递归锁,别的线程就没有办法对这个递归锁进行修改,遇到IO调用其他的进程也不能修改,只能等这个进程对递归锁的引用计数为0的时候,别的进程才能引用这个递归锁,但是这个程序可以再引用这个递归锁。

递归锁的使用方法:

# from threading import Lock   # 锁
# from threading import RLock   # 递归锁

from threading import RLock, Thread, Lock
import time

mutex_a = mutex_b = RLock()  # 两个锁id空间一致,指向同一片内存地址。

class MyThread(Thread):

    # 线程执行任务
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        # print(f'用户{current_thread().name}抢到锁a')
        print(f'用户{self.name}抢到锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到锁b')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')
        mutex_a.release()
        print(f'用户{self.name}释放锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到锁b')
        # IO操作
        time.sleep(1)
        mutex_a.acquire()
        print(f'用户{self.name}抢到锁a')
        mutex_a.release()
        print(f'用户{self.name}释放锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')


for line in range(10):
    t = MyThread()
    t.start()

信号量(了解)

互斥锁:并发——》串行

信号量:可以一次性按照几个数量为单位来对线程进行操作

from threading import Semaphore, Lock
from threading import current_thread
from threading import Thread
import time

sm = Semaphore(5)  # 5个马桶
mutex = Lock()  # 5个马桶


def task():
    # mutex.acquire()
    sm.acquire()
    print(f'{current_thread().name}执行任务')
    time.sleep(1)
    sm.release()
    # mutex.release()


for line in range(20):
    t = Thread(target=task)
    t.start()

线程队列(了解)

FIFO:先进先出

LIFO:后进先出

# 使用方法
import queue

# 普通的线程队列: 先进先出
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 1


# LIFO队列: 后进先出
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 3


# 优先级队列
q = queue.PriorityQueue()  # 超级了解
# 若参数中传的是元组,会以元组中第一个数字参数为准
q.put(('a优', '先', '娃娃头', 4))  # a==97
q.put(('a先', '优', '娃娃头', 3))  # a==98
q.put(('a级', '级', '娃娃头', 2))  # a==99
'''
1.首先根据第一个参数判断ascii表的数值大小
2.判断第个参数中的汉字顺序.
3.再判断第二参数中数字--> 字符串数字 ---> 中文
4.以此类推
'''
print(q.get())

event事件

作用:用来控制线程的执行,

使用方法:

Event是threading中的一个类,调用里边的一些方法对线程进行一些操作。

e = Event()
在某一个线程中出现了e.wait()的时候,此时这个线程就不能执行,e.wait()可以在多个线程中。
接触e.wait()的方法是,在别的线程中使用e.set(),此时其他线程中的e.wait()的线程都可以继续运行。

例:
from threading import Event
from threading import Thread
import time
# 调用Event类实例化一个对象
e = Event()

# 若该方法出现在任务中,则为False,阻塞
# e.wait()  # False

# 若该方法出现在任务中,则将其他线程的Flase改为True,进入就绪态与运行态
# e.set()  # True


def light():
    print('红灯亮...')
    time.sleep(5)
    # 应该开始发送信号,告诉其他线程准备执行
    e.set()  # 将car中的False ---> True
    print('绿灯亮...')


def car(name):
    print('正在等红灯....')
    # 让所有汽车任务进入阻塞态
    e.wait()  # False
    print(f'{name}正在加速漂移....')


# 让一个light线程任务 控制多个car线程任务
t = Thread(target=light)
t.start()

for line in range(10):
    t = Thread(target=car, args=(f'童子军jason{line}号', ))
    t.start()

进程池与线程池

定义:线程池与进程池是用来控制当前程序允许进程/线程的数量。

问题:如果无限制的开启进程或线程,会将服务器卡崩。

作用:线程池与进程池的作用就是保证在硬件允许的范围内创建线程或进程的数量。

使用:

# 知识点一:(进程池与线程池的基本使用)
进程池:
from concurrent.futures import ProcessPoolExecutor

ProcessPoolExecutor(5)  # 5代表只能开启5个进程
ProcessPoolExecutor()   # 默认以CPU的个数限制进程数

线程池:
from concurrent.futures import ThreadPoolExecutor

ThreadPoolExecutor(5)  # 5代表只能开启5个进程
ThreadPoolExecutor()   # 默认以CPU个数 * 5限制线程数

# 知识点二:(利用进程池与线程池做的扩展)
pool.submit('传函数地址') # 异步提交任务
相当于下边的两步
t = Thread()
t.start()

# 会让所有线程池的任务结束后,才往下执行代码。
pool.shutdown()

# 知识点三:(回调函数:直接调用函数的返回值)
pool.submit(task, 1).add_done_callback(call_back)
被传函数的返回值,       将函数的返回值传给括号内的回调函数
注意:回调函数一定要写res.result(),因为不许通过res.result()才嫩刚拿到县城任务返回的结果。

# 例:
def task(res):
    print('线程任务开始了...')
    time.sleep(1)
    print('线程任务结束了...')
    return 123


# 回调函数
def call_back(res):
    print(type(res))
    # 注意: 赋值操作不要与接收的res同名
    res2 = res.result()
    print(res2)


for line in range(5):
    pool.submit(task, 1).add_done_callback(call_back)

print('hello')

协程

进程:资源单位

线程:执行单位

协程:为了在单线程下实现并发, 节约资源。

注意:协程不是操作系统的资源,他是程序起的名字,为了让单线程实现并发。

协程的目的:通过手动模拟操作系统 “多道技术” ,实现 切换 + 保存状态。为了让单个线程不停地切换去执行任务,让你第一个任务遇到IO操作了,会切换到另一个线程中进行操作,这样就可以使用一个线程就可以去完成之前需要几个线程才能完成的事情,但是在单线程计算密集行的情况下使用协程,这样做会让线程在计算任务之间来回切换,效率反而会更低。

协程的优点:在IO密集型的情况下,会提高效率

协程的缺点:在计算密集型的情况下,来回切换,反而效率会更低。

如何实现协程:切换 + 保存状态。

使用第三方模块:gevent

作用:可以帮助监听IO操作,并且切换。

使用gevent的目的:为了实现单线程下,实现遇到IO,实现 切换+保存状态。

from gevent import monkey
monkey.patch_all()  # 可以监听该程序下所有的IO操作
import time
from gevent import spawn, joinall  # 用于做切换 + 保存状态


def func1():
    print('1')
    # IO操作
    time.sleep(1)


def func2():
    print('2')
    time.sleep(3)


def func3():
    print('3')
    time.sleep(5)


start_time = time.time()

s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)

s2.join()  # 发送信号,相当于等待自己 (在单线程的情况下)
s1.join()
s3.join()
# 必须传序列类型
joinall([s1, s2, s3])

end_time = time.time()

print(end_time - start_time)

# 输出:
1
2
3
5.011829614639282

TCP端实现协程

# 服务端
from gevent import monkey
monkey.patch_all()
import socket
from gevent import spawn
server = socket.socket()
server.bind((
    '127.0.0.1', 9550
))
server.listen(5)
print('启动服务端。。。')
def working(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break
    conn.close()d
def server2():
    while True:
        conn, addr = server.accept()
        spawn(working, conn)
        # join()
if __name__ == '__main__':
    # 协程,单线程下实现并发
    g1 = spawn(server2)
    g1.join()


# 客户端
import socket
from threading import current_thread, Thread
def client():
    client = socket.socket()

    client.connect(
        ('127.0.0.1', 9550)
    )
    number = 0
    while True:
        data = f'{current_thread().name} {number}'
        client.send(data.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))
        number += 1

# 模拟500个用户并发去访问服务端
for i in range(500):
    t = Thread(target=client)
    t.start()

IO模型(了解)

原文地址:https://www.cnblogs.com/whkzm/p/11740663.html