网络编程并发编程总结

时间:2019-10-26
本文章向大家介绍网络编程并发编程总结,主要包括网络编程并发编程总结使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一. 网络编程

1.C/S架构

网络编程就是要通过网络来访问另一台计算机的数据,这样必然需要至少两台计算机,一台计算机存放用于分享的数据和用于分享数据的程序,另一台计算机上运行访问数据的程序。提供数据的一方称为服务器(Server),访问数据的一方称为客户端(Client).

指的是客户端/服务器架构。(B/S架构也是C/S架构的一种,只不过客户端是浏览器)

2.网络通讯的基本要素

1.物理连接介质

包括网线,无线电,光纤等。

2.通信协议

什么是协议?协议指的是标准,大家要遵循相同的标准才能正常交流通讯。

3.网络通信协议

OSI七层模型,开放式系统互联参考模型.把整个通信过程分为七层.

3.1 物理层

计算机之间必须完成组网

物理层功能:主要是基于电器特性发送高低电压(电信号),高电压对应数字1,低电压对应数字0。

3.2 数据链路层

数据链路层由来:单纯的电信号0和1没有任何意义,必须规定电信号多少位一组,每组什么意思。

数据链路层的功能:定义了电信号的分组方式。

以太网协议

定义了统一的分组方式,及以太网协议ethernet:

ethernet 规定:

  • 一组电信号构成一个数据包,叫帧
  • 每一数据帧分成:报头head和数据data两部分

head包含:(固定18个字节)

  • 发送者/源地址,6个字节
  • 接收者/目标地址,6个字节
  • 数据类型,6个字节

data包含:(最短46字节,最长1500字节)

  • 数据包的具体内容

head长度+data长度=最短64字节,最长1518字节,超过最大限制就分片发送

Mac地址

head中包含的源和目标地址由来:ethernet规定接入internet的设备都必须具备网卡,发送端和接收端的地址便是指网卡的地址,即mac地址

mac地址:每块网卡出厂时都被烧制上一个世界唯一的mac地址,长度为48位2进制,通常由12位16进制数表示(前六位是厂商编号,后六位是流水线号)

广播

有了mac地址,同一网络内的两台主机就可以通信了(一台主机通过arp协议获取另外一台主机的mac地址)

ethernet采用最原始的方式,广播的方式进行通信,即计算机通信基本靠吼

3.3 网络层

网络层由来:有了ethernet、mac地址、广播的发送方式,世界上的计算机就可以彼此通信了,问题是世界范围的互联网是由 一个个彼此隔离的小的局域网组成的,那么如果所有的通信都采用以太网的广播方式,那么一台机器发送的包全世界都会收到, 引发广播风暴

上图结论:必须找出一种方法来区分哪些计算机属于同一广播域,哪些不是,如果是就采用广播的方式发送,如果不是,就采用路由的方式(向不同广播域/子网分发数据包),mac地址是无法区分的,它只跟厂商有关

网络层功能:引入一套新的地址来区分不同的广播域/子网,这套地址即网络地址.

IP协议

  • 规定网络地址的协议叫ip协议,它定义的地址称之为ip地址,广泛采用的v4版本即ipv4,它规定网络地址由32位2进制表示
  • 范围0.0.0.0-255.255.255.255
  • 一个ip地址通常写成四段十进制数,例:172.16.10.1

ip地址分成两部分

  • 网络部分:标识子网
  • 主机部分:标识主机

注意:单纯的ip地址段只是标识了ip地址的种类,从网络部分或主机部分都无法辨识一个ip所处的子网

例:172.16.10.1与172.16.10.2并不能确定二者处于同一子网

子网掩码

所谓”子网掩码”,就是表示子网络特征的一个参数。它在形式上等同于IP地址,也是一个32位二进制数字,它的网络部分全部为1,主机部分全部为0。比如,IP地址172.16.10.1,如果已知网络部分是前24位,主机部分是后8位,那么子网络掩码就是11111111.11111111.11111111.00000000,写成十进制就是255.255.255.0。

知道”子网掩码”,我们就能判断,任意两个IP地址是否处在同一个子网络。方法是将两个IP地址与子网掩码分别进行AND运算(两个数位都为1,运算结果为1,否则为0),然后比较结果是否相同,如果是的话,就表明它们在同一个子网络中,否则就不是。

比如,已知IP地址172.16.10.1和172.16.10.2的子网掩码都是255.255.255.0,请问它们是否在同一个子网络?两者与子网掩码分别进行AND运算,

172.16.10.1:10101100.00010000.00001010.000000001

255255.255.255.0:11111111.11111111.11111111.00000000

AND运算得网络地址结果:10101100.00010000.00001010.000000001->172.16.10.0

172.16.10.2:10101100.00010000.00001010.000000010

255255.255.255.0:11111111.11111111.11111111.00000000

AND运算得网络地址结果:10101100.00010000.00001010.000000001->172.16.10.0

结果都是172.16.10.0,因此它们在同一个子网络。

总结一下,IP协议的作用主要有两个,一个是为每一台计算机分配IP地址,另一个是确定哪些地址在同一个子网络。

ip数据包

ip数据包也分为head和data部分,无须为ip包定义单独的栏位,直接放入以太网包的data部分

head:长度为20到60字节

data:最长为65,515字节。

而以太网数据包的”数据”部分,最长只有1500字节。因此,如果IP数据包超过了1500字节,它就需要分割成几个以太网数据包,分开发送了。

以太网头 ip 头 ip数据

ARP协议

arp协议由来:计算机通信基本靠吼,即广播的方式,所有上层的包到最后都要封装上以太网头,然后通过以太网协议发送,在谈及以太网协议时候,我们了解到

通信是基于mac的广播方式实现,计算机在发包时,获取自身的mac是容易的,如何获取目标主机的mac,就需要通过arp协议

arp协议功能:广播的方式发送数据包,获取目标主机的mac地址

协议工作方式

每台主机ip都是已知的

例如:主机172.16.10.10/24访问172.16.10.11/24

a:首先通过ip地址和子网掩码区分出自己所处的子网

场景 数据包地址
同一子网 目标主机mac,目标主机ip
不同子网 网关mac,目标主机ip

b:分析172.16.10.10/24与172.16.10.11/24处于同一网络(如果不是同一网络,那么下表中目标ip为172.16.10.1,通过arp获取的是网关的mac)

源mac 目标mac 源ip 目标ip 数据部分
发送端主机 发送端mac FF:FF:FF:FF:FF:FF 172.16.10.10/24 172.16.10.11/24 数据

c:这个包会以广播的方式在发送端所处的自网内传输,所有主机接收后拆开包,发现目标ip为自己的,就响应,返回自己的mac。

3.4 传输层

传输层的由来:网络层的ip帮我们区分子网,以太网层的mac帮我们找到主机,然后大家使用的都是应用程序,你的电脑上可能同时开启qq,暴风影音,等多个应用程序,

那么我们通过ip和mac找到了一台特定的主机,如何标识这台主机上的应用程序,答案就是端口,端口即应用程序与网卡关联的编号。

传输层功能:建立端口到端口的通信

补充:端口范围0-65535,0-1023为系统占用端口

开发中常用软件的默认端口号:
mysql: 3306
mongodb: 27017
Django: 8000
Tomcat: 8080
Flask: 5000
Redis: 6379

tcp协议

可靠传输,TCP数据包没有长度限制,理论上可以无限长,但是为了保证网络的效率,通常TCP数据包的长度不会超过IP数据包的长度,以确保单个TCP数据包不必再分割。

以太网头 ip 头 tcp头 数据
1.tcp报文
2.三次握手建连接
-建立双向通道,建立好连接。
    - listen: 监听
    - established: 确认请求建立连接
- 发送数据:
    write
    read
    客户端往服务端发送数据,数据存放在内存中,需要服务端确认收到,数据才会在内存中释放掉。

    否则,会隔一段时间发送一次,让服务端返回确认收到。
    在一段时间内,若服务端还是不返回确认收到,则取消发送。并释放内存中的数据。
  • 第一次握手:客户端给服务器发送一个 SYN 报文。
  • 第二次握手:服务器收到 SYN 报文之后,会应答一个 SYN+ACK 报文。
  • 第三次握手:客户端收到 SYN+ACK 报文之后,会回应一个 ACK 报文。
  • 服务器收到 ACK 报文之后,三次握手建立完成。

为什么只有三次握手才能确认双方接收发送正常

第一次握手:客户端发送网络包,服务端收到了。
    这样服务端就能得出结论:客户端的发送能力、服务端的接收能力   是正常的。
第二次握手:服务端发包,客户端收到了。
    这样客户端就能得出结论:服务端的接收、发送能力,客户端的接   收、发送能力是正常的。                                 不过此时服务器并不能确认客户端的接收能力是否正常。
第三次握手:客户端发包,服务端收到了。
    这样服务端就能得出结论:客户端的接收、发送能力正常,服务器   自己的发送、接收能力也正常。
因此,需要三次握手才能确认双方的接收与发送能力是否正常。
三次握手的作用
  • 确认双方的接受能力、发送能力是否正常。
  • 指定自己的初始化序列号,为后面的可靠传送做准备。
  • 如果是 HTTPS 协议的话,三次握手这个过程,还会进行数字证书的验证以及加密密钥的生成。
3.四次挥手断连接

刚开始双方都处于 establised 状态,假如是客户端先发起关闭请求,则:

- 第一次挥手:客户端发送一个 FIN 报文,报文中会指定一个序列号。此时客户端处于 FIN_WAIT1 状态。
- 第二次握手:服务端收到 FIN 之后,会发送 ACK 报文,且把客户端的序列号值 +1 作为 ACK 报文的序列号值,表明已经收到客户端的报文了,此时服务端处于 CLOSE_WAIT 状态。
- 第三次挥手:如果服务端也想断开连接了,和客户端的第一次挥手一样,发给 FIN 报文,且指定一个序列号。此时服务端处于 LAST_ACK 的状态。
- 第四次挥手:客户端收到 FIN 之后,一样发送一个 ACK 报文作为应答,且把服务端的序列号值 +1 作为自己 ACK 报文的序列号值,此时客户端处于 TIME_WAIT 状态。

需要过一阵子以确保服务端收到自己的 ACK 报文之后才会进入 CLOSED 状态
- 服务端收到 ACK 报文之后,就处于关闭连接了,处于 CLOSED 状态。
time_wait的作用

为什么客户端发送 ACK 之后不直接关闭,而是要等一阵子才关闭。

要确保服务器是否已经收到了我们的 ACK 报文,如果没有收到的话,服务器会重新发 FIN 报文给客户端,客户端再次收到 ACK 报文之后,就知道之前的 ACK 报文丢失了,然后再次发送 ACK 报文。
至于 TIME_WAIT 持续的时间至少是一个报文的来回时间。一般会设置一个计时,如果过了这个计时没有再次收到 FIN 报文,则代表对方成功,就是 ACK 报文,此时处于 CLOSED 状态。

udp协议

不可靠传输,”报头”部分一共只有8个字节,总长度不超过65,535字节,正好放进一个IP数据包。

以太网头 ip头 udp头 数据

3.5 应用层

应用层由来:用户使用的都是应用程序,均工作于应用层,互联网是开发的,大家都可以开发自己的应用程序,数据多种多样,必须规定好数据的组织形式

应用层功能:规定应用程序的数据格式。

 例:TCP协议可以为各种各样的程序传递数据,比如Email、WWW、FTP等等。那么,必须有不同协议规定电子邮件、网页、FTP数据的格式,这些应用程序协议就构成了”应用层”。 

4. Scoket

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。

在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。

所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。

我们知道两个进程如果需要进行通讯最基本的一个前提能能够唯一的标示一个进程,在本地进程通讯中我们可以使用PID来唯一标示一个进程,但PID只在本地唯一,网络中的两个进程PID冲突几率很大,这时候我们需要另辟它径了,我们知道IP层的ip地址可以唯一标示主机,而TCP层协议和端口号可以唯一标示主机的一个进程,这样我们可以利用ip地址+协议+端口号唯一标示网络中的一个进程。

能够唯一标示网络中的进程后,它们就可以利用socket进行通信了,什么是socket呢?我们经常把socket翻译为套接字,socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。

socket起源于UNIX,在Unix一切皆文件哲学的思想下,socket是一种"打开—读/写—关闭"模式的实现,服务器和客户端各自维护一个"文件",在建立连接打开后,可以向自己文件写入内容供对方读取或者读取对方内容,通讯结束时关闭文件。

服务端套接字函数
s.bind()    绑定(主机,端口号)到套接字
s.listen()  开始TCP监听
s.accept()  被动接受TCP客户的连接,(阻塞式)等待连接的到来

客户端套接字函数
s.connect()     主动初始化TCP服务器连接
s.connect_ex()  connect()函数的扩展版本,出错时返回出错码,而不是抛出异常

公共用途的套接字函数
s.recv()            接收TCP数据
s.send()            发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完)
s.sendall()         发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完)
s.recvfrom()        接收UDP数据
s.sendto()          发送UDP数据
s.getpeername()     连接到当前套接字的远端的地址
s.getsockname()     当前套接字的地址
s.getsockopt()      返回指定套接字的参数
s.setsockopt()      设置指定套接字的参数
s.close()           关闭套接字

面向锁的套接字方法
s.setblocking()     设置套接字的阻塞与非阻塞模式
s.settimeout()      设置阻塞套接字操作的超时时间
s.gettimeout()      得到阻塞套接字操作的超时时间

面向文件的套接字的函数
s.fileno()          套接字的文件描述符
s.makefile()        创建一个与该套接字相关的文件

4.1 TCP协议代码实现

server.py
import socket
soc = socket.socket()
soc.bind(('192.168.12.64', 50000))  # 指定ip和端口号,其中这两个元素是一个元组
soc.listen()  # 监听接口
client, address = soc.accept()  # 接收客户端的连接请求
data = client.recv(1024)  # 接收数据,都是bytes类型
print(data.decode('utf-8'))  # 打印接收到的数据信息
client.send('hello'.encode('utf-8'))  # 发送数据
soc.close()  # 关闭socket,回收资源

# 在服务器中接收和发送数据都是由客户端的socket来完成,服务器的socket只是用来处理连接

client.py
import socket
client = socket.socket()  # 创建socket对象
client.connect(('192.168.12.64', 50000))  # 连接服务器,就是在做三次握手
client.send('1234456'.encode('utf-8'))  # 发送数据
data = client.recv(1024)  # 接收数据
print(data.decode('utf-8'))  # 打印数据
client.close()  # 关闭连接

1.简单使用

'''server服务端'''
import  socket
# 1.获得socket对象server,默认指定TCP协议
server = socket.socket()

# 2.传入服务端的(IP + 端口) 
server.bind(
    ('127.0.0.1',8848)   #(用户回环地址,自定义端口值)
)

# 3.开始监听
server.listen(5)  # listen(5) 半连接池

# 4.监听是否发送消息,并查看客户端的地址
    # conn :服务端至客户端的管道 addr:客户端的地址
conn,addr= server.accept()
# print(addr)

# 5.服务端接受客户端消息,需解码(设定可以接受字节大小数据)
data = conn.recv(1024).decode('utf-8')
print(data)

# 6.服务端通过单向管道向客户端发送消息
conn.send(('我也好').encode('utf-8'))

# 7.服务端通道关闭
conn.close()

# 8.关闭服务器
server.close()

'''
('127.0.0.1', 49941)
你好'''
'''client客户端'''
import socket
# 1.获得socket对象client
client = socket.socket()

# 2.寻找服务端地址 (服务端的IP + 端口号)
    # client:相当于客户端至服务端的单向通道
client.connect(
    ('127.0.0.1',8848)   # (IP + port )寻找服务端
)

# 3.客户端向服务端发送消息(客户端主动)
client.send(('你好').encode('utf-8'))

# 4.客户端接受服务端的消息,设定接受数据字节大小
data = client.recv(1024).decode('utf-8')
print(data)

# 5.客户端主动断开连接
client.close()

'''我也好'''

2.循环通信套接字

'''server服务端'''
import  socket
# 1.获得socket对象server,默认指定TCP协议
server = socket.socket()

# 2.传入服务端的(IP + 端口)
server.bind(
    ('127.0.0.1',8848)   #(用户回环地址,自定义端口值)
)

# 3.开始监听
server.listen(5)  # listen(5) 半连接池

# 4.监听是否发送消息,并查看客户端的地址
    # conn :服务至客户的管道 addr:客户端的地址
conn,addr= server.accept()
print(addr)

# 5.打印时进行循环
while True:

    # 服务端接受客户端消息,需解码(设定可以接受字节大小数据)
    data = conn.recv(1024).decode('utf-8')
    print(data)
    # 判断客户端发送的是否为q 就退出
    if data == 'q':
        break

    # 6.服务端通过单向管道向客户端发送消息,自定义人工输入
    say = input('请输入向客户端发送的消息:').encode('utf-8')
    conn.send(say)

# 7.服务端通道关闭
conn.close()
'''client客户端'''
import socket
# 1.获得socket对象client
client = socket.socket()

# 2.寻找服务端地址 (服务端的IP + 端口号)
    # client:相当于客户端至服务端的单向通道
client.connect(
    ('127.0.0.1',8848)   # (IP + port )寻找服务端
)

# 3.发送消息时进行循环
while True:
    # 自定义输入消息
    msg = input('请输入客户端向服务端发送的消息:')

    # 4.客户端向服务端发送消息(客户端主动)
    client.send(msg.encode('utf-8'))

    # 判断输入为q,则主动退出
    if msg == 'q':
        break

    # 5.不是q,则客户端接受服务端的消息,设定接受数据字节大小
    data = client.recv(1024).decode('utf-8')
    print(data)

# 6.客户端主动断开连接
client.close()

3.异常处理版

import socket
# 1获取对象
s = socket.socket()
# 2 地标
s.bind(
    ('127.0.0.1',8848)
)
# 3.监听
s.listen(5)
while True:
    # 4.建立管道
    conn, addr = s.accept()
    # 5.接受信息
    while True:
        try:
            data = conn.recv(1024).decode('utf-8')
            print(data)
            # 判断bug
            if len(data) == 0:
                continue
            if data == 'q':
                break
            # 6.发送信息
            conn.send(data.encode('utf-8'))
        except Exception as e:
                print(e)
                break
    # 7.关闭通道
    conn.close()
import socket
# 1.对象
c = socket.socket()
# 2.获取地址,建立通道
c.connect(
    ('127.0.0.1',8848)
)
while True:
    # 3.发送信息
    msg = input('客户端到用户端:')
    c.send(msg.encode('utf-8'))
    if msg =='q':
        break
    # 4.接收信息
    data = c.recv(1024).decode('utf-8')
    print(data)

c.close()

4.循环收发

-------------------socket.serve-----------
import socket
​
# 买手机 默认得到是一个TCP的socket
server = socket.socket()
​
# 两行代码的效果是一样的
# socket的家族   AF_INET
# socket的类型
# SOCK_STREAM 对应的是TCP     SOCK_DGRAM 对应的是UDP
​
# server = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
# 创建基于UDP的socket
# server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
​
​
server.bind(("127.0.0.1",16888))  # 绑定手机卡
​
server.listen() # 开始待机
​
# 连接循环 可以不断接受新连接
while True:
    client, addr = server.accept()
​
    # 通讯循环 可以不断的收发数据
    while True:
        try:
            # 如果是windows 对方强行关闭连接 会抛出异常
            # 如果是linux 不会抛出异常 会死循环收到空的数据包
            data = client.recv(1024)
            if not data:
                client.close()
                break
​
            print("收到客户端发来的数据:%s" % data.decode("utf-8"))
            client.send(data)
        except ConnectionResetError:
            print("客户端强行关闭了连接")
            client.close()
            break
client.close() #挂断电话
server.close() # 关机
​
---------------socket.client--------------
import socket
​
client = socket.socket()
client.connect(("127.0.0.1",16888))
while True:
    msg = input(">:")
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print("收到服务器:%s" % data.decode("utf-8"))
client.close()

4.2 UDP协议代码实现

------------------socket.serve--------------
import socket
​
# UDP协议 在创建socket是 只有一个类型不同
server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,proto=0)
server.bind(("127.0.0.1",8888))
​
while True:
    data,addr = server.recvfrom(1024) # 阻塞 直到收到数据为止
    print("收到来自%s的消息:%s" % (data.decode("utf-8"),addr[0]))
    # 返回值为 数据 和 对方ip地址 和端口号
    server.sendto(data.upper(),addr)
​
print(res)
server.close()
​
---------------socket.client1--------------
import socket
​
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
​
while True:
    data = input(">>:").encode("utf-8")
    client.sendto(data,("127.0.0.1",8888))
    d,addr = client.recvfrom(1024)
    print(d.decode("utf-8"))
​
client.close()
​
------------socket.client2--------------
import socket
​
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
​
while True:
    data = input(">>:").encode("utf-8")
    client.sendto(data,("127.0.0.1",8888))
    d,addr = client.recvfrom(1024)
    print(d.decode("utf-8"))
​
client.close()

4.3 subprocess()

1.定义

1.可以帮你通过代码执行操作系统的终端命令
2.并返回终端执行命令后的结果

2.方法

subprocess 模块来运行系统命令.subprocess模块允许我们创建子进程,连接他们的输入/输出/错误管道,还有获得返回值。

1.subprocess模块中只定义了一个类: Popen。可以使用Popen来创建进程,并与进程进行复杂的交互。

2.如果参数shell设为true,程序将通过shell来执行。

3.subprocess.PIPE
  在创建Popen对象时,subprocess.PIPE可以初始化stdin, stdout或stderr参数。表示与子进程通信的标准流。

3.代码

初始版

import subprocess

# 执行系统dir命令,把执行的正确结果放到管道中
obj = subprocess.Popen(
    'tasklist',  # cmd 命令  /dir/tasklist
    shell= True,    #  Shell=True
    stderr=subprocess.PIPE, #  返回错误结果参数 error
    stdout=subprocess.PIPE  #  返回正确结果参数
)
# 拿到正确结果的管道,读出里面的内容
data = obj.stdout.read() + obj.stderr.read()
# cmd中默认为gkb,解码需要gbk
print(data.decode('gbk'))

循环打印

客户端与服务端交互,cmd命令在客户端打印

'''服务端'''
import socket
import subprocess
'''客户端输入cmd命令,服务端接受命令并传给cmd,得到正确的数据,利用subprocess返回数据'''

s = socket.socket()

s.bind(
    ('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')

while True:
    conn,addr = s.accept()
    print(f'有客户端{addr}成功连接')
    while True:
        try:
            # 1.接受用户输入的cmd命令
            cmd = conn.recv(1024).decode('utf-8')
            if cmd == 'q':
                break
            # 2.将用户输入命令利用subprocess得到正确返回
            obj = subprocess.Popen(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            # 3.取出正确和错误结果
            data = obj.stdout.read() + obj.stderr.read()
            # 将结果发送给客户端
            conn.send(data)
        except Exception:
            break
    conn.close()
==========================================================
'''客户端'''

import socket
import subprocess

c = socket.socket()
c.connect(
    ('127.0.0.1',8848)
)
while True:
    data = input('请输入CMD命令:')
    # 发送
    c.send(data.encode('utf-8'))
    if  data == 'q':
        break
    # 接收 进行解码,cmd默认gbk形
    msg = c.recv(1024).decode('gbk')
    print(msg)

5.粘包问题

5.1 什么是粘包

粘包是指基于TCP协议传输数据时,一次传输的数据与接受的数据不匹配的情况,粘包问题分为两种,一种是由发送端引起的,一种是由接受方引起的。例如:使用socket实现一个远程控制cmd的命令程序,输入一个执行命令(如dir)得到的信息与在终端输入该命令得到的结果不同(通常提现为不全(数据过大,接受有限,未全部取走)或者不匹配(上一个命令没有拿完剩下的数据信息))

注:只有TCP有粘包现象,UDP永远不会粘包。

5.2 为何TCP会产生粘包现象

  发送端发送数据的大小与接受端接受数据的大小是可以由程序员随意设置的(发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据),也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),应用程序无法得知一个消息对应由多少个字节,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。

  而UDP是面向消息的协议,每个UDP段都是一条消息(以包的形式存在系统缓冲区),应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。

例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看了,根本不知道该文件的字节流从何处开始,在何处结束。

  所谓粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。

  此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。

  1. TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。
  2. UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。
  3. tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住,而udp是基于数据报的,即便是你输入的是空内容(直接回车),那也不是空消息,udp协议会帮你封装上消息头。

udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠。

tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。

5.3 解决粘包问题

为了避免粘包现象,可采取以下几种措施:

问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕,如何让发送端在发送数据前,把自己将要发送的字节流总大小让接收端知晓,然后接收端来一个死循环接收完所有数据。

  a、可以通过导入time模块让程序在发完一段数据后睡眠一段时间,让另一端接受玩后在发送下一段数据,这种方法严重影响程序的运行速度,因此不建议使用。

  b、为字节流加上自定义固定长度报头(用struct模块来pack个定长的报头),报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据

(1)对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;

(2)对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;

(3)由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。

  代码编写思路:

  我们可以把报头做成字典,字典里包含将要发送的真实数据的详细信息,然后json序列化,然后用struct将序列化后的数据长度打包成4个字节(4个字节已够用)

  发送时:

    1、先发送报头长度;

    2、再编码报头内容然后发送;

    3、最后发真实内容 。

  接收时:

    1、先收取报头长度,用struct取出来;

    2、根据取出的长度收取报头内容,然后解码,反序列化;

    3、从反序列化的结果中取出待取数据的详细信息,然后去取真实的数据内容。

struct模块

可以将发送的数据长度提前发送至服务端,服务端接受到数据长度,自定义接受.

必须先定义报头,发送报头,再发送真实数据.

是一个可以将很长的数据的长度,压缩成固定的长度的一个标记(数据报头)

`i:模式`,会将数据长度压缩成4个bytes
代码
'''服务端'''
import socket
import struct
import subprocess

s= socket.socket()
s.bind(
    ('127.0.0.1',8848)
)

s.listen(5)
print('等待客户端连接')
while True :
    conn,addr= s.accept()
    print(f'客户端{addr}已连接')
    while True:
        try:
            # 1.接受用户输入的cmd命令
            cmd = conn.recv(1024).decode('utf-8')
            if cmd == 'q':
                break
            # 2.将用户输入命令利用subprocess得到正确返回
            obj = subprocess.Popen(
                cmd,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            # 3.取出正确和错误结果
            data = obj.stdout.read() + obj.stderr.read()
            # 4.打包压缩,成只有4位的报头信息,i 模式
            headers = struct.pack('i',len(data))
            # 5.将报头先行发送,让客户端准备接受的大小
            conn.send(headers)
            # 6.再发送真实数据
            conn.send(data)
        except Exception as e:
            print(e)
            break
    conn.close()
=========================================================
'''客户端'''
import socket
import subprocess
import struct

c = socket.socket()
c.connect(
    ('127.0.0.1',8848)
)
while True:
    msg = input('请输入cmd命令')
    if msg == 'q':
        break
    # 1.将cmd 命令传至服务端处理
    c.send(msg.encode('utf-8'))
    # 2.接受服务端发送的4位报头
    headers = c.recv(4)
    # 3.将报头解包(unpack),获得元组,索引取数据长度
    data_len = struct.unpack('i',headers)[0]
    # 4.接受真实的数据信息
    data = c.recv(data_len)
    print(data.decode('gbk'))
c.close()
代码2

发送文件的描述信息(字典)

'''客户端发送字典给服务端
send_dic:{
    file_name : 文件名
    file_size : 文件的真实长度}
服务端接受到字典,并接受文件的真实数据'''

服务端

import socket
import struct
import json

s = socket.socket()
s.bind(
    ('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')

while True:
    conn,addr = s.accept()
    print(f'用户{addr}已连接')
    while True:
        try:
            # 1.接受到客户端发送的报头
            beaders = conn.recv(4)
            # print(beaders)  #  b'M\x00\x00\x00'

            # 2.报头解压缩,索引获得数据长度
            data__len = struct.unpack('i',beaders)[0]
            # print(data__len)    # 77

            # 3.接受真实数据(序列化的数据)
            bytes_data = conn.recv(data__len)
            # print(bytes_data)   # b'{"file_name": "abc\\u7684\\u9017\\u6bd4\\u4eba\\u751f.txt", "file_size": 12345678}'

            # 4.反序列化获得数据
            dic = json.loads(bytes_data.decode('utf-8'))
            # print(dic)  #  {'file_name': 'abc的逗比人生.txt', 'file_size': 12345678}

        except Exception as e :
            print(e)
            break
    conn.close()
=========================================================
'''客户端'''
import socket
import struct
import json
import time

c = socket.socket()
c.connect(
    ('127.0.0.1',8848)
)
while True:
    # 1.用户文件的字典
    send_dic = {
        'file_name':'abc的逗比人生.txt',
        'file_size':12345678
    }
    # 2.json序列化,并转码成bytes类型数据(为的是struct的len长度)
    json_data = json.dumps(send_dic)
    bytes_data = json_data.encode('utf-8')
    # 3.压缩数据做成报头,发送至服务端
    headers = struct.pack('i',len(bytes_data))
    # print(bytes_data)    # b'{"file_name": "abc\\u7684\\u9017\\u6bd4\\u4eba\\u751f.txt", "file_size": 12345678}'
    # print(len(bytes_data))    # 77
    # print(headers)    # b'M\x00\x00\x00'

    c.send(headers)
    # 4.发送真实字典
    c.send(bytes_data)

    time.sleep(10)
    # {'file_name': 'abc的逗比人生.txt', 'file_size': 12345678}
    # 会一直打印,所以手动停止5秒
上传大文件

利用while循环进行一段一段的上传防止粘包.

服务端

import socket
import struct
import json

s = socket.socket()
s.bind(
    ('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')


while True:
    conn, addr = s.accept()
    print(f'客户端{addr}已连接')
    try:
        # 1.接受客户端传来的字典报头
        headers = conn.recv(4)
        # 2.解压索引获得字典的长度
        data_len = struct.unpack('i',headers)[0]
        # 3.接受文件字典的bytes信息
        bytes_data = conn.recv(data_len)
        # 4.反序列化得到字典数据
        data_dic = json.loads(bytes_data.decode('utf-8'))
        print(data_dic)
        # 5.获得文件字典的名字与大小
        file_name = data_dic.get('file_name')
        file_size = data_dic.get('file_size')

        # 6.以文件名打开文件(循环控制打开资源占用)
        size = 0
        with open(file_name,'wb') as f:
            while size < file_size:
                # 每次接受1024大小
                data = conn.recv(1024)
                # 每次写入data大小
                f.write(data)
                # 写完进行追加
                size += len(data)
            print(f'{file_name}接受完毕')
    except Exception as e:
        print(e)
        break
conn.close()

客户端

import socket
import struct
import json

c= socket.socket()
c.connect(
    ('127.0.0.1',8848)
)
# 1.打开一个视频文件,获取数据大小
with open(r'F:\老男孩12期开课视频\day 27\5 上传大文件.mp4','rb') as f :
    movie_bytes = f.read()   #获得的是二进制流
    # 文件自动关闭
# 2.为视频文件组织一个信息字典,字典有名称大小
movie_info = {
    'file_name':'大视频.mp4',
    'file_size':len(movie_bytes)
}
# 3.序列化字典,发送文件字典的报头(客户端获得文件的名字与大小)
json_data = json.dumps(movie_info)
bytes_data = json_data.encode('utf-8')
    # 获得字典的报头
headers = struct.pack('i',len(bytes_data))
    # 发送报头
c.send(headers)
    # 发送真实文件的字典
c.send(bytes_data)
# 4.发送真实的文件数据(大文件循环发送减少占用)
size = 0
num = 1
with open(r'F:\老男孩12期开课视频\day 27\5 上传大文件.mp4','rb') as f :
    while size < len(movie_bytes):
        # 打开时每次读取1024大小数据
        send_data = f.read(1024)  # 获得的是二进制流
        print(send_data,num)
        num += 1
        # 每次发送都是1024大小数据
        c.send(send_data)
        # 为初始数据增加发送的大小,控制循环
        size += len(send_data)

二. 并发编程

1.进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础

1.1 什么是进程

进程指的是正在运行的程序,是一系列过程的统称,也是操作系统调度和进行资源分配的基本单位。

进程是实现并发的一种方式,在学习并发编程之前要先了解进程的基本概念以及多进程的实现原理,这就必须提到操作系统了,因为进程这个概念来自于操作系统,没有操作系统就没有进程。

1.2 什么是并发编程

并发指的是多个任务(看起来像是)同时被执行

在之前的TCP通讯中,服务器在建立连接后需要一个循环来与客户端循环的收发数据,但服务器并不知道客户端什么时候会发来数据,导致没有数时服务器进入了一个等待状态,此时其他客户端也无法链接服务器,很明显这是不合理的,学习并发编程就是要找到一种方案,让一个程序中的的多个任务可以同时被处理。

1.3 多进程的实现原理 - 多道技术

1、操作系统介绍

下图是操作系统在整个计算机中所在的位置:

操作系统位于应用软件和硬件设备之间,本质上也是一个软件,

它由系统内核(管理所有硬件资源)与系统接口(提供给程序员使用的接口)组成

操作系统是为了方便用户操作计算机而提供的一个运行在硬件之上的软件

2、操作系统的两个核心作用

1、为用户屏蔽了复杂繁琐的硬件接口,为应用程序提供了,清晰易用的系统接口。

有了这些接口以后程序员不用再直接与硬件打交道了

例子:有了操作系统后我们就可以使用资源管理器来操作硬盘上的数据,而不用操心,磁头的移动啊,数据的读写等等

2、操作系统将应用程序对硬件资源的竞争变成了有序的使用。

例子:所有软件 qq啊 微信啊 吃鸡啊都共用一套硬件设备 假设现有三个程序都在使用打印机,如果不能妥善管理竞争问题,可能一个程序打印了一半图片后,另一个程序抢到了打印机执行权于是打印了一半文本,导致两个程序的任务都没能完成,操作系统的任务就是将这些无序的操作变得有序

3、操作系统与应用程序的区别

二者的区别不在于的地位,它们都是软件,而操作系统可以看做一款特殊的软件

1.操作系统是受保护的:无法被用户修改(应用软件如qq不属于操作系统可以随便卸载)。

2.大型:linux或widows源代码都在五百万行以上,这仅仅是内核,不包括用户程序,如GUI,库以及基本应用软件(如windows Explorer等),很容易就能达到这个数量的10倍或者20倍之多。

3.长寿:由于操作系统源码量巨大,编写是非常耗时耗力的,一旦完成,操作系统所有者便不会轻易的放弃重写,二是在原有基础上改进,基本上可以把windows95/98/Me看成一个操作系统。

4. 发展历史:

多道技术出现在第三代操作系统中,是为了解决前两代操作系统存在的种种问题而出现,那么前两代操作系统都有哪些问题呢?一起来看看操作系统的发展历史:

第一代计算机(1940~1955):真空管和穿孔卡片

第一代计算机的产生背景:

第一代之前人类是想用机械取代人力,第一代计算机的产生是计算机由机械时代进入电子时代的标志,从Babbage失败之后一直到第二次世界大战,数字计算机的建造几乎没有什么进展,第二次世界大战刺激了有关计算机研究的爆炸性进展。

lowa州立大学的john Atanasoff教授和他的学生Clifford Berry建造了据认为是第一台可工作的数字计算机。该机器使用300个真空管。大约在同时,Konrad Zuse在柏林用继电器构建了Z3计算机,英格兰布莱切利园的一个小组在1944年构建了Colossus,Howard Aiken在哈佛大学建造了Mark 1,宾夕法尼亚大学的William Mauchley和他的学生J.Presper Eckert建造了ENIAC。这些机器有的是二进制的,有的使用真空管,有的是可编程的,但都非常原始,设置需要花费数秒钟时间才能完成最简单的运算。

在这个时期,同一个小组里的工程师们,设计、建造、编程、操作及维护同一台机器,所有的程序设计是用纯粹的机器语言编写的,甚至更糟糕,需要通过成千上万根电缆接到插件板上连成电路来控制机器的基本功能。没有程序设计语言(汇编也没有),操作系统则是从来都没听说过。使用机器的过程更加原始,详见下‘工作过程’

特点: 没有操作系统的概念 所有的程序设计都是直接操控硬件

工作过程: 程序员在墙上的机时表预约一段时间,然后程序员拿着他的插件版到机房里,将自己的插件板街道计算机里,这几个小时内他独享整个计算机资源,后面的一批人都得等着(两万多个真空管经常会有被烧坏的情况出现)。

后来出现了穿孔卡片,可以将程序写在卡片上,然后读入机而不用插件板

优点:

程序员在申请的时间段内独享整个资源,可以即时地调试自己的程序(有bug可以立刻处理)

缺点:

浪费计算机资源,一个时间段内只有一个人用。 注意:同一时刻只有一个程序在内存中,被cpu调用执行,比方说10个程序的执行,是串行的

第二代计算机(1955~1965):晶体管和批处理系统

第二代计算机的产生背景:

由于当时的计算机非常昂贵,自认很自然的想办法较少机时的浪费。通常采用的方法就是批处理系统。

特点: 设计人员、生产人员、操作人员、程序人员和维护人员直接有了明确的分工,计算机被锁在专用空调房间中,由专业操作人员运行,这便是‘大型机’。

有了操作系统的概念

有了程序设计语言:FORTRAN语言或汇编语言,写到纸上,然后穿孔打成卡片,再将卡片盒带到输入室,交给操作员,然后喝着咖啡等待输出接口

工作过程:

第二代如何解决第一代的问题/缺点: 1.把一堆人的输入攒成一大波输入, 2.然后顺序计算(这是有问题的,但是第二代计算也没有解决) 3.把一堆人的输出攒成一大波输出

现代操作系统的前身:(见图)

优点:批处理,节省了机时

缺点:1.整个流程需要人参与控制,将磁带搬来搬去(中间俩小人)

2.计算的过程仍然是顺序计算-》串行

3.程序员原来独享一段时间的计算机,现在必须被统一规划到一批作业中,等待结果和重新调试的过程都需要等同批次的其他程序都运作完才可以(这极大的影响了程序的开发效率,无法及时调试程序)

第三代计算机(1965~1980):集成电路芯片和多道程序设计

第三代计算机的产生背景:

20世纪60年代初期,大多数计算机厂商都有两条完全不兼容的生产线。

一条是面向字的:大型的科学计算机,如IBM 7094,见上图,主要用于科学计算和工程计算

另外一条是面向字符的:商用计算机,如IBM 1401,见上图,主要用于银行和保险公司从事磁带归档和打印服务

开发和维护完全不同的产品是昂贵的,同时不同的用户对计算机的用途不同。

IBM公司试图通过引入system/360系列来同时满足科学计算和商业计算,360系列低档机与1401相当,高档机比7094功能强很多,不同的性能卖不同的价格

360是第一个采用了(小规模)芯片(集成电路)的主流机型,与采用晶体管的第二代计算机相比,性价比有了很大的提高。这些计算机的后代仍在大型的计算机中心里使用,此乃现在服务器的前身,这些服务器每秒处理不小于千次的请求。

如何解决第二代计算机的问题1: 卡片被拿到机房后能够很快的将作业从卡片读入磁盘,于是任何时刻当一个作业结束时,操作系统就能将一个作业从磁带读出,装进空出来的内存区域运行,这种技术叫做 同时的外部设备联机操作:SPOOLING,该技术同时用于输出。当采用了这种技术后,就不在需要IBM1401机了,也不必将磁带搬来搬去了(中间俩小人不再需要)

如何解决第二代计算机的问题2:

第三代计算机的操作系统广泛应用了第二代计算机的操作系统没有的关键技术:多道技术

cpu在执行一个任务的过程中,若需要操作硬盘,则发送操作硬盘的指令,指令一旦发出,硬盘上的机械手臂滑动读取数据到内存中,这一段时间,cpu需要等待,时间可能很短,但对于cpu来说已经很长很长,长到可以让cpu做很多其他的任务,如果我们让cpu在这段时间内切换到去做其他的任务,这样cpu不就充分利用了吗。这正是多道技术产生的技术背景

多道技术:

多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。

空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序。

时间上的复用:当一个程序在等待I/O时,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%,类似于我们小学数学所学的统筹方法。(操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限。这种切换不仅会在一个进程遇到io时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限)

空间上的复用最大的问题是:程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,

首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。

其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。

多道技术案例:

生活中我们进程会同时做多个任务,但是本质上一个人是不可能同时做执行多个任务的,

例1:吃饭和打游戏,同时执行,本质上是在两个任务之间切换执行,吃一口饭然后打打游戏,打会儿游戏再吃一口饭;

例2:做饭和洗衣服,如果没有多道技术,在电饭煲做饭的时候我们就只能等着,假设洗米花费5分钟,煮饭花费40分钟,相当于40分钟是被浪费的时间。那就可以在煮饭的等待过程中去洗衣服,假设把衣服装进洗衣机花费5分钟,洗衣服花费40分钟,那么总耗时为 5(洗米)+5(装衣服)+40(最长等待时间) 大大提高了工作效率

多道技术也是在不同任务间切换执行,由于计算机的切换速度非常快,所以用户是没有任何感觉的,看起来就像是两个任务都在执行,但是另一个问题是,仅仅是切换还不行,还需要在切换前保存当前状态,切换回来时恢复状态,这些切换和保存都是需要花费时间的!在上述案例中由于任务过程中出现了等待即IO操作所以进行了切换,而对于一些不会出现IO操作的程序而言,切换不仅不能提高效率,反而会降低效率

例如:做一百道乘法题和做一百道除法题,两个任务都是计算任务是不需要等待的,此时的切换反而降低了运行效率!

第三代计算机的操作系统仍然是批处理

许多程序员怀念第一代独享的计算机,可以即时调试自己的程序。为了满足程序员们很快可以得到响应,出现了分时操作系统

如何解决第二代计算机的问题3:

分时操作系统: 多个联机终端+多道技术

20个客户端同时加载到内存,有17在思考,3个在运行,cpu就采用多道的方式处理内存中的这3个程序,由于客户提交的一般都是简短的指令而且很少有耗时长的,索引计算机能够为许多用户提供快速的交互式服务,所有的用户都以为自己独享了计算机资源

CTTS:麻省理工(MIT)在一台改装过的7094机上开发成功的,CTSS兼容分时系统,第三代计算机广泛采用了必须的保护硬件(程序之间的内存彼此隔离)之后,分时系统才开始流行

MIT,贝尔实验室和通用电气在CTTS成功研制后决定开发能够同时支持上百终端的MULTICS(其设计者着眼于建造满足波士顿地区所有用户计算需求的一台机器),很明显真是要上天啊,最后摔死了。

后来一位参加过MULTICS研制的贝尔实验室计算机科学家Ken Thompson开发了一个简易的,单用户版本的MULTICS,这就是后来的UNIX系统。基于它衍生了很多其他的Unix版本,为了使程序能在任何版本的unix上运行,IEEE提出了一个unix标准,即posix(可移植的操作系统接口Portable Operating System Interface)

后来,在1987年,出现了一个UNIX的小型克隆,即minix,用于教学使用。芬兰学生Linus Torvalds基于它编写了Linux

第四代计算机(1980~至今):个人计算机

第四代也就是我们常见的操作系统,大多是具备图形化界面的,例如:Windows,macOS ,CentOS等

由于采用了IC设计,计算机的体积下降,性能增长,并且成本以及可以被普通消费者接受,而第三代操作系统大都需要进行专业的学习才能使用,于是各个大佬公司开始开发那种不需要专业学习也可以快速上手的操作系统,即上述操作系统!

它们都是用了GUI 图形化用户接口,用户只需要通过鼠标点击拖拽界面上的元素即可完成大部分操作

1.4 进程的创建

方式1:实例化Process类

from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)
if __name__ == '__main__':
    # 在windows系统之上,开启子进程的操作一定要放到这下面
    # Process(target=task,kwargs={'name':'egon'})
    p=Process(target=task,args=('jack',))
    p.start() # 向操作系统发送请求,操作系统会申请内存空间,然后把父进程的数据拷贝给子进程,作为子进程的初始状态
    print('======主')

方式2:继承Process类 并覆盖run方法

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(3)
        print('%s is done' %self.name)
if __name__ == '__main__':
    p=MyProcess('jack')
    p.start()
    print('主')

需要注意的是 在windows下 开启子进程必须放到__main__下面,因为windows在开启子进程时会重新加载所有的代码造成递归

1.5 进程属性方法

进程是正在运行的程序,程序是程序员编写的一堆代码,也就是一堆字符,当这堆代码被系统加载到内存中并执行时,就有了进程 ,一个程序可以产生多个进程.

1.current_process().pid

在子进程内使用,可以获取子进程号

from multiprocessing import Process
from multiprocessing import current_process

def task():
    print('开始执行...',current_process().pid)
    print('结束执行')
    
if __name__ == '__main__':
    p = Process(target=task)  # 获得process对象
    p.start()  # 创建子进程
    p.join()    # 执行完子进程在执行主进程
    print('主进程')
'''开始执行... 2092
结束执行
主进程'''

2.os.getpid()

在主进程中获得主进程号. 系统会给每一个进程分配一个进程编号即PID.

tasklist 用于查看所有的进程信息

# 在python中可以使用os模块来获取pid
import os
print(os.getpid())

#还可以通过current_process模块来获得
current_process().pid   #可获得当前运行的程序的pid
from multiprocessing import Process
from multiprocessing import current_process
import os

def task():
    print('开始执行...',current_process().pid)
    print('结束执行')

if __name__ == '__main__':
    p = Process(target=task)  # 获得process对象
    p.start()  # 创建子进程
    p.join()    # 执行完子进程在执行主进程
    print('主进程',os.getpid())
'''
开始执行... 11072
结束执行
主进程 13892
'''

3.os.getppid

可以查看主主进程的进程号

当一个进程a开启了另一个进程b时,a称为b的父进程,b称为a的子进程 ,在python中可以通过os模块来获取父进程的pid

# 在python中可以使用os模块来获取ppid
import os
print("self",os.getpid()) # 当前进程自己的pid
print("parent",os.getppid()) # 当前进程的父进程的pid

如果是在pycharm中运行的py文件,那pycahrm就是这个python.exe的父进程,当然你可以从cmd中来运行py文件,那此时cmd就是python.exe的父进程

def task():
    print('开始执行...',current_process().pid)
    time.sleep(1)
    print('结束执行')

if __name__ == '__main__':
    p = Process(target=task)  # 获得process对象
    p.start()  # 创建子进程
    p.join()    # 执行完子进程在执行主进程
    print('主进程',os.getpid())
    print('主主进程',os.getppid())
    time.sleep(1000)
'''
开始执行... 12368
结束执行
主进程 10332   # python.exe
主主进程 4152  # pycharm64.exe
'''

4.join()方法

join函数就可以是父进程等待子进程结束后继续执行

# 让子进程结束后,父进程才结束
from multiprocessing import Process
import time

def task(name):
    print('任务开始')
    time.sleep(1)
    print('任务结束')

if __name__ == '__main__':
    p = Process(target= task,args=('你好',))
    p.start() # 告诉操作系统,开启子进程
    print('join上面的不算')
    p.join()   # 告诉操作系统,等子进程结束后,父进程再结束
    print('主进程')
'''
join上面的不算
任务开始
任务结束
主进程'''

多个子程序的运行

# 多个子进程的运行
from multiprocessing import Process
import time

def task(name,n):
    print(f'{name}任务{n}开始')
    time.sleep(1)
    print(f'{name}任务{n}结束')

if __name__ == '__main__':
    p1 = Process(target= task,args=('AAA',1))
    p2 = Process(target= task,args=('BBB',2))

    p1.start() # 告诉操作系统,开启子进程
    p2.start() # 告诉操作系统,开启子进程
    print('join上面的不算')
    p1.join()   # 告诉操作系统,等子进程结束后,父进程再结束
    p2.join()   # 告诉操作系统,等子进程结束后,父进程再结束
    print('主进程')
'''
join上面的不算
AAA任务1开始
BBB任务2开始
AAA任务1结束
BBB任务2结束
主进程
'''
打印时会出现任务1,2顺序的不一致,貌似是因为程序并行导致cpu分配执行打印速度导致

5.is_alive()

判断子进程是否存活

def task():
    print('开始执行...',current_process().pid)

if __name__ == '__main__':
    p = Process(target=task)  # 获得process对象
    p.start()  # 创建子进程
    print(p.is_alive())   # 判断子进程是否存活
    print('主进程',os.getpid())
'''
True
主进程 3740
开始执行... 7004'''

6..terminate()

直接告诉操作系统,终止子程序

def task():
    print('开始执行...',current_process().pid)

if __name__ == '__main__':
    p = Process(target=task)  # 获得process对象
    p.start()  # 创建子进程
    # 判断子进程是否存活
    print(p.is_alive())
    # 告诉操作系统直接终止掉子进程
    p.terminate()
    time.sleep(0.1)
    # 判断子进程是否存活
    print(p.is_alive())
    print('主进程',os.getpid())
'''
True
False
主进程 7976'''

1.6 进程的回收

1.进程回收的两种条件

  • join,可以回收子进程与主进程
  • 主进程正常结束,子进程与主进程也会被回收
正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

出错退出(自愿,python a.py中a.py不存在)

严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

被其他进程杀死(非自愿,如kill -9)

2.僵尸进程

指的是子进程已经结束,但PID号还存在,但是父进程 没有去处理这些残留信息,就导致残留信息占用系统内存 .

缺点: 占用PID号,占用操作系统资源

PID号:PID是各进程的代号,运行时系统随机分配,但是进程终止后PID标识符就会被系统回收,进程号具有固定数量.

3.孤儿进程

指的是子进程还在执行,但父进程意外结束.

操作系统优化机制:自动回收此类子进程

4.守护进程

指的是主进程结束后,该主进程产生的所有子进程跟着结束,并回收.

1、定义:一个进程守护另一个进程,当被守护进程运行结束后,守护进程不管是否运行结束,也都跟着结束。

特点:a、守护进程一般为子进程,进程之间是相互独立的,守护进程会在主进程执行结束后就终止;

b、守护进程内无法再开启子进程,否则会抛出异常。

p.daemon = true设置为守护进程

p.daemon = true  (设置为守护进程)
p.start
# 必须在p.start之前设置,如果在之后会报错

1.7 并发与并行,阻塞与非阻塞

1、并发指的是,多个事件同时发生了

例如洗衣服和做饭,同时发生了,但本质上是两个任务在切换,给人的感觉是同时在进行,也被称为伪并行

2、并行指的是,多个事件都是进行着

例如一个人在写代码另一个人在写书,这两件事件是同时在进行的,要注意的是一个人是无法真正的并行执行任务的,在计算机中单核CPU也是无法真正并行的,之所以单核CPU也能同时运行qq和微信其实就是并发执行

3、阻塞与非阻塞指的是程序的状态

阻塞状态是因为程序遇到了IO操作,或是sleep,导致后续的代码不能被CPU执行

非阻塞与之相反,表示程序正在正常被CPU执行

4. 进程三种状态

就绪态,运行态,和阻塞态

多道技术会在进程执行时间过长或遇到IO时自动切换其他进程,意味着IO操作与,进程被剥夺CPU执行权都会造成进程阻塞

1.8 进程互斥锁

from multiprocessing import Lock

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,但是共享带来的是竞争,竞争带来的结果就是错乱,那么如何控制错乱呢,此时就用到了加锁处理。

1. 并发变串行

让并发变成串行,牺牲了执行效率,保证了数据的安全在程序并发执行时,需要修改数据时使用.

运行时出现全部购票成功的情况(余票为1),是因为并发编程,每个子进程都获得res为1,每个都以自己的res执行get,最后成功购票,可以在p.start后面加上p.join使其变为串行,前面结束才能运行后面的
from multiprocessing import Process
import json,time

def search():
    '''查询余票'''
    time.sleep(1)
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
        print(f"还剩{res.get('count')}")

def get():
    '''购票'''
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
    time.sleep(1)  # 模拟网络io延迟
    if res['count'] > 0:
        res['count'] -= 1
        with open('db.txt', 'w', encoding='utf-8') as f:
            json.dump(res,f)
            print('抢票成功')
        time.sleep(1)
    else:
        print('车票已经售罄')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(10):
        p =Process(target=task)
        p.start()
        # 加上join运行结果正确
        p.join()  

2. 使用lock进程锁

使第一个进程进去第二个必须等待结束才能进,把锁住的代码变成了串行.

lock.acquire()加锁, lock.release()解锁

# 设置进程串行,进程锁/互斥锁,使第一个进程进去第二个必须等待结束才能进
from multiprocessing import Process
import json,time
from multiprocessing import Lock

def search():
    time.sleep(1)
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
        print(f"还剩{res.get('count')}")

def get():
    with open('db.txt','r',encoding='utf-8') as f:
        res = json.load(f)
    time.sleep(1)  # 模拟网络io
    if res['count'] > 0:
        res['count'] -= 1
        with open('db.txt', 'w', encoding='utf-8') as f:
            json.dump(res,f)
            print('抢票成功')
        time.sleep(1)
    else:
        print('车票已经售罄')

def task(lock):
    search()
    # lock = Lock()  # 写在主进程时为了让子进程拿到同一把锁
    # 锁住
    lock.acquire()
    get()   # 同一时间只能一个进程执行get()
    # 释放锁
    lock.release()


if __name__ == '__main__':
    lock = Lock()   # 写在主进程时为了让子进程拿到同一把锁
    for i in range(10):
        p =Process(target=task,args=(lock,))  # 将lock当做参数传入
        p.start()

总结

进程锁: 是把锁住的代码变成了串行

join: 是把所有的子进程变成了串行

3. 总结

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但牺牲了速度却保证了数据安全。虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中

队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

jion和锁的区别
1、join中顺序是固定的;
2、join是完全串行,而锁可以使部分代码串行,其他代码仍是并发的。

1.9 进程间通信IPC

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。

1、实现进程之间相互通讯的几种方式:

a、使用共享文件,多个进程同时读写同一个文件(IO速度慢,传输数据大小不受限制);

b、管道,管道是基于内存的额,速度快,但是单向的,用起来比较麻烦;

c、队列,申请共享内存空间,多个进程可以共享这个内存区域(速度快,但是数据量不能太大)。

管道

pipe: 基于共享的内存空间

队列

Queue: 管道+锁

from multiprocessing import Queue

队列:先进先出

相当于内存中的产生一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面

堆栈:先进后出
1.Queue()

调用队列类,实例化对象q

q = Queue(5)   #若传参队列中可以放5个数据
q = Queue()     #若不传参,队列中可以存放无限大的数据,前提是硬件跟得上
2.put()

添加数据,若队列中的数据满了放值会阻塞

from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])  # 阻塞住

========================================================

from multiprocessing import Queue
q = Queue(2)
#  block=true  是默认会阻塞, timeout = 2  等待时间超时2s
q.put('风',block=True,timeout=2)
q.put('风',block=True,timeout=2)
# 如果满了会进行等待,但最多等待2s否则报错
q.put('风',block=True,timeout=2)
3.q.get()

遵循先进先出

from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])  

print(q.get())  # 山
print(q.get())  # 水
print(q.get())  # [1, 2, 3]
print(q.get())  # 默认存值没有就会阻塞在这
4.empty()

判断队列是否为空

print(q.empty())  # False
5.q.get_nowait()

获取数据,队列中若没有则会报错

print(q.get_nowait())
6.q.put_nowait()

添加数据 若队列满了, 则会报错

q.put_nowait(6)
7.q.full()

判断队列是否满

print(q.full())  # True
8.代码
from multiprocessing import Process,Queue

def test1(q):
    data = f'你好啊,赛利亚'
    # 将数据data传入队列中
    q.put(data)
    print('进程1开始添加数据到队列中...')

def test2(q):
    data = q.get()
    print(f'进程2从队列中获取数据[{data}]')

if __name__ == '__main__':
    # 获得队列的对象
    q = Queue()
    # 获取进程对象,并将队列对象传入
    p1 = Process(target=test1,args=(q,))
    p2 = Process(target=test2,args=(q,))

    # 启动进程1,2
    p1.start()
    p2.start()
'''
进程1开始添加数据到队列中...
进程2从队列中获取数据[你好啊,赛利亚]
'''

1.10 生产者消费者模型

生产者:生产数据的

消费者:使用数据的

生产者 --- 队列(容器) --- 消费者

通过队列,生产者把数据添加进去,消费者从队列中获取(不适合传大文件)

生产者可以不停的生产,并可以存放在容器队列中,消费者也可以不停的取,没有因生产者的生产效率低下而等待(一种设计思想)

1.定义

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

2.作用

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

3.应用

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

4.代码实现

简单版

from multiprocessing import Queue,Process

def producer(q,name,food):
    '''生产者'''
    for i in range(10):
        print(f'{name}生产出{food}数据{i}')
        # 传输的消息
        res = f'{food}{i}'
        # 放入队列中
        q.put(res)
    # 循环结束,传入q结束生产
    q.put('q')


def consumer(q,name):
    '''消费者'''
    # 消费者不停的获取
    while True:
        # 从队列中获取
        res = q.get()
        if res == 'q':
            break
        print(f'{name}消费使用了{res}数据')

if __name__ == '__main__':
    q = Queue() # 创建队列

    p1 = Process(target=producer,args=(q,'小明','电影'))  # 传入子进程中
    p2 = Process(target=consumer,args=(q,'小红'))

    p1.start()
    p2.start()

加强版

from multiprocessing import Queue,Process

def producer(q,name,food):
    '''生产者'''
    for i in range(3):
        print(f'{name}生产出{food}数据{i}')
        # 传输的消息
        res = food,i
        # 放入队列中
        q.put(res)

def consumer(q,name):
    '''消费者'''
    # 消费者不停的获取
    while True:
        # 从队列中获取
        res = q.get()
        # 判断是否有数据
        if not res:
            break
        print(f'{name}消费使用了{res}数据')

if __name__ == '__main__':
    q = Queue() # 创建队列

    p1 = Process(target=producer,args=(q,'小明','电影'))  # 传入子进程中
    p2 = Process(target=producer,args=(q,'小黑','书籍'))  # 传入子进程中
    p3 = Process(target=producer,args=(q,'小亮','动漫'))  # 传入子进程中

    c1 = Process(target=consumer,args=(q,'小红'))
    c2 = Process(target=consumer,args=(q,'小绿'))

    p1.start()
    p2.start()
    p3.start()

    # 将c1,2设置为守护者模式
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()
    # join 设置子程序结束后主程序再结束
    p3.join()
    print('主')

2.线程

线程与进程都是虚拟单位,目的是为了更好的描述某种事物.

2.1 什么是线程

进程:资源单位
进程是程序的分配资源的最小单元;一个程序可以有多个进程,但只有一个主进程;进程由程序、数据集、控制器三部分组成。

线程:执行单位
线程是程序最小的执行单元;一个进程可以有多个线程,但是只有一个主线程;线程切换分为两种:一种是I/O切换,一种是时间切换

开启一个进程一定会有一个线程,线程才是真正执行单位,节省内存资源

多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

2.2 与进程的区别

进程中包含了运行该程序的所需要的所有资料,进程是一个资源单位,线程是CPU的最小执行单位,每一个进程一旦被创建,就默认开启了一条线程,称之为主线程。一个进程可以包含多个线程,进程包含线程,线程依赖进程。

进程对于操作系统的资源耗费非常高,而线程非常低(比进程低10-100倍); 2.在同一个进程中,多个线程之间资源是共享的。

开启进程

  1. 开辟一个名称空间,每开启一个进程都会占用一份内存资源

  2. 会自带一个线程

开启线程

  1. 一个线程可以开启多个线程,
  2. 线程的开销远小于进程,

注意: 线程不能实现并行,线程只能实现并发

2.3 创建线程

from threading import Thread调用线程

线程不需要在if __name__ == '__main__':下启动也可以执行.

方式1: 实例化线程对象

from threading import Thread
import time

def task():
    print('线程开启')
    time.sleep(1)
    print('线程结束')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    t.start()

方式2:创建类

from threading import Thread
import time
class MyTh(Thread):
    def run(self):
        print('线程开启')
        time.sleep(1)
        print('线程结束')
if __name__ == '__main__':
    t = MyTh()
    t.start()

2.4 属性方法

# Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

# threading模块提供的一些方法:
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

1.current_thread.name()

获得线程号

from threading import Thread
import time
from threading import current_thread

def task():
    print(f'线程开启{current_thread().name}')
    time.sleep(1)
    print(f'线程结束{current_thread().name}')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    for i in range(3):
        # 实例化线程t
        t = Thread(target=task)
        t.start()

2.isAlive()

判断线程是否存活

def task():
    print(f'线程开启{current_thread().name}')
    time.sleep(1)
    print(f'线程结束{current_thread().name}')

# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    print(t.isAlive())  # false
    t.start()
    print(t.isAlive())  # True
    print(t.is_alive()) # True

3. 守护线程

.daemon = True, 守护线程会等待主线程运行完毕后被销毁

#1.对主进程来说,运行完毕指的是主进程代码运行完毕

#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,

#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time

def task():
    print('线程开启')
    time.sleep(0.1)
    print('线程结束')
if __name__ == '__main__':
    # 实例化线程t
    t = Thread(target=task)
    t.daemon = True
    t.start()
    print('主')

'''# 主进程结束线程即结束
线程开启
主'''

4 线程队列

queue.Queue()线程队列,线程之间使用,与进程的差不多

import queue

q = queue.Queue() # 默认创建多个线程
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 1
先进先出FIFO

普通的使用,默认就是先进先出

后进先出LIFO

queue.LifoQueue()

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 3
优先级队列

根据参数内数字的大小进行分级,数字值越小,优先级越高

传参时将元组传入,以数字或ascll码进行排序

import queue
q = queue.PriorityQueue()
q.put((3,'abc'))
q.put((10,'js'))
print(q.get())
'''abc'''

2.5 线程互斥锁

当多个进程或线程同时修改同一份数据时,可能会造成数据的错乱,所以必须要加锁。线程之间的数据是共享的

锁的使用方法与进程中锁的使用方法完全相同。

# 每个线程都会执行task修改操作,但每个人都是并发的,所以使用加锁进来串行操作

from threading import Thread
import time

from threading import Thread, Lock
import time

mutex = Lock()
n = 100
def task(i):
    print(f'线程{i}启动...')
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)  # 一共等待10秒
    n = temp-1
    print(n)
    mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t = Thread(target=task, args=(i, ))
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    # 100个线程都是在100-1
    print(n)

2.6 信号量

a、定义:信号量也是一种锁,他的特点是可以设置一个数据可以被几个线程(进程)共享。

b、与普通锁的区别:

普通锁一旦加锁,则意味着这个数据在同一时间只能被一个线程使用;而信号量可以让这个数据在统一时间被限定个数的多个进程使用。

semaphore是一个内置的计数器,用于控制进入指定数量的锁。

每当调用acquire()时,内置计数器-1
每当调用release()时,内置计数器+1

计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
应用场景
比如说在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
又比如在做爬虫的时候,有时候爬取速度太快了,会导致被网站禁止,所以这个时候就需要控制爬虫爬取网站的频率。
from threading import  Semaphore
from threading import  current_thread
from threading import  Thread,Lock
import time

mutex = Lock()
sm = Semaphore(5) # 设置进入锁的数量
def task():
    # mutex.acquire()  # 一次进入1个(锁住)
    sm.acquire()    # 一次5个进入锁
    print(f'{current_thread().name}抢到执行权限')
    time.sleep(1)  # 进行延时操作
    sm.release()
    # mutex.release()  #  一次进入1个(解锁)

for l in range(20):
    t = Thread(target=task)
    t.start()

2.7 GIL全局解释器锁

GIL的优点:

  • 保证了CPython中的内存管理是线程安全的

GIL的缺点:

  • 互斥锁的特性使得多线程无法并行

1.定义

官方解释:
'''
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''

释义:
在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除

总结:
在CPython中,GIL会把线程的并行变成串行,导致效率降低

需要知道的是,解释器并不只有CPython,还有PyPy,JPython等等。GIL也仅存在与CPython中,这并不是Python这门语言的问题,而是CPython解释器的问题!

2. GIL带来的问题

首先必须明确执行一个py文件,分为三个步骤

  1. 从硬盘加载Python解释器到内存
  2. 从硬盘加载.py文件到内存
  3. 解释器解析.py文件内容,交给CPU执行

其次需要明确的是每当执行一个py文件,就会立即启动一个python解释器,

当执行test.py时其内存结构如下:

GIL,叫做全局解释器锁,加到了解释器上,并且是一把互斥锁,那么这把锁对应用程序到底有什么影响?

这就需要知道解释器的作用,以及解释器与应用程序代码之间的关系

py文件中的内容本质都是字符串,只有在被解释器解释时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统执行。

当进程中仅存在一条线程时,GIL锁的存在没有不会有任何影响,但是如果进程中有多个线程时,GIL锁就开始发挥作用了。如下图:

开启子线程时,给子线程指定了一个target表示该子线程要处理的任务即要执行的代码。代码要执行则必须交由解释器,即多个线程之间就需要共享解释器,为了避免共享带来的数据竞争问题,于是就给解释器加上了互斥锁!

由于互斥锁的特性,程序串行,保证数据安全,降低执行效率,GIL将使得程序整体效率降低!

3.为什么需要GIL

垃圾回收机制GC

在使用Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。那么GC与GIL有什么关联?

要搞清楚这个问题,需先了解GC的工作原理,Python中内存管理使用的是引用计数,每个数会被加上一个整型的计数器,表示这个数据被引用的次数,当这个整数变为0时则表示该数据已经没有人使用,成了垃圾数据。

当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,垃圾清理也是一串代码,也就需要一条线程来执行。

实例代码

from threading import  Thread
def task():
    a = 10
    print(a)

# 开启三个子线程执行task函数
Thread(target=task).start()
Thread(target=task).start()
Thread(target=task).start()

上述代码内存结构如下:

通过上图可以看出,GC与其他线程都在竞争解释器的执行权,而CPU何时切换,以及切换到哪个线程都是无法预知的,这样一来就造成了竞争问题,假设线程1正在定义变量a=10,而定义变量第一步会先到到内存中申请空间把10存进去,第二步将10的内存地址与变量名a进行绑定,如果在执行完第一步后,CPU切换到了GC线程,GC线程发现10的地址引用计数为0则将其当成垃圾进行了清理,等CPU再次切换到线程1时,刚刚保存的数据10已经被清理掉了,导致无法正常定义变量。

当然其他一些涉及到内存的操作同样可能产生问题问题,为了避免GC与其他线程竞争解释器带来的问题,CPython简单粗暴的给解释器加了互斥锁,如下图所示:

有了GIL后,多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全。

4. GIL加锁与解锁时机

加锁的时机:在调用解释器时立即加锁

解锁时机:

  • 当前线程遇到了IO时释放
  • 当前线程执行时间超过设定值时释放,解释器会检测线程的执行时间,一旦到达某个阈值,通知线程保存状态切换线程,以此来保证数据安全

5.自定义线程锁与GIL的区别

GIL保护的是解释器级别的数据安全,比如对象的引用计数,垃圾分代数据等等,具体参考垃圾回收机制详解。

对于程序中自己定义的数据则没有任何的保护效果,这一点在没有介绍GIL前我们就已经知道了,所以当程序中出现了共享自定义的数据时就要自己加锁,如下例:

from threading import Thread,Lock
import time

a = 0
def task():
    global a
    temp = a
    time.sleep(0.01) 
    a = temp + 1

t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)

过程分析:

1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL

2.线程2获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL

3.线程1睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL

4.线程2睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL,最后a的值也就是1

之所以出现问题是因为两个线程在并发的执行同一段代码,解决方案就是加锁!

from threading import Thread,Lock
import time

lock = Lock()
a = 0
def task():
    global a
    lock.acquire()
    temp = a
    time.sleep(0.01)
    a = temp + 1
    lock.release()

t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)

过程分析:

1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock

2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL

3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1

4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock

5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2

6. GIL性能属性

GIL的优点:

  • 保证了CPython中的内存管理是线程安全的

GIL的缺点:

  • 互斥锁的特性使得多线程无法并行

但我们并不能因此就否认Python这门语言,其原因如下:

  1. GIL仅仅在CPython解释器中存在,在其他的解释器中没有,并不是Python这门语言的缺点

  2. 在单核处理器下,多线程之间本来就无法真正的并行执行

  3. 在多核处理下,运算效率的确是比单核处理器高,但是要知道现代应用程序多数都是基于网络的(qq,微信,爬虫,浏览器等等),CPU的运行效率是无法决定网络速度的,而网络的速度是远远比不上处理器的运算速度,则意味着每次处理器在执行运算前都需要等待网络IO,这样一来多核优势也就没有那么明显了

    举个例子:

    任务1 从网络上下载一个网页,等待网络IO的时间为1分钟,解析网页数据花费,1秒钟

    任务2 将用户输入数据并将其转换为大写,等待用户输入时间为1分钟,转换为大写花费,1秒钟

单核CPU下:1.开启第一个任务后进入等待。2.切换到第二个任务也进入了等待。一分钟后解析网页数据花费1秒解析完成切换到第二个任务,转换为大写花费1秒,那么总耗时为:1分+1秒+1秒 = 1分钟2秒

多核CPU下:1.CPU1处理第一个任务等待1分钟,解析花费1秒钟。1.CPU2处理第二个任务等待1分钟,转换大写花费1秒钟。由于两个任务是并行执行的所以总的执行时间为1分钟+1秒钟 = 1分钟1秒

可以发现,多核CPU对于总的执行时间提升只有1秒,但是这边的1秒实际上是夸张了,转换大写操作不可能需要1秒,时间非常短!

上面的两个任务都是需要大量IO时间的,这样的任务称之为IO密集型,与之对应的是计算密集型即没有IO操作全都是计算任务。

对于计算密集型任务,Python多线程的确比不上其他语言!为了解决这个弊端,Python推出了多进程技术,可以良好的利用多核处理器来完成计算密集任务。

总结:

1.单核下无论是IO密集还是计算密集GIL都不会产生任何影响

2.多核下对于IO密集任务,GIL会有细微的影响,基本可以忽略

3.Cpython中IO密集任务应该采用多线程,计算密集型应该采用多进程

另外:之所以广泛采用CPython解释器,就是因为大量的应用程序都是IO密集型的,还有另一个很重要的原因是CPython可以无缝对接各种C语言实现的库,这对于一些数学计算相关的应用程序而言非常的happy,直接就能使用各种现成的算法

2.8 Event事件

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。

1. 定义

Event事件的作用: 用来控制线程

事件处理的机制:全局定义了一个内置标志Flag,如果Flag值为 False,那么当程序执行 event.wait方法时就会阻塞,如果Flag值为True,那么event.wait 方法时便不再阻塞。 

2. 方法

  • set() : 将标志设为True,使处于阻塞状态的线程恢复运行状态。
  • wait(): 如果标志为True将立即返回,否则线程阻塞(可传时间)
  • clear(): 将标志设为False。
  • isSet(): 获取内置标志状态,返回True或False。
from threading import Event
import time
from threading import  Thread
# 调用Event类实例化一个对象
e = Event()

def light():
    print('红灯亮...')
    time.sleep(5)
    # 发送信号true,其他所有进程准备执行
    e.set()  # flag变为true ,阻塞态变为运行
    print('绿灯亮...')

def car(name):
    print('正在等红灯...')
    # 所有汽车						

原文地址:https://www.cnblogs.com/fwzzz/p/11743223.html