python学习之路day10(锁)

时间:2019-08-31
本文章向大家介绍python学习之路day10(锁),主要包括python学习之路day10(锁)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.#  ####锁
from multiprocessing import Lock,Process
import json,time
#创建一把锁
lock=Lock()
#上锁
lock.acquire()
print(123)
#解锁
lock.release()
#死锁(只上锁不解锁会产生死锁)程序添加了阻塞,代码不能往下执行
'''如果上锁一定要解锁,上锁解锁锁一对'''
lock.acquire()
# lock.release()
#无法打印
# print(456)

#读取票数
def wr_info(sign,dic=None):
if sign == "r":
with open("ticket",mode="r",encoding="utf-8") as fp:
dic=json.load(fp)
return dic
elif sign=="w":
with open("ticket",mode="w",encoding="utf-8") as fp:
json.dump(dic,fp)

# print(wr_info("r"),type(wr_info("r")))
# dic={"coutn":2}
# wr_info("w",dic) #{"count": 2}
#抢票方法
def get_ticket(person):
dic=wr_info("r")
time.sleep(0.1)
if dic["count"]>0:
print("%s抢到票了"%person)
dic["count"]-=1
#更新数据库
wr_info("w",dic)
else:
print("%s没有买到票"%person)

# get_ticket("里斯")

#用ticket 方法进行统一调用
def ticket(person,lock):
#先读取票数
dic=wr_info("r")
time.sleep(0.1)
#查询余票
print("%s 查询余票:%s"%(person,dic["count"]))
lock.acquire()
#开始抢票
get_ticket(person)
lock.release()

if __name__ == '__main__':
lock=Lock()
for i in range(10):
p=Process(target=ticket,args=("person%s"%i,lock))
p.start()
"""
创建进程的时候是异步程序,在上锁的时候是同步程序
"""
2.# ###信号量 semaphore 本质上就是锁,只不过可以控制锁的数量
from multiprocessing import Lock,Process,Semaphore
import os,time
def ktv(person,sem):
sem.acquire()
print("%s进入KTV开始唱歌"%person)
print(os.getpid())
time.sleep(3)
print("%s走出KTV,离开歌房"%person)
sem.release()

if __name__ == '__main__':
sem=Semaphore(1)
for i in range(10):
p=Process(target=ktv,args=("person%s"%i,sem))
p.start()
3.# ###事件
from multiprocessing import Process,Event
import time,random
"""
#阻塞事件
e=Event() 生成事件对象e
e.wait() 动态给程序加阻塞,程序当中是否加阻塞完全取决于该对象中的is_set()
(默认返回:False)

#如果是True 不加阻塞
#如果是False 加阻塞

#控制属性的值
set() 方法 将这个属性的值改成True
clear() 方法 将这个属性的值改成False
is_set() 方法 获取当前属性值是False 还是True
"""

#基本语法
#(1).
"""
e=Event()
print(e.is_set()) #False
# e.wait()
#最多阻塞时间为1s
e.wait(1)
print(123)
"""
#(2).
"""
e=Event()
# 将内部的一个属性改成True
e.set()
e.wait()
print(222)
# 将内部的一个属性改成False
e.clear()
e.wait()
# print(333) #不打印
"""
#(3). 模拟交通灯的效果
def traffic_light(e):
#默认红灯亮
print("红灯亮")
while True:
if e.is_set():
# 当前是绿灯,等待1秒
time.sleep(1)
#等完1s后,变成红灯
print("红灯亮")
e.clear()
else:
#当前是红灯
time.sleep(1)
#等完1s变成绿灯
print("绿灯亮")
e.set()
# e=Event()
# traffic_light(e)

#模拟小车遇到红灯停,绿灯行的操作
def car(e,i):
#e.is_set() 默认返回False 代表的是绿灯
if not e.is_set():
print("car%s在等待"%i)
e.wait()
print("car%s通行了"%i)
"""
if __name__ == '__main__':
e=Event()
# 模拟启动交通灯
p1=Process(target=traffic_light,args=(e,))
p1.daemon=True
p1.start()
#模拟20辆小车
for i in range(20):
time.sleep(random.uniform(0,2))
p2=Process(target=car,args=(e,i))
p2.start()
print("程序彻底结束!")
"""
#优化版: 等小车全部跑完,再让程序彻底结束
if __name__ == '__main__':
lst=[]
e=Event()
# 模拟启动交通灯
p1=Process(target=traffic_light,args=(e,))
#设置红绿灯为守护进程,等小车跑完,也终止红绿灯
p1.daemon=True
p1.start()
#模拟20辆小车
for i in range(20):
#小车创建速度太快,所以加一点延迟效果,生动表现小车的行为
time.sleep(random.uniform(0,2))
p2=Process(target=car,args=(e,i))
p2.start()
lst.append(p2)
#等小车都跑完再终止红绿灯;加一个等待
for i in lst:
i.join()
print("程序彻底结束!")
4.# ###进程队列
from multiprocessing import Process,Queue
#(1) 基本语法
"""先进先出,后进后出"""
q=Queue()
#1.把数据放到q队列中 put
q.put(111)
#2.把数据从队列中取出 get
res=q.get()
print(res) #111
#3.当队列里面的值都拿出来了,已经没有数据的时候,在获取就会出现阻塞
# res=q.get() erro
#4.gey_nowait() 无论有没有都去那数据,如果拿不到,直接报错
# res=q.get_nowait() #erro:queue.Empty

#抑制get_nowait() 报错 用try 方法实现
try:
res=q.get_nowait()
except:
pass

#(2) 可以使用queue,指定队列长度
"""
q=Queue()
q.put(11)
q.put(22)
q.put(33)
print(q.get())
print(q.get())
print(q.get())
# q.put(44) #阻塞,如果队列已经存满了,,再放值,直接阻塞
#无论如何都往队列中存值,如果存满了,直接报错
q.put_nowait(555)
try:
q.put_nowait(444)
except:
pass
"""
#(3) 进程之间的交流
#子进程
def func(q):
#子进程获取数据
res=q.get()
print(res)
# 子进程添加数据
q.put("cccc")

if __name__ == '__main__':
q=Queue()
p=Process(target=func,args=(q,))
p.start()
#1.主进程添加数据
q.put("abc")
p.join()
#主进程获取
res=q.get()
print(res)
print("程序结束!")
5.# ###线程
from threading import Thread
from multiprocessing import Process
import os,time,random

#(1) 一个进程可以有多个线程,这些线程共享同一份资源
"""进程是异步并发程序"""
"""
def func(num):
time.sleep(random.uniform(0.1,1))
print("子线程",num,os.getpid())

if __name__ == '__main__':
for i in range(10):
t=Thread(target=func,args=(i,))
t.start()
"""

#(2) 并发多线程 和 并发多进程 速度对比:多线程快
"""
def func(i):
print("子线程",i,os.getpid())

if __name__ == '__main__':
lst1=[]
#1.计算多线程时间
start_time=time.perf_counter()
for i in range(1000):
t=Thread(target=func,args=(i,))
t.start()
lst1.append(t)
for i in lst1:
i.join()
end_time=time.perf_counter()
#0.16742968700009442 主线程执行结束------
print(end_time-start_time,"主线程执行结束------")

#2.计算多进程时间
lst2=[]
start_time = time.perf_counter()
for i in range(1000):
p = Process(target=func, args=(i,))
p.start()
lst2.append(p)
for i in lst2:
i.join()
end_time = time.perf_counter()
#6.530655490999834 主进程执行结束------
print(end_time - start_time, "主进程执行结束------")
"""

#(3) 多线程之间共享一份进程资源
"""
num=1000000
lst=[]
def func(i):
global num
num-=1
for i in range(100000):
t=Thread(target=func,args=(i,))
t.start()
lst.append(t)
# i.join可以保证每一个线程都执行一遍,然后再去打印num
for i in lst:
i.join()
print(num)
"""

#(4)线程相关的函数
"""
线程 is_alive() 检测线程是否仍然存在
线程 set_Name() 设置线程的名字
线程 get_Name() 获取线程的名字

1.currentThread() .ident 查看线程ID号
2.enumarate() 返回目前正在运行的线程列表
3.attiveCount() 返回目前正在运行的线程数量
"""
"""
def func():
# pass False
time.sleep(3) #True

t=Thread(target=func)
t.start()
#is_alive()
print(t.is_alive())

#setName
t.setName("hello_world")
print(t) #<Thread(hello_world, started 123145314967552)>
print(t.getName()) #hello_world

#1.currentThread() .ident 查看线程ID号
from threading import currentThread
def func():
print("子线程",currentThread().ident)
t=Thread(target=func)
t.start()
print("主线程:",currentThread().ident,os.getpid())

print("==================")
"""
#2.enumarate() 返回目前正在运行的线程列表
"""
from threading import enumerate,currentThread
def func():
print("子线程",currentThread().ident)
time.sleep(3)
for i in range(10):
t=Thread(target=func)
t.start()
#[[<_MainThread(MainThread, started 4413027776)>, <Thread(Thread-1, started 123145474478080)>, <Thread(Thread-2, started 123145479733248)>, <Thread(Thread-3, started 123145484988416)>, <Thread(Thread-4, started 123145490243584)>, <Thread(Thread-5, started 123145495498752)>, <Thread(Thread-6, started 123145500753920)>, <Thread(Thread-7, started 123145506009088)>, <Thread(Thread-8, started 123145511264256)>, <Thread(Thread-9, started 123145516519424)>, <Thread(Thread-10, started 123145521774592)>]]
#10个子线程+1个主线程=11个
print(enumerate())
"""

#3.attiveCount() 返回目前正在运行的线程数量
from threading import enumerate,currentThread,activeCount
def func():
print("子线程",currentThread().ident)
time.sleep(3)
for i in range(10):
t=Thread(target=func)
t.start()
print(activeCount()) #11
6.# ###守护线程:等待所有线程执行结束之后,在自动结束,守护所有线程;
from threading import Thread
import time
def func1():
while True:
time.sleep(2)
print("我是线程1,func1任务")

def func2():
print("我是线程2,start")
time.sleep(3)
print("我是线程2,end")
#启动线程1
"""线程可以选择不加 if __name__ == '__main__': 因为线程共享一份资源,当然加上更完美"""
t1=Thread(target=func1)
#设置守护线程,等所有线程结束后,守护线程结束
t1.setDaemon(True)
t1.start()

t2=Thread(target=func2)
t2.start()
print("主线程结束")
7.# ###线程的数据安全,依赖Lock
"""用上锁的方法来保证数据的安全,代价就是会牺牲一点执行的速度"""
from threading import Thread,Lock
n=0

def func1():
global n
#方法一:上锁
lock.acquire()
for i in range(1000000):
n-=1
#解锁
lock.release()

def func2():
global n
"""with语法 自动实现上锁解锁 ,语法:with lock:"""
# 方法二:上锁解锁
for i in range(1000000):
with lock:
n+=1


if __name__ == '__main__':
lock=Lock()
lst=[]
for i in range(10):
t1=Thread(target=func1)
t2=Thread(target=func2)
t1.start()
t2.start()

lst.append(t1)
lst.append(t2)
for i in lst:
i.join()
print("主线程结束")
print(n)

"""
总结:
当运行range=100000,n=0;
当运行range=10000000,n的值不确定性;
原因:
多个线程,第一个线程和第二个线程执行速度比较快,时间间隔也比较短,虽然都是先拿数据,然后修改数据,最后把数据放回去
这个过程由于太快,可能第二次的值会覆盖第一次的,相当于同一个值放了2次

执行结果:
主线程结束
0
"""
8.# ###死锁,递归锁,互斥锁
from threading import Thread,Lock
import time
lock=Lock()
#死锁现象:只上锁不解锁,是死锁
# lock.acquire()
# lock.acquire()
# print(123)
"""
noodle=Lock()
chopsticke=Lock()

def eat1(name):
noodle.acquire()
print("%s 拿到面条"%name)
chopsticke.acquire()
print("%s 拿到筷子"%name)

print("开始吃面")
time.sleep(1)

noodle.release()
print("%s放下面条"%name)
chopsticke.release()
print("%s放下筷子"%name)
def eat2(name):
chopsticke.acquire()
print("%s 拿到筷子" % name)
noodle.acquire()
print("%s 拿到面条" % name)

print("开始吃面")
time.sleep(1)

chopsticke.release()
print("%s放下筷子" % name)
noodle.release()
print("%s放下面条" % name)

if __name__ == '__main__':
name_list1=["Jake","Rose"]
name_list2=["Jane","Lily"]

for name in name_list1:
t1=Thread(target=eat1,args=(name,))
t1.start()
for name in name_list2:
t2=Thread(target=eat2,args=(name,))
t2.start()
"""
# ###2.递归锁
'''
上几把锁,就对应几个解锁,无论上了几把锁,只要解锁的数据相同,就可以解锁
针对于应急情况下的解锁

递归锁专门用于解决死锁现象
临时用于快速解决服务区奔溃死锁的问题
'''
'''
from threading import Thread,Lock,RLock
rlock=RLock()
def func():
rlock.acquire()
rlock.acquire()
rlock.acquire()

rlock.release()
rlock.release()
rlock.release()

func()
#解决吃面条的问题
# noodle=Lock()
# chopsticke=Lock()
noodle=chopsticke=RLock()

def eat1(name):
noodle.acquire()
print("%s 拿到面条"%name)
chopsticke.acquire()
print("%s 拿到筷子"%name)

print("开始吃面")
time.sleep(1)

noodle.release()
print("%s放下面条"%name)
chopsticke.release()
print("%s放下筷子"%name)
def eat2(name):
chopsticke.acquire()
print("%s 拿到筷子" % name)
noodle.acquire()
print("%s 拿到面条" % name)

print("开始吃面")
time.sleep(1)

chopsticke.release()
print("%s放下筷子" % name)
noodle.release()
print("%s放下面条" % name)

if __name__ == '__main__':
name_list1=["Jake","Rose"]
name_list2=["Jane","Lily"]

for name in name_list1:
t1=Thread(target=eat1,args=(name,))
t1.start()
for name in name_list2:
t2=Thread(target=eat2,args=(name,))
t2.start()
'''

# ###4 互斥锁
"""
从语法上来锁,锁可以互相嵌套,但是不要使用
上一次锁,就应解开一把锁,形成互斥锁
吃面条和拿筷子锁同时的,拿一把锁就够了,不需要分开上锁
"""
mylock=Lock()

def eat1(name):
mylock.acquire()
print("%s 拿到面条"%name)
print("%s 拿到筷子"%name)

print("开始吃面")
time.sleep(1)

print("%s放下面条"%name)
print("%s放下筷子"%name)
mylock.release()
def eat2(name):
mylock.acquire()
print("%s 拿到筷子" % name)
print("%s 拿到面条" % name)

print("开始吃面")
time.sleep(1)

print("%s放下筷子" % name)
print("%s放下面条" % name)
mylock.release()

if __name__ == '__main__':
name_list1=["Jake","Rose"]
name_list2=["Jane","Lily"]

for name in name_list1:
t1=Thread(target=eat1,args=(name,))
t1.start()
for name in name_list2:
t2=Thread(target=eat2,args=(name,))
t2.start()
9.
# ###线程队列
from queue import Queue
"""
put 往线程队列中放值
get 往线程队列中取值
put_nowait 如果放入的值,长度超过了队列的长度,直接报错
get_nowait 如果获取的值,已经没有了,直接报错
"""

#(1)queue 先进先出# ####信号量(线程)
q=Queue()
q.put(11)
q.put(22)
print(q.get()) #11
print(q.get()) #22
# q.get() 发生阻塞
# q.get_nowait() 发生报错

#指定队列长度
q2=Queue(2)
q2.put(1)
q2.put(2)
# q2.put(3) 发生阻塞
# q2.put_nowait(3) #发生报错

# LifoQueue 后进后出(数据结构中,内存栈队列的一种存储结构)
from queue import LifoQueue
lq=LifoQueue()
lq.put(44)
lq.put(55)
print(lq.get()) #55
print(lq.get()) #44

#(3)PriorityQueue
"""
默认按照数字大小排序
如果是字母,会按照ascii编码大小排序:从小打大
先写先排
"""
from queue import PriorityQueue
pq=PriorityQueue()
pq.put((12,"zhangsan"))
pq.put((6,"lisi"))
pq.put((18,"wangwu"))
pq.put((18,"zhaoliu"))

print(pq.get()) #(6, 'lisi')
print(pq.get()) #(12, 'zhangsan')
print(pq.get()) #(18, 'wangwu')
print(pq.get()) #(18, 'zhaoliu')

#单一 的一个元素,只能是同一类型 数字
pq=PriorityQueue()
pq.put(13)
pq.put(18)
pq.put(3)
print(pq.get())
print(pq.get())
print(pq.get())

#单一 的一个元素,只能是同一类型 字符串
pq=PriorityQueue()
pq.put("acc")
pq.put("aa")
pq.put("za")
# pq.put(123) erro
print(pq.get())
print(pq.get())
print(pq.get())
10.# ####信号量(线程)
from threading import Semaphore,Thread
import time,random
def func(i,sem):
#异步并发线程
time.sleep(random.uniform(0.1,1))
with sem:
print(i)
time.sleep(2)
if __name__ == '__main__':
sem=Semaphore(5) #5个一组执行
for i in range(20):
t=Thread(target=func,args=(i,sem))
t.start()
11.# ###新版 进程池 和 线程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
"""
#(1) 进程池,可以允许CPU并行
def func(i):
print("Process:",i,os.getpid())
time.sleep(2)
print("Process:end")
return 5488

if __name__ == '__main__':
#cpu_count 获取的是逻辑处理器
print(os.cpu_count())
#1.创建进程池对象
'''最多默认创建cpu_count 这么多个进程执行任务,只创建6个进程来执行所有任务,不会有额外的进程创建出来了'''
p=ProcessPoolExecutor()
#2.异步触发进程
# res=p.submit(func,1)
# print(res) #<Future at 0x105258eb8 state=running> 该对象存放了函数的返回值

for i in range(10):
res=p.submit(func,i)
#3. 获取进程任务的返回值
res2=res.result()
print(res2) #5488
#4.shutdowm 类似 join ,等待所有子进程执行完毕之后,再向下执行 相当于join
p.shutdown() #添加阻塞,无实际意义
print("主进程执行完毕")
"""
#(2) 线程池,as相当于起一个别名
"""
from threading import currentThread as cthread
import os,time
def func(i):
print("thread:",i,cthread().ident)
time.sleep(5)
print("thread %s end"%(i))
#max_workers = (os.cpu_count() or 1) * 5 默认值是CPU逻辑核心数 * 5(本机默认值=4)
'''最多默认创建(os.cpu_count() or 1) * 5 这么多个进程执行任务,只创建6个进程来执行所有任务,不会有额外的进程创建出来了'''
tp=ThreadPoolExecutor()
for i in range(50):
tp.submit(func,i)
tp.shutdown()
print("主线程结束")
"""

#(3) 线程池的返回值
"""
from threading import current_thread as cthread
def func(i):
#打印线程号
print("thread",i,cthread().ident)
time.sleep(1)
return cthread().ident

tp=ThreadPoolExecutor(5)
lst=[]
setvar=set()
for i in range(10):
res=tp.submit(func,i)
#把对象塞到列表里面,如果直接获取值会出现阻塞,就不能并发了所有都放到列表中,统一处理
lst.append(res)
# print(res.result()) #速度慢,因此要插入列表

for i in lst:
#获取该对象的返回值
print(i.result())
setvar.add(i.result())
print(setvar)
print("主线程结束!")
"""

#(4)返回迭代器 线程池版本的高阶map函数 升级的map版本
from threading import current_thread as cthread
def func(i):
time.sleep(2)
print("thread",i,cthread().ident)
print("thread end %s"%i)
return "*" * i
tp=ThreadPoolExecutor(5)
it=map(func,range(20))
tp.shutdown()
print("主线程结束!")
from collections import Iterator,Iterable
res=isinstance(it,Iterator)
print(res) #True
print(list(it))
12.
# ###生产者与消费者模型
"""
#爬虫例子:
1号负责抓取网页中所有的内容
2号负责匹配提取网页中所有的关键字

1号进程就可以看成一个生产者
2号进程就可以看成一个消费者

有时可能生产者毕消费者快,或者慢
所以为了减少生产者跟消费者速度上的差异化,增加了缓冲队列,帮助保持均匀

比较理想的模型,两者之间的速度相对平均
生产者和消费者模型从程序上来讲,就是存数据和取数据之间的过程
"""
from multiprocessing import Queue,Process
import time,random
#消费者模型(负责取值)
def consumer(q,name):
while True:
#拿出数据
food=q.get()
#如果拿取的数据是None,代表已经拿到最后一个数据来,到头来,这个时候将循环结束
if food==None:
break
time.sleep(random.uniform(0.1,1))
print("%s 吃了一个%s。"%(name,food))

#生产者模型(负责存值)
def producer(q,name,food):
for i in range(3):
time.sleep(random.uniform(0.1, 1))
print("%s 生产了%s ,%s"%(name,food,i))
q.put(food+str(i))

if __name__ == '__main__':
q=Queue()
#消费者
c1=Process(target=consumer,args=(q,"sophine"))
c1.start()
#生产者
p1=Process(target=producer,args=(q,"Janme","cake"))
p1.start()
#等待生产者把所有的数据生产完毕,保证队列里面有3个数据
p1.join()
q.put(None)
 
 
 



原文地址:https://www.cnblogs.com/vivian0119/p/11427117.html