Python Redis pipeline操作
Redis是建立在TCP协议基础上的CS架构,客户端client对redis server采取请求响应的方式交互。
一般来说客户端从提交请求到得到服务器相应,需要传送两个tcp报文。
设想这样的一个场景,你要批量的执行一系列redis命令,例如执行100次get key,这时你要向redis请求100次+获取响应100次。如果能一次性将100个请求提交给redis server,执行完成之后批量的获取相应,只需要向redis请求1次,然后批量执行完命令,一次性结果,性能是不是会好很多呢?
答案是肯定的,节约的时间是客户端client和服务器redis server之间往返网络延迟的时间。这个时间可以用ping命令查看。
网络延迟高:批量执行,性能提升明显
网络延迟低(本机):批量执行,性能提升不明显
某些客户端(java和python)提供了一种叫做pipeline的编程模式用来解决批量提交请求的方式。
这里我们用python客户端来举例说明一下。
1、pipeline
网络延迟
client与server机器之间网络延迟如下,大约是30ms。
测试用例
分别执行其中的try_pipeline和without_pipeline统计处理时间。
# -*- coding:utf-8 -*-
import redis
import time
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='10.93.84.53', port=6379, password='bigdata123')
def try_pipeline():
start = time.time()
with r.pipeline(transaction=False) as p:
p.sadd('seta', 1).sadd('seta', 2).srem('seta', 2).lpush('lista', 1).lrange('lista', 0, -1)
p.execute()
print time.time() - start
def without_pipeline():
start = time.time()
r.sadd('seta', 1)
r.sadd('seta', 2)
r.srem('seta', 2)
r.lpush('lista', 1)
r.lrange('lista', 0, -1)
print time.time() - start
def worker():
while True:
try_pipeline()
with ProcessPoolExecutor(max_workers=12) as pool:
for _ in range(10):
pool.submit(worker)
结果分析
try_pipeline平均处理时间:0.04659
without_pipeline平均处理时间:0.16672
我们的批量里有5个操作,在处理时间维度上性能提升了4倍!
网络延迟大约是30ms,不使用批量的情况下,网络上的时间损耗就有0.15s(30ms*5)以上。而pipeline批量操作只进行一次网络往返,所以延迟只有0.03s。可以看到节省的时间基本都是网路延迟。
2、pipeline与transation
pipeline不仅仅用来批量的提交命令,还用来实现事务transation。
这里对redis事务的讨论不会太多,只是给出一个demo。详细的描述你可以参见这篇博客。redis事务
细心的你可能发现了,使用transaction与否不同之处在与创建pipeline实例的时候,transaction是否打开,默认是打开的。
# -*- coding:utf-8 -*-
import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor
r = redis.Redis(host='127.0.0.1', port=6379)
# 减库存函数, 循环直到减库存完成
# 库存充足, 减库存成功, 返回True
# 库存不足, 减库存失败, 返回False
def decr_stock():
# python中redis事务是通过pipeline的封装实现的
with r.pipeline() as pipe:
while True:
try:
# watch库存键, multi后如果该key被其他客户端改变, 事务操作会抛出WatchError异常
pipe.watch('stock:count')
count = int(pipe.get('stock:count'))
if count > 0: # 有库存
# 事务开始
pipe.multi()
pipe.decr('stock:count')
# 把命令推送过去
# execute返回命令执行结果列表, 这里只有一个decr返回当前值
print pipe.execute()[0]
return True
else:
return False
except WatchError, ex:
# 打印WatchError异常, 观察被watch锁住的情况
print ex
pipe.unwatch()
def worker():
while True:
# 没有库存就退出
if not decr_stock():
break
# 实验开始
# 设置库存为100
r.set("stock:count", 100)
# 多进程模拟多个客户端提交
with ProcessPoolExecutor(max_workers=2) as pool:
for _ in range(10):
pool.submit(worker)
- Elasticsearch聚合后分页深入详解
- 可扩展机器学习——线性回归(linear Regression)
- 简单易学的机器学习算法——Label Propagation
- 利用Theano理解深度学习——Convolutional Neural Networks
- 持续精进——我的2017年终总结
- 实战 | Elasticsearch打造知识库检索系统
- Elasticsearch实战 | 必要的时候,还得空间换时间!
- 转--以io.Writer为例看go中的interface{}
- Go支持https协议的简单例子
- Elasticsearch索引增量统计及定时邮件实现
- 机器学习中的特征空间
- 简单易学的机器学习算法——马尔可夫链蒙特卡罗方法MCMC
- 推荐算法——基于图的推荐算法PersonalRank算法
- 推荐算法——非负矩阵分解(NMF)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Oracle数据库相关函数解析
- Oracle用户操作、数据类型、表格修改、约束设置详解
- MyBatis三个查询方法_selectList_selectOne_selectMap
- 通过JDBC、DBUtil实现登陆的练习
- JDBC小项目—员工管理系统
- LOG4J(log for java)详解
- MyBatis-事务管理
- HTML知识清单(附学习网站)
- CSS、CSS3知识点清单
- linux使用MAT分析dump文件
- RabbitMQ 自动创建队列/交换器/绑定
- SpringBoot 整合Shiro实现动态权限加载更新+Session共享+单点登录
- 又一个布局利器, CSS 伪类 :placeholder-shown
- 聊聊微前端的原理和实践
- javax.el.PropertyNotFoundException: Property 'XXX' not found on type xx.xx.xx.xx(实体类具体路径)