[实战篇] Python 运维中使用并发
今天从大哥手里接了一个需求:
验证一下新的 Docker 镜像仓库(Docker Registry)是否迁移成功了
简单粗暴的方法就是拿到老仓库中的镜像列表(Image List),在新仓库模拟用户重新拉取(pull)一遍来验证,我们开始
subprocess
如果我们用 Shell 来写,执行 Docker 命令很容易,直接写就是了,但是对结果的判断就不那么友好了(Shell 大神忽略),那么 Python 呢,如何优雅的执行 Linux 命令呢?这里我们用到了一个 Python 标准库(standard module) :
import subprocess
我们都知道,命令执行过程中会有标准输出(stdout)和标准错误(stderror):
def run_cmd(cmd):
return subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
上面代码封装了一个方法,它会启动一个子进程执行命令,并将标准输出和标准错误通过管道(进程间通信最常用的方式)收集
管道其实就是文件描述符对,子进程会继承父进程中的所有文件描述符
最后,通过序列解包:
stdout, stderr = run_cmd('uname -a')
获取标准输出和标准错误,这个方法我们后面要用到好多
我拿到镜像列表文件了,先使用
cat imagelist | wc -l
查看了一下行数(镜像数量),4254 个,还行,不算太多
思路:
- 拉取列表中的镜像,拉取成功后将其删除并标记为成功
- 拉取失败就标记为失败和并记录错误
- 如果拉取超时,就标记超时
如何标记呢,因为我们将会使用多进程,多个进程间通信还是蛮麻烦的,这里偷个懒:直接使用 append 模式直接将结果写入文件
with open('timeout_image.txt','a') as timeout_file:
timeout_file.write(image)
我们先写出如何验证一个镜像的逻辑:
def pull_worker(image):
# ?
公众号代码支持太差了,可以去文末的点击阅读原文查看
后面就仅仅是并发的问题了
sys
首先我们想控制并发数量,最简单是使用 sys 模块
if len(sys.argv) == 4:
pass
else:
print “Need three params
return
# 这里同样使用了序列解包,第一个参数是脚本名字,忽略掉
_, file, coreNum, poolNum = sys.argv
这样的程序执行起来像这样:
python check_images.py imagelist 8 5
gevent
然后是实现,我们使用的这个模块需要安装,它是大名鼎鼎的 gevent,为什么使用它,因为我们的任务是 I/O密集 型的,gevent 擅长处理这类任务(有兴趣可以去了解下猴子补丁)
pip install gevent
我们看导入模块的代码:
import gevent.pool
import gevent.monkey
from gevent import Timeout
gevent.monkey.patch_all() # 猴子补丁
from multiprocessing import Process
最后一行也是使用了 Python 的标准库,多进程模块:multiprocessing
不要和我说什么Python 有全局解释器锁(GIL),多进程没有 GIL,多进程没有 GIL,多进程没有 GIL
如何并发呢:
- 启动和核数相等的进程(跑满机器,尽快完成任务为目的)
- 每个进程里面 docker pull 的并发为 5(gevent 协程池)
所以我们总的并发数就是 40,这样就完成了可控制并发的脚本
代码如下:
def each_process(task_object_list):
pool = gevent.pool.Pool(int(poolNum))
pool.map(pull_worker, task_object_list)
stop = time.time()
elapsed = stop - start
print "End precess with {0} s".format(elapsed)
with open(file) as f:
for line in f:
line = line.strip()
all_task_list.append(line)
print "All task: {0}".format(len(all_task_list))
for sliced_task_list in slice_list(all_task_list, int(coreNum)):
print "Start process with tasks: {0}".format(len(sliced_task_list))
p = Process(target=each_process, args=(sliced_task_list,))
p.start()
这里需要注意的一点是,4254 个镜像,是按照核心数量分组(slice_list),然后交给不同的进程处理的。
- 简单易学的机器学习算法——基于密度的聚类算法DBSCAN
- 厚土Go学习笔记 | 29. 接口
- Golang Template 简明笔记
- hotspare的copyback(r7笔记第30天)
- Spring-AOP
- DBA和开发同事的一些代沟(三)(r7笔记第29天)
- 简单易学的机器学习算法——非线性支持向量机
- 一条关于swap争用的报警邮件分析(一)(r7笔记第28天)
- Spring-IOC(2)
- Python3 pandas read_csv 读取txt文件报错:IOError: Initializing from file failed
- dataguard添加临时数据文件的bug(r7笔记第27天)
- 简单易学的机器学习算法——线性支持向量机
- Java设计模式-模板方式模式
- 由一条create语句的问题对比mysql和oracle中的date差别 (r7笔记第26天)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法